HADOOP-17770 WASB : Support disabling buffered reads in positional reads (#3233)
This commit is contained in:
parent
c4f1db1966
commit
913d06ad4d
|
@ -83,4 +83,17 @@
|
|||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
|
||||
<!-- This field is instance of BlockBlobInputStream and read(long, byte[], int, int)
|
||||
calls it's Super class method when 'fs.azure.block.blob.buffered.pread.disable'
|
||||
is configured false. Super class FSInputStream's implementation is having
|
||||
proper synchronization.
|
||||
When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock free
|
||||
implementation of blob read. Here we don't use any of the InputStream's
|
||||
shared resource (buffer) and also don't change any cursor position etc.
|
||||
So its safe to go with unsynchronized way of read. -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsInputStream" />
|
||||
<Field name="in" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -40,6 +40,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -235,6 +236,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
*/
|
||||
public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
|
||||
|
||||
/**
|
||||
* Optional config to enable a lock free pread which will bypass buffer in
|
||||
* BlockBlobInputStream.
|
||||
* This is not a config which can be set at cluster level. It can be used as
|
||||
* an option on FutureDataInputStreamBuilder.
|
||||
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
|
||||
*/
|
||||
public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
|
||||
"fs.azure.block.blob.buffered.pread.disable";
|
||||
|
||||
/**
|
||||
* The set of directories where we should apply atomic folder rename
|
||||
* synchronized with createNonRecursive.
|
||||
|
@ -1577,8 +1588,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
* Opens a new input stream for the given blob (page or block blob)
|
||||
* to read its data.
|
||||
*/
|
||||
private InputStream openInputStream(CloudBlobWrapper blob)
|
||||
throws StorageException, IOException {
|
||||
private InputStream openInputStream(CloudBlobWrapper blob,
|
||||
Optional<Configuration> options) throws StorageException, IOException {
|
||||
if (blob instanceof CloudBlockBlobWrapper) {
|
||||
LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
|
||||
switch(inputStreamVersion) {
|
||||
|
@ -1586,9 +1597,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
return blob.openInputStream(getDownloadOptions(),
|
||||
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
|
||||
case 2:
|
||||
boolean bufferedPreadDisabled = options.map(c -> c
|
||||
.getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
|
||||
.orElse(false);
|
||||
return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
|
||||
getDownloadOptions(),
|
||||
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
|
||||
getInstrumentedContext(isConcurrentOOBAppendAllowed()),
|
||||
bufferedPreadDisabled);
|
||||
default:
|
||||
throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
|
||||
}
|
||||
|
@ -2262,6 +2277,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
@Override
|
||||
public InputStream retrieve(String key, long startByteOffset)
|
||||
throws AzureException, IOException {
|
||||
return retrieve(key, startByteOffset, Optional.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream retrieve(String key, long startByteOffset,
|
||||
Optional<Configuration> options) throws AzureException, IOException {
|
||||
try {
|
||||
// Check if a session exists, if not create a session with the
|
||||
// Azure storage server.
|
||||
|
@ -2273,7 +2294,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
checkContainer(ContainerAccessType.PureRead);
|
||||
|
||||
InputStream inputStream = openInputStream(getBlobReference(key));
|
||||
InputStream inputStream = openInputStream(getBlobReference(key), options);
|
||||
if (startByteOffset > 0) {
|
||||
// Skip bytes and ignore return value. This is okay
|
||||
// because if you try to skip too far you will be positioned
|
||||
|
@ -2824,7 +2845,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
OutputStream opStream = null;
|
||||
try {
|
||||
if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
|
||||
ipStream = openInputStream(srcBlob);
|
||||
ipStream = openInputStream(srcBlob, Optional.empty());
|
||||
opStream = openOutputStream(dstBlob);
|
||||
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
|
||||
int len;
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.microsoft.azure.storage.StorageException;
|
|||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
|
||||
|
||||
|
@ -36,10 +37,11 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
|
|||
* random access and seek. Random access performance is improved by several
|
||||
* orders of magnitude.
|
||||
*/
|
||||
final class BlockBlobInputStream extends InputStream implements Seekable {
|
||||
final class BlockBlobInputStream extends FSInputStream {
|
||||
private final CloudBlockBlobWrapper blob;
|
||||
private final BlobRequestOptions options;
|
||||
private final OperationContext opContext;
|
||||
private final boolean bufferedPreadDisabled;
|
||||
private InputStream blobInputStream = null;
|
||||
private int minimumReadSizeInBytes = 0;
|
||||
private long streamPositionAfterLastRead = -1;
|
||||
|
@ -64,10 +66,12 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
*/
|
||||
BlockBlobInputStream(CloudBlockBlobWrapper blob,
|
||||
BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException {
|
||||
OperationContext opContext, boolean bufferedPreadDisabled)
|
||||
throws IOException {
|
||||
this.blob = blob;
|
||||
this.options = options;
|
||||
this.opContext = opContext;
|
||||
this.bufferedPreadDisabled = bufferedPreadDisabled;
|
||||
|
||||
this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
|
||||
|
||||
|
@ -263,6 +267,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
synchronized (this) {
|
||||
checkState();
|
||||
}
|
||||
if (!bufferedPreadDisabled) {
|
||||
// This will do a seek + read in which the streamBuffer will get used.
|
||||
return super.read(position, buffer, offset, length);
|
||||
}
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (position >= streamLength) {
|
||||
throw new EOFException("position is beyond stream capacity");
|
||||
}
|
||||
MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
|
||||
long bytesToRead = Math.min(minimumReadSizeInBytes,
|
||||
Math.min(os.capacity(), streamLength - position));
|
||||
try {
|
||||
blob.downloadRange(position, bytesToRead, os, options, opContext);
|
||||
} catch (StorageException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
int bytesRead = os.size();
|
||||
if (bytesRead == 0) {
|
||||
// This may happen if the blob was modified after the length was obtained.
|
||||
throw new EOFException("End of stream reached unexpectedly.");
|
||||
}
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads up to <code>len</code> bytes of data from the input stream into an
|
||||
* array of bytes.
|
||||
|
|
|
@ -33,11 +33,14 @@ import java.util.Date;
|
|||
import java.util.EnumSet;
|
||||
import java.util.TimeZone;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Stack;
|
||||
import java.util.HashMap;
|
||||
|
||||
|
@ -61,6 +64,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
|
@ -70,6 +74,8 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
|||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
|
||||
import org.apache.hadoop.fs.impl.OpenFileParameters;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -79,6 +85,7 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.util.LambdaUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
|
@ -915,6 +922,43 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
// SpotBugs reports bug type IS2_INCONSISTENT_SYNC here.
|
||||
// This report is not valid here.
|
||||
// 'this.in' is instance of BlockBlobInputStream and read(long, byte[], int, int)
|
||||
// calls it's Super class method when 'fs.azure.block.blob.buffered.pread.disable'
|
||||
// is configured false. Super class FSInputStream's implementation is having
|
||||
// proper synchronization.
|
||||
// When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock free
|
||||
// implementation of blob read. Here we don't use any of the InputStream's
|
||||
// shared resource (buffer) and also don't change any cursor position etc.
|
||||
// So its safe to go with unsynchronized way of read.
|
||||
if (in instanceof PositionedReadable) {
|
||||
try {
|
||||
int result = ((PositionedReadable) this.in).read(position, buffer,
|
||||
offset, length);
|
||||
if (null != statistics && result > 0) {
|
||||
statistics.incrementBytesRead(result);
|
||||
}
|
||||
return result;
|
||||
} catch (IOException e) {
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
if (innerException instanceof StorageException) {
|
||||
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
||||
+ " Exception details: {} Error Code : {}",
|
||||
key, e, ((StorageException) innerException).getErrorCode());
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return super.read(position, buffer, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (!closed) {
|
||||
|
@ -3043,6 +3087,12 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
|
||||
return open(f, bufferSize, Optional.empty());
|
||||
}
|
||||
|
||||
private FSDataInputStream open(Path f, int bufferSize,
|
||||
Optional<Configuration> options)
|
||||
throws FileNotFoundException, IOException {
|
||||
|
||||
LOG.debug("Opening file: {}", f.toString());
|
||||
|
||||
|
@ -3077,7 +3127,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
InputStream inputStream;
|
||||
try {
|
||||
inputStream = store.retrieve(key);
|
||||
inputStream = store.retrieve(key, 0, options);
|
||||
} catch(Exception ex) {
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
|
@ -3094,6 +3144,18 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
|
||||
OpenFileParameters parameters) throws IOException {
|
||||
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
||||
parameters.getMandatoryKeys(),
|
||||
Collections.emptySet(),
|
||||
"for " + path);
|
||||
return LambdaUtils.eval(
|
||||
new CompletableFuture<>(), () ->
|
||||
open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -50,6 +51,9 @@ interface NativeFileSystemStore {
|
|||
|
||||
InputStream retrieve(String key, long byteRangeStart) throws IOException;
|
||||
|
||||
InputStream retrieve(String key, long byteRangeStart,
|
||||
Optional<Configuration> options) throws IOException;
|
||||
|
||||
DataOutputStream storefile(String keyEncoded,
|
||||
PermissionStatus permissionStatus,
|
||||
String key) throws AzureException;
|
||||
|
|
|
@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using t
|
|||
</property>
|
||||
```
|
||||
|
||||
### Performance optimization configurations
|
||||
|
||||
`fs.azure.block.blob.buffered.pread.disable`: By default the positional read API will do a
|
||||
seek and read on input stream. This read will fill the buffer cache in
|
||||
BlockBlobInputStream. If this configuration is true it will skip usage of buffer and do a
|
||||
lock free call for reading from blob. This optimization is very much helpful for HBase kind
|
||||
of short random read over a shared InputStream instance.
|
||||
Note: This is not a config which can be set at cluster level. It can be used as
|
||||
an option on FutureDataInputStreamBuilder.
|
||||
See FileSystem#openFile(Path path)
|
||||
|
||||
## Further Reading
|
||||
|
||||
* [Testing the Azure WASB client](testing_azure.html).
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
|
||||
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
|
||||
|
@ -306,6 +307,61 @@ public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
|
|||
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_202_PosReadTest() throws Exception {
|
||||
assumeHugeFileExists();
|
||||
FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
|
||||
.getFileSystem().openFile(TEST_FILE_PATH);
|
||||
builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
|
||||
try (
|
||||
FSDataInputStream inputStreamV1
|
||||
= accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
|
||||
FSDataInputStream inputStreamV2
|
||||
= accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
|
||||
FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
|
||||
) {
|
||||
final int bufferSize = 4 * KILOBYTE;
|
||||
byte[] bufferV1 = new byte[bufferSize];
|
||||
byte[] bufferV2 = new byte[bufferSize];
|
||||
byte[] bufferV2NoBuffer = new byte[bufferSize];
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
|
||||
bufferV1, bufferV2, bufferV2NoBuffer);
|
||||
|
||||
int pos = 2 * KILOBYTE;
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
|
||||
bufferV1, bufferV2, bufferV2NoBuffer);
|
||||
|
||||
pos = 10 * KILOBYTE;
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
|
||||
bufferV1, bufferV2, bufferV2NoBuffer);
|
||||
|
||||
pos = 4100 * KILOBYTE;
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
|
||||
bufferV1, bufferV2, bufferV2NoBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyConsistentReads(FSDataInputStream inputStreamV1,
|
||||
FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
|
||||
int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
|
||||
throws IOException {
|
||||
int size = bufferV1.length;
|
||||
int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
|
||||
assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
|
||||
|
||||
int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
|
||||
assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
|
||||
|
||||
int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
|
||||
bufferV2NoBuffer, 0, size);
|
||||
assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
|
||||
numBytesReadV2NoBuffer);
|
||||
|
||||
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
|
||||
assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.markSupported.
|
||||
* @throws IOException
|
||||
|
|
Loading…
Reference in New Issue