HADOOP-17296. ABFS: Force reads to be always of buffer size.

Contributed by Sneha Vijayarajan.

(cherry picked from commit 142941b96e)
This commit is contained in:
Sneha Vijayarajan 2020-11-27 19:52:34 +05:30 committed by Steve Loughran
parent 1520b84b36
commit a44890eb63
11 changed files with 635 additions and 68 deletions

View File

@ -201,6 +201,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
DefaultValue = DEFAULT_READ_AHEAD_BLOCK_SIZE)
private int readAheadBlockSize;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE)
private boolean alwaysReadBufferSize;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
@ -599,6 +609,14 @@ public class AbfsConfiguration{
return this.readAheadQueueDepth;
}
public int getReadAheadBlockSize() {
return this.readAheadBlockSize;
}
public boolean shouldReadBufferSizeAlways() {
return this.alwaysReadBufferSize;
}
public boolean isFlushEnabled() {
return this.enableFlush;
}

View File

@ -644,6 +644,9 @@ public class AzureBlobFileSystemStore implements Closeable {
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
.build();
}

View File

@ -75,6 +75,8 @@ public final class ConfigurationKeys {
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";

View File

@ -57,6 +57,8 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
@ -74,6 +76,7 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;

View File

@ -47,6 +47,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
private int readAheadBlockSize;
private final AbfsClient client;
private final Statistics statistics;
private final String path;
@ -56,6 +57,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean alwaysReadBufferSize;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
@ -89,9 +91,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadEnabled = true;
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
}
public String getPath() {
@ -178,11 +187,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
if (alwaysReadBufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
}
if (bytesRead == -1) {
@ -223,16 +236,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
// First read to queue needs to be of readBufferSize and later
// of readAhead Block size
long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
// From next round onwards should be of readahead block size.
nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
}
// try reading from buffers first
@ -527,6 +543,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
return bytesFromRemoteRead;
}
@VisibleForTesting
public int getBufferSize() {
return bufferSize;
}
@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
@VisibleForTesting
public boolean shouldAlwaysReadBufferSize() {
return alwaysReadBufferSize;
}
/**
* Get the statistics of the stream.
* @return a string value.

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to hold extra input stream configs.
*/
public class AbfsInputStreamContext extends AbfsStreamContext {
// Retaining logger of AbfsInputStream
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
private int readBufferSize;
@ -29,6 +34,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean tolerateOobAppends;
private boolean alwaysReadBufferSize;
private int readAheadBlockSize;
private AbfsInputStreamStatistics streamStatistics;
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@ -60,7 +69,27 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
return this;
}
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
return this;
}
public AbfsInputStreamContext withReadAheadBlockSize(
final int readAheadBlockSize) {
this.readAheadBlockSize = readAheadBlockSize;
return this;
}
public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
"fs.azure.read.request.size[={}] is configured for higher size than "
+ "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+ "readAhead block size to be same as readRequestSize.",
readBufferSize, readAheadBlockSize);
readAheadBlockSize = readBufferSize;
}
// Validation of parameters to be done here.
return this;
}
@ -80,4 +109,13 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
public int getReadAheadBlockSize() {
return readAheadBlockSize;
}
}

View File

@ -28,6 +28,7 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -36,12 +37,14 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int ONE_KB = 1024;
private static final int ONE_MB = ONE_KB * ONE_KB;
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private static int blockSize = 4 * ONE_MB;
private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
@ -50,21 +53,37 @@ final class ReadBufferManager {
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block
private static final ReentrantLock LOCK = new ReentrantLock();
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
if (bufferManager == null) {
LOCK.lock();
try {
if (bufferManager == null) {
bufferManager = new ReadBufferManager();
bufferManager.init();
}
} finally {
LOCK.unlock();
}
}
return bufferManager;
}
static void setReadBufferManagerConfigs(int readAheadBlockSize) {
if (bufferManager == null) {
LOGGER.debug(
"ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}",
readAheadBlockSize);
blockSize = readAheadBlockSize;
}
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
@ -124,10 +143,10 @@ final class ReadBufferManager {
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
}
@ -272,6 +291,7 @@ final class ReadBufferManager {
return evict(nodeToEvict);
}
LOGGER.trace("No buffer eligible for eviction");
// nothing can be evicted
return false;
}
@ -483,6 +503,67 @@ final class ReadBufferManager {
tryEvict();
}
/**
* Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
void testResetReadBufferManager() {
synchronized (this) {
ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
for (ReadBuffer buf : completedReadList) {
if (buf != null) {
completedBuffers.add(buf);
}
}
for (ReadBuffer buf : completedBuffers) {
evict(buf);
}
readAheadQueue.clear();
inProgressList.clear();
completedReadList.clear();
freeList.clear();
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = null;
}
buffers = null;
resetBufferManager();
}
}
/**
* Reset buffer manager to null.
*/
@VisibleForTesting
static void resetBufferManager() {
bufferManager = null;
}
/**
* Reset readAhead buffer to needed readAhead block size and
* thresholdAgeMilliseconds.
* @param readAheadBlockSize
* @param thresholdAgeMilliseconds
*/
@VisibleForTesting
void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
setBlockSize(readAheadBlockSize);
setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
testResetReadBufferManager();
}
@VisibleForTesting
static void setBlockSize(int readAheadBlockSize) {
blockSize = readAheadBlockSize;
}
@VisibleForTesting
int getReadAheadBlockSize() {
return blockSize;
}
/**
* Test method that can mimic no free buffers scenario and also add a ReadBuffer
* into completedReadList. This readBuffer will get picked up by TryEvict()

View File

@ -789,6 +789,17 @@ to 100 MB). The default value will be 8388608 (8 MB).
bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to
100 MB). The default value will be 4194304 (4 MB).
`fs.azure.read.alwaysReadBufferSize`: Read request size configured by
`fs.azure.read.request.size` will be honoured only when the reads done are in
sequential pattern. When the read pattern is detected to be random, read size
will be same as the buffer length provided by the calling process.
This config when set to true will force random reads to also read in same
request sizes as sequential reads. This is a means to have same read patterns
as of ADLS Gen1, as it does not differentiate read patterns and always reads by
the configured read request size. The default value for this config will be
false, where reads for the provided buffer length is done when random read
pattern is detected.
`fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
AbfsInputStream. In case the set value is negative the read ahead queue depth
will be set as Runtime.getRuntime().availableProcessors(). By default the value
@ -796,6 +807,11 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
doing only random reads (non-sequential) or you are seeing throttling, you
may try setting this value to 0.
`fs.azure.read.readahead.blocksize`: To set the read buffer size for the read
aheads. Specify the value in bytes. The value should be between 16384 to
104857600 both inclusive (16 KB to 100 MB). The default value will be
4194304 (4 MB).
To run under limited memory situations configure the following. Especially
when there are too many writes from the same process.

View File

@ -392,6 +392,14 @@ public abstract class AbstractAbfsIntegrationTest extends
return path;
}
public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
return fs.getAbfsStore();
}
public Path makeQualified(Path path) throws java.io.IOException {
return getFileSystem().makeQualified(path);
}
/**
* Create a path under the test path provided by
* {@link #getTestPath()}.

View File

@ -21,6 +21,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.UUID;
import org.junit.Assume;
import org.junit.Ignore;
@ -28,6 +29,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
@ -37,30 +39,43 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ETAG;
/**
* Test random read operation.
*/
public class ITestAzureBlobFileSystemRandomRead extends
AbstractAbfsScaleTest {
private static final int BYTE = 1;
private static final int THREE_BYTES = 3;
private static final int FIVE_BYTES = 5;
private static final int TWENTY_BYTES = 20;
private static final int THIRTY_BYTES = 30;
private static final int KILOBYTE = 1024;
private static final int MEGABYTE = KILOBYTE * KILOBYTE;
private static final int FOUR_MB = 4 * MEGABYTE;
private static final int NINE_MB = 9 * MEGABYTE;
private static final long TEST_FILE_SIZE = 8 * MEGABYTE;
private static final int MAX_ELAPSEDTIMEMS = 20;
private static final int SEQUENTIAL_READ_BUFFER_SIZE = 16 * KILOBYTE;
private static final int CREATE_BUFFER_SIZE = 26 * KILOBYTE;
private static final int SEEK_POSITION_ONE = 2* KILOBYTE;
private static final int SEEK_POSITION_TWO = 5 * KILOBYTE;
private static final int SEEK_POSITION_THREE = 10 * KILOBYTE;
private static final int SEEK_POSITION_FOUR = 4100 * KILOBYTE;
private static final Path TEST_FILE_PATH = new Path(
"/TestRandomRead.txt");
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * MEGABYTE;
private static final int DISABLED_READAHEAD_DEPTH = 0;
private static final String TEST_FILE_PREFIX = "/TestRandomRead";
private static final String WASB = "WASB";
private static final String ABFS = "ABFS";
private static long testFileLength = 0;
private static final Logger LOG =
LoggerFactory.getLogger(ITestAzureBlobFileSystemRandomRead.class);
@ -71,9 +86,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
@Test
public void testBasicRead() throws Exception {
assumeHugeFileExists();
Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead");
assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] buffer = new byte[3 * MEGABYTE];
// forward seek and read a kilobyte into first kilobyte of bufferV2
@ -99,12 +115,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
public void testRandomRead() throws Exception {
Assume.assumeFalse("This test does not support namespace enabled account",
this.getFileSystem().getIsNamespaceEnabled());
assumeHugeFileExists();
Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead");
assumeHugeFileExists(testPath);
try (
FSDataInputStream inputStreamV1
= this.getFileSystem().open(TEST_FILE_PATH);
= this.getFileSystem().open(testPath);
FSDataInputStream inputStreamV2
= this.getWasbFileSystem().open(TEST_FILE_PATH);
= this.getWasbFileSystem().open(testPath);
) {
final int bufferSize = 4 * KILOBYTE;
byte[] bufferV1 = new byte[bufferSize];
@ -156,8 +174,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
*/
@Test
public void testSeekToNewSource() throws Exception {
assumeHugeFileExists();
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource");
assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
assertFalse(inputStream.seekToNewSource(0));
}
}
@ -169,8 +189,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
*/
@Test
public void testSkipBounds() throws Exception {
assumeHugeFileExists();
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
long skipped = inputStream.skip(-1);
@ -208,8 +230,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
*/
@Test
public void testValidateSeekBounds() throws Exception {
assumeHugeFileExists();
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
inputStream.seek(0);
@ -257,8 +281,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
*/
@Test
public void testSeekAndAvailableAndPosition() throws Exception {
assumeHugeFileExists();
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
@ -321,8 +347,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
*/
@Test
public void testSkipAndAvailableAndPosition() throws Exception {
assumeHugeFileExists();
try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
@ -385,15 +413,16 @@ public class ITestAzureBlobFileSystemRandomRead extends
@Test
public void testSequentialReadAfterReverseSeekPerformance()
throws Exception {
assumeHugeFileExists();
Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
assumeHugeFileExists(testPath);
final int maxAttempts = 10;
final double maxAcceptableRatio = 1.01;
double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
double ratio = Double.MAX_VALUE;
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
beforeSeekElapsedMs = sequentialRead(ABFS,
beforeSeekElapsedMs = sequentialRead(ABFS, testPath,
this.getFileSystem(), false);
afterSeekElapsedMs = sequentialRead(ABFS,
afterSeekElapsedMs = sequentialRead(ABFS, testPath,
this.getFileSystem(), true);
ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
LOG.info((String.format(
@ -417,8 +446,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
public void testRandomReadPerformance() throws Exception {
Assume.assumeFalse("This test does not support namespace enabled account",
this.getFileSystem().getIsNamespaceEnabled());
createTestFile();
assumeHugeFileExists();
Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
assumeHugeFileExists(testPath);
final AzureBlobFileSystem abFs = this.getFileSystem();
final NativeAzureFileSystem wasbFs = this.getWasbFileSystem();
@ -428,8 +457,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
double v1ElapsedMs = 0, v2ElapsedMs = 0;
double ratio = Double.MAX_VALUE;
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
v1ElapsedMs = randomRead(1, wasbFs);
v2ElapsedMs = randomRead(2, abFs);
v1ElapsedMs = randomRead(1, testPath, wasbFs);
v2ElapsedMs = randomRead(2, testPath, abFs);
ratio = v2ElapsedMs / v1ElapsedMs;
@ -448,15 +477,112 @@ public class ITestAzureBlobFileSystemRandomRead extends
ratio < maxAcceptableRatio);
}
/**
* With this test we should see a full buffer read being triggered in case
* alwaysReadBufferSize is on, else only the requested buffer size.
* Hence a seek done few bytes away from last read position will trigger
* a network read when alwaysReadBufferSize is off, whereas it will return
* from the internal buffer when it is on.
* Reading a full buffer size is the Gen1 behaviour.
* @throws Throwable
*/
@Test
public void testAlwaysReadBufferSizeConfig() throws Throwable {
testAlwaysReadBufferSizeConfig(false);
testAlwaysReadBufferSizeConfig(true);
}
public void testAlwaysReadBufferSizeConfig(boolean alwaysReadBufferSizeConfigValue)
throws Throwable {
final AzureBlobFileSystem currentFs = getFileSystem();
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.readaheadqueue.depth", "0");
config.set("fs.azure.read.alwaysReadBufferSize",
Boolean.toString(alwaysReadBufferSizeConfigValue));
final Path testFile = new Path("/FileName_"
+ UUID.randomUUID().toString());
final AzureBlobFileSystem fs = createTestFile(testFile, 16 * MEGABYTE,
1 * MEGABYTE, config);
String eTag = fs.getAbfsClient()
.getPathStatus(testFile.toUri().getPath(), false)
.getResult()
.getResponseHeader(ETAG);
TestAbfsInputStream testInputStream = new TestAbfsInputStream();
AbfsInputStream inputStream = testInputStream.getAbfsInputStream(
fs.getAbfsClient(),
testFile.getName(), ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, eTag,
DISABLED_READAHEAD_DEPTH, FOUR_MB,
alwaysReadBufferSizeConfigValue, FOUR_MB);
long connectionsAtStart = fs.getInstrumentationMap()
.get(GET_RESPONSES.getStatName());
long dateSizeReadStatAtStart = fs.getInstrumentationMap()
.get(BYTES_RECEIVED.getStatName());
long newReqCount = 0;
long newDataSizeRead = 0;
byte[] buffer20b = new byte[TWENTY_BYTES];
byte[] buffer30b = new byte[THIRTY_BYTES];
byte[] byteBuffer5 = new byte[FIVE_BYTES];
// first read
// if alwaysReadBufferSize is off, this is a sequential read
inputStream.read(byteBuffer5, 0, FIVE_BYTES);
newReqCount++;
newDataSizeRead += FOUR_MB;
assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
fs.getInstrumentationMap());
assertAbfsStatistics(BYTES_RECEIVED,
dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
// second read beyond that the buffer holds
// if alwaysReadBufferSize is off, this is a random read. Reads only
// incoming buffer size
// else, reads a buffer size
inputStream.seek(NINE_MB);
inputStream.read(buffer20b, 0, BYTE);
newReqCount++;
if (alwaysReadBufferSizeConfigValue) {
newDataSizeRead += FOUR_MB;
} else {
newDataSizeRead += TWENTY_BYTES;
}
assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
assertAbfsStatistics(BYTES_RECEIVED,
dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
// third read adjacent to second but not exactly sequential.
// if alwaysReadBufferSize is off, this is another random read
// else second read would have read this too.
inputStream.seek(NINE_MB + TWENTY_BYTES + THREE_BYTES);
inputStream.read(buffer30b, 0, THREE_BYTES);
if (!alwaysReadBufferSizeConfigValue) {
newReqCount++;
newDataSizeRead += THIRTY_BYTES;
}
assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
assertAbfsStatistics(BYTES_RECEIVED, dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
}
private long sequentialRead(String version,
Path testPath,
FileSystem fs,
boolean afterReverseSeek) throws IOException {
byte[] buffer = new byte[SEQUENTIAL_READ_BUFFER_SIZE];
long totalBytesRead = 0;
long bytesRead = 0;
try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
long testFileLength = fs.getFileStatus(testPath).getLen();
try(FSDataInputStream inputStream = fs.open(testPath)) {
if (afterReverseSeek) {
while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
bytesRead = inputStream.read(buffer);
@ -487,14 +613,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
}
}
private long randomRead(int version, FileSystem fs) throws Exception {
assumeHugeFileExists();
private long randomRead(int version, Path testPath, FileSystem fs) throws Exception {
assumeHugeFileExists(testPath);
final long minBytesToRead = 2 * MEGABYTE;
Random random = new Random();
byte[] buffer = new byte[8 * KILOBYTE];
long totalBytesRead = 0;
long bytesRead = 0;
try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
try(FSDataInputStream inputStream = fs.open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
do {
bytesRead = inputStream.read(buffer);
@ -526,28 +652,48 @@ public class ITestAzureBlobFileSystemRandomRead extends
return bytes / 1000.0 * 8 / milliseconds;
}
private void createTestFile() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
if (fs.exists(TEST_FILE_PATH)) {
FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
if (status.getLen() >= TEST_FILE_SIZE) {
return;
private long createTestFile(Path testPath) throws Exception {
createTestFile(testPath,
TEST_FILE_SIZE,
MEGABYTE,
null);
return TEST_FILE_SIZE;
}
private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
int createBufferSize, Configuration config) throws Exception {
AzureBlobFileSystem fs;
if (config == null) {
config = this.getRawConfiguration();
}
final AzureBlobFileSystem currentFs = getFileSystem();
fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config);
if (fs.exists(testFilePath)) {
FileStatus status = fs.getFileStatus(testFilePath);
if (status.getLen() == testFileSize) {
return fs;
}
}
byte[] buffer = new byte[CREATE_BUFFER_SIZE];
byte[] buffer = new byte[createBufferSize];
char character = 'a';
for (int i = 0; i < buffer.length; i++) {
buffer[i] = (byte) character;
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
}
LOG.info(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE));
LOG.info(String.format("Creating test file %s of size: %d ", testFilePath, testFileSize));
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
String bufferContents = new String(buffer);
int bytesWritten = 0;
while (bytesWritten < TEST_FILE_SIZE) {
while (bytesWritten < testFileSize) {
outputStream.write(buffer);
bytesWritten += buffer.length;
}
@ -557,18 +703,18 @@ public class ITestAzureBlobFileSystemRandomRead extends
outputStream.close();
closeTimer.end("time to close() output stream");
}
timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
timer.end("time to write %d KB", testFileSize / 1024);
return fs;
}
private void assumeHugeFileExists() throws Exception{
createTestFile();
private long assumeHugeFileExists(Path testPath) throws Exception{
long fileSize = createTestFile(testPath);
FileSystem fs = this.getFileSystem();
ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", TEST_FILE_PATH);
FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
ContractTestUtils.assertIsFile(TEST_FILE_PATH, status);
assertTrue("File " + TEST_FILE_PATH + " is empty", status.getLen() > 0);
ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", testPath);
FileStatus status = fs.getFileStatus(testPath);
ContractTestUtils.assertIsFile(testPath, status);
assertTrue("File " + testPath + " is not of expected size " + fileSize + ":actual=" + status.getLen(), status.getLen() == fileSize);
return fileSize;
}
private void verifyConsistentReads(FSDataInputStream inputStreamV1,

View File

@ -22,10 +22,17 @@ import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@ -51,9 +58,17 @@ public class TestAbfsInputStream extends
private static final int ONE_KB = 1 * 1024;
private static final int TWO_KB = 2 * 1024;
private static final int THREE_KB = 3 * 1024;
private static final int SIXTEEN_KB = 16 * ONE_KB;
private static final int FORTY_EIGHT_KB = 48 * ONE_KB;
private static final int ONE_MB = 1 * 1024 * 1024;
private static final int FOUR_MB = 4 * ONE_MB;
private static final int EIGHT_MB = 8 * ONE_MB;
private static final int TEST_READAHEAD_DEPTH_2 = 2;
private static final int TEST_READAHEAD_DEPTH_4 = 4;
private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
@ -84,7 +99,7 @@ public class TestAbfsInputStream extends
null,
FORWARD_SLASH + fileName,
THREE_KB,
inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
"eTag");
inputStream.setCachedSasToken(
@ -93,6 +108,33 @@ public class TestAbfsInputStream extends
return inputStream;
}
public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
String fileName,
int fileSize,
String eTag,
int readAheadQueueDepth,
int readBufferSize,
boolean alwaysReadBufferSize,
int readAheadBlockSize) {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
abfsClient,
null,
FORWARD_SLASH + fileName,
fileSize,
inputStreamContext.withReadBufferSize(readBufferSize)
.withReadAheadQueueDepth(readAheadQueueDepth)
.withShouldReadBufferSizeAlways(alwaysReadBufferSize)
.withReadAheadBlockSize(readAheadBlockSize),
eTag);
inputStream.setCachedSasToken(
TestCachedSASToken.getTestCachedSASTokenInstance());
return inputStream;
}
private void queueReadAheads(AbfsInputStream inputStream) {
// Mimic AbfsInputStream readAhead queue requests
ReadBufferManager.getBufferManager()
@ -496,4 +538,183 @@ public class TestAbfsInputStream extends
checkEvictedStatus(inputStream, 0, true);
}
/**
* Test readahead with different config settings for request request size and
* readAhead block size
* @throws Exception
*/
@Test
public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception {
// Set requestRequestSize = 4MB and readAheadBufferSize=8MB
resetReadBufferManager(FOUR_MB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
testReadAheadConfigs(FOUR_MB, TEST_READAHEAD_DEPTH_4, false, EIGHT_MB);
// Test for requestRequestSize =16KB and readAheadBufferSize=16KB
resetReadBufferManager(SIXTEEN_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
AbfsInputStream inputStream = testReadAheadConfigs(SIXTEEN_KB,
TEST_READAHEAD_DEPTH_2, true, SIXTEEN_KB);
testReadAheads(inputStream, SIXTEEN_KB, SIXTEEN_KB);
// Test for requestRequestSize =16KB and readAheadBufferSize=48KB
resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
inputStream = testReadAheadConfigs(SIXTEEN_KB, TEST_READAHEAD_DEPTH_2, true,
FORTY_EIGHT_KB);
testReadAheads(inputStream, SIXTEEN_KB, FORTY_EIGHT_KB);
// Test for requestRequestSize =48KB and readAheadBufferSize=16KB
resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
inputStream = testReadAheadConfigs(FORTY_EIGHT_KB, TEST_READAHEAD_DEPTH_2,
true,
SIXTEEN_KB);
testReadAheads(inputStream, FORTY_EIGHT_KB, SIXTEEN_KB);
}
private void testReadAheads(AbfsInputStream inputStream,
int readRequestSize,
int readAheadRequestSize)
throws Exception {
if (readRequestSize > readAheadRequestSize) {
readAheadRequestSize = readRequestSize;
}
byte[] firstReadBuffer = new byte[readRequestSize];
byte[] secondReadBuffer = new byte[readAheadRequestSize];
// get the expected bytes to compare
byte[] expectedFirstReadAheadBufferContents = new byte[readRequestSize];
byte[] expectedSecondReadAheadBufferContents = new byte[readAheadRequestSize];
getExpectedBufferData(0, readRequestSize, expectedFirstReadAheadBufferContents);
getExpectedBufferData(readRequestSize, readAheadRequestSize,
expectedSecondReadAheadBufferContents);
Assertions.assertThat(inputStream.read(firstReadBuffer, 0, readRequestSize))
.describedAs("Read should be of exact requested size")
.isEqualTo(readRequestSize);
assertTrue("Data mismatch found in RAH1",
Arrays.equals(firstReadBuffer,
expectedFirstReadAheadBufferContents));
Assertions.assertThat(inputStream.read(secondReadBuffer, 0, readAheadRequestSize))
.describedAs("Read should be of exact requested size")
.isEqualTo(readAheadRequestSize);
assertTrue("Data mismatch found in RAH2",
Arrays.equals(secondReadBuffer,
expectedSecondReadAheadBufferContents));
}
public AbfsInputStream testReadAheadConfigs(int readRequestSize,
int readAheadQueueDepth,
boolean alwaysReadBufferSizeEnabled,
int readAheadBlockSize) throws Exception {
Configuration
config = new Configuration(
this.getRawConfiguration());
config.set("fs.azure.read.request.size", Integer.toString(readRequestSize));
config.set("fs.azure.readaheadqueue.depth",
Integer.toString(readAheadQueueDepth));
config.set("fs.azure.read.alwaysReadBufferSize",
Boolean.toString(alwaysReadBufferSizeEnabled));
config.set("fs.azure.read.readahead.blocksize",
Integer.toString(readAheadBlockSize));
if (readRequestSize > readAheadBlockSize) {
readAheadBlockSize = readRequestSize;
}
Path testPath = new Path(
"/testReadAheadConfigs");
final AzureBlobFileSystem fs = createTestFile(testPath,
ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
byte[] byteBuffer = new byte[ONE_MB];
AbfsInputStream inputStream = this.getAbfsStore(fs)
.openFileForRead(testPath, null);
Assertions.assertThat(inputStream.getBufferSize())
.describedAs("Unexpected AbfsInputStream buffer size")
.isEqualTo(readRequestSize);
Assertions.assertThat(inputStream.getReadAheadQueueDepth())
.describedAs("Unexpected ReadAhead queue depth")
.isEqualTo(readAheadQueueDepth);
Assertions.assertThat(inputStream.shouldAlwaysReadBufferSize())
.describedAs("Unexpected AlwaysReadBufferSize settings")
.isEqualTo(alwaysReadBufferSizeEnabled);
Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize())
.describedAs("Unexpected readAhead block size")
.isEqualTo(readAheadBlockSize);
return inputStream;
}
private void getExpectedBufferData(int offset, int length, byte[] b) {
boolean startFillingIn = false;
int indexIntoBuffer = 0;
char character = 'a';
for (int i = 0; i < (offset + length); i++) {
if (i == offset) {
startFillingIn = true;
}
if ((startFillingIn) && (indexIntoBuffer < length)) {
b[indexIntoBuffer] = (byte) character;
indexIntoBuffer++;
}
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
}
}
private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
Configuration config) throws Exception {
AzureBlobFileSystem fs;
if (config == null) {
fs = this.getFileSystem();
} else {
final AzureBlobFileSystem currentFs = getFileSystem();
fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config);
}
if (fs.exists(testFilePath)) {
FileStatus status = fs.getFileStatus(testFilePath);
if (status.getLen() >= testFileSize) {
return fs;
}
}
byte[] buffer = new byte[EIGHT_MB];
char character = 'a';
for (int i = 0; i < buffer.length; i++) {
buffer[i] = (byte) character;
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
}
try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
int bytesWritten = 0;
while (bytesWritten < testFileSize) {
outputStream.write(buffer);
bytesWritten += buffer.length;
}
}
Assertions.assertThat(fs.getFileStatus(testFilePath).getLen())
.describedAs("File not created of expected size")
.isEqualTo(testFileSize);
return fs;
}
private void resetReadBufferManager(int bufferSize, int threshold) {
ReadBufferManager.getBufferManager()
.testResetReadBufferManager(bufferSize, threshold);
// Trigger GC as aggressive recreation of ReadBufferManager buffers
// by successive tests can lead to OOM based on the dev VM/machine capacity.
System.gc();
}
}