HADOOP-17296. ABFS: Force reads to be always of buffer size.
Contributed by Sneha Vijayarajan.
parent 03b4e98971
commit 142941b96e
@@ -201,6 +201,16 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
   private int readAheadQueueDepth;

+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE,
+      MinValue = MIN_BUFFER_SIZE,
+      MaxValue = MAX_BUFFER_SIZE,
+      DefaultValue = DEFAULT_READ_AHEAD_BLOCK_SIZE)
+  private int readAheadBlockSize;
+
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_BUFFER_SIZE,
+      DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE)
+  private boolean alwaysReadBufferSize;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
       DefaultValue = DEFAULT_ENABLE_FLUSH)
   private boolean enableFlush;

@@ -599,6 +609,14 @@ public class AbfsConfiguration{
     return this.readAheadQueueDepth;
   }

+  public int getReadAheadBlockSize() {
+    return this.readAheadBlockSize;
+  }
+
+  public boolean shouldReadBufferSizeAlways() {
+    return this.alwaysReadBufferSize;
+  }
+
   public boolean isFlushEnabled() {
     return this.enableFlush;
   }

@@ -644,6 +644,9 @@ public class AzureBlobFileSystemStore implements Closeable {
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
             .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
+            .withShouldReadBufferSizeAlways(
+                abfsConfiguration.shouldReadBufferSizeAlways())
+            .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
             .build();
   }
@@ -75,6 +75,8 @@ public final class ConfigurationKeys {
    * Default is empty. **/
   public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
+  public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
+  public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
   /** Provides a config control to enable or disable ABFS Flush operations -
    * HFlush and HSync. Default is true. **/
   public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
@@ -57,6 +57,8 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
+  public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB

@@ -74,6 +76,7 @@ public final class FileSystemConfigurations {
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";

   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
+
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
   public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
@@ -47,6 +47,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities {
   private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

+  private int readAheadBlockSize;
   private final AbfsClient client;
   private final Statistics statistics;
   private final String path;

@@ -56,6 +57,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final String eTag;                  // eTag of the path when InputStream are created
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
+  private final boolean alwaysReadBufferSize;

   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;

@@ -89,9 +91,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
+    this.alwaysReadBufferSize
+        = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+
+    // Propagate the config values to ReadBufferManager so that the first instance
+    // to initialize can set the readAheadBlockSize
+    ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
   }

   public String getPath() {
@@ -178,11 +187,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       buffer = new byte[bufferSize];
     }

-    // Enable readAhead when reading sequentially
-    if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+    if (alwaysReadBufferSize) {
       bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
     } else {
-      bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+      // Enable readAhead when reading sequentially
+      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+        bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+      } else {
+        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+      }
     }

     if (bytesRead == -1) {
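For illustration, the effect of the new flag on read sizing, distilled into a hypothetical helper (not part of this commit; names are illustrative): the flag simply widens every read to a full buffer, while the old sequential-detection logic is kept for the default path.

    // Hedged distillation of the dispatch above; requestedLen plays the role of b.length.
    static int chooseRequestSize(boolean alwaysReadBufferSize, long fCursorAfterLastRead,
        long fCursor, int requestedLen, int bufferSize) {
      boolean sequential = fCursorAfterLastRead == -1
          || fCursorAfterLastRead == fCursor
          || requestedLen >= bufferSize;
      // alwaysReadBufferSize forces the Gen1-style full-buffer read for every
      // pattern; otherwise only sequential reads are widened to bufferSize.
      return (alwaysReadBufferSize || sequential) ? bufferSize : requestedLen;
    }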
@@ -223,16 +236,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,

     // queue read-aheads
     int numReadAheads = this.readAheadQueueDepth;
-    long nextSize;
     long nextOffset = position;
+    // First read to queue needs to be of readBufferSize and later
+    // of readAhead Block size
+    long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
     LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
     while (numReadAheads > 0 && nextOffset < contentLength) {
-      nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
       LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
           nextOffset, nextSize);
       ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
       nextOffset = nextOffset + nextSize;
       numReadAheads--;
+      // From next round onwards should be of readahead block size.
+      nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
     }

     // try reading from buffers first
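A worked example of the new queueing arithmetic, as a standalone sketch (values assumed for illustration): the first queued read uses the read buffer size, every later one the readahead block size.

    // Sketch: bufferSize = 4 MB, readAheadBlockSize = 8 MB, contentLength = 20 MB,
    // queue depth 3 -- mirrors the loop above.
    public static void main(String[] args) {
      long bufferSize = 4L << 20, readAheadBlockSize = 8L << 20, contentLength = 20L << 20;
      long nextOffset = 0;
      long nextSize = Math.min(bufferSize, contentLength - nextOffset);
      for (int numReadAheads = 3; numReadAheads > 0 && nextOffset < contentLength; numReadAheads--) {
        // prints (0, 4 MB), then (4 MB, 8 MB), then (12 MB, 8 MB)
        System.out.printf("offset=%d size=%d%n", nextOffset, nextSize);
        nextOffset += nextSize;
        nextSize = Math.min(readAheadBlockSize, contentLength - nextOffset);
      }
    }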
@@ -527,6 +543,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return bytesFromRemoteRead;
   }

+  @VisibleForTesting
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  @VisibleForTesting
+  public int getReadAheadQueueDepth() {
+    return readAheadQueueDepth;
+  }
+
+  @VisibleForTesting
+  public boolean shouldAlwaysReadBufferSize() {
+    return alwaysReadBufferSize;
+  }
+
   /**
    * Get the statistics of the stream.
    * @return a string value.
@@ -18,10 +18,15 @@

 package org.apache.hadoop.fs.azurebfs.services;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Class to hold extra input stream configs.
  */
 public class AbfsInputStreamContext extends AbfsStreamContext {
+  // Retaining logger of AbfsInputStream
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

   private int readBufferSize;

@@ -29,6 +34,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

   private boolean tolerateOobAppends;

+  private boolean alwaysReadBufferSize;
+
+  private int readAheadBlockSize;
+
   private AbfsInputStreamStatistics streamStatistics;

   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {

@@ -60,7 +69,27 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }

+  public AbfsInputStreamContext withShouldReadBufferSizeAlways(
+      final boolean alwaysReadBufferSize) {
+    this.alwaysReadBufferSize = alwaysReadBufferSize;
+    return this;
+  }
+
+  public AbfsInputStreamContext withReadAheadBlockSize(
+      final int readAheadBlockSize) {
+    this.readAheadBlockSize = readAheadBlockSize;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
+    if (readBufferSize > readAheadBlockSize) {
+      LOG.debug(
+          "fs.azure.read.request.size[={}] is configured for higher size than "
+              + "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+              + "readAhead block size to be same as readRequestSize.",
+          readBufferSize, readAheadBlockSize);
+      readAheadBlockSize = readBufferSize;
+    }
     // Validation of parameters to be done here.
     return this;
   }
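How the new builder methods compose, as a minimal sketch (values assumed for illustration): when the configured read buffer exceeds the readahead block size, build() aligns the block size upward so one buffer holds one read request.

    // Sketch only; -1 is the same renew-period placeholder this commit's unit tests pass.
    AbfsInputStreamContext ctx = new AbfsInputStreamContext(-1)
        .withReadBufferSize(8 * 1024 * 1024)      // 8 MB read request size
        .withReadAheadBlockSize(4 * 1024 * 1024)  // 4 MB readahead block
        .build();
    // build() auto-aligned the block size: ctx.getReadAheadBlockSize() == 8 MB.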
@@ -80,4 +109,13 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
   public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
+
+  public boolean shouldReadBufferSizeAlways() {
+    return alwaysReadBufferSize;
+  }
+
+  public int getReadAheadBlockSize() {
+    return readAheadBlockSize;
+  }
+
 }
@@ -28,6 +28,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;

 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -36,12 +37,14 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
  */
 final class ReadBufferManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;

   private static final int NUM_BUFFERS = 16;
-  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
   private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold

+  private static int blockSize = 4 * ONE_MB;
   private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read

@@ -50,21 +53,37 @@ final class ReadBufferManager {
   private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
   private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
   private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
-  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
-  static {
-    BUFFER_MANAGER = new ReadBufferManager();
-    BUFFER_MANAGER.init();
-  }
+  private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();

   static ReadBufferManager getBufferManager() {
-    return BUFFER_MANAGER;
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManager();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
+  }
+
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
+  }

   private void init() {
     buffers = new byte[NUM_BUFFERS][];
     for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[BLOCK_SIZE];  // same buffers are reused. The byte array never goes back to GC
+      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte array never goes back to GC
       freeList.add(i);
     }
     for (int i = 0; i < NUM_THREADS; i++) {
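The initialization ordering this change relies on, as an illustrative call sequence (a sketch, not real usage; both methods are package-private): setReadBufferManagerConfigs only takes effect while the singleton is still null, so the first AbfsInputStream constructed fixes the block size for the process.

    // Sketch of the ordering contract (callable only from within the services package).
    ReadBufferManager.setReadBufferManagerConfigs(8 * 1024 * 1024); // honoured: no instance yet
    ReadBufferManager manager = ReadBufferManager.getBufferManager(); // lazily inits 8 MB buffers
    ReadBufferManager.setReadBufferManagerConfigs(4 * 1024 * 1024); // ignored: already initialized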
@@ -124,10 +143,10 @@ final class ReadBufferManager {
       buffer.setBufferindex(bufferIndex);
       readAheadQueue.add(buffer);
       notifyAll();
-    }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
-          stream.getPath(), requestedOffset, buffer.getBufferindex());
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+            stream.getPath(), requestedOffset, buffer.getBufferindex());
+      }
     }
   }
@@ -272,6 +291,7 @@ final class ReadBufferManager {
       return evict(nodeToEvict);
     }

+    LOGGER.trace("No buffer eligible for eviction");
     // nothing can be evicted
     return false;
   }
@@ -483,6 +503,67 @@ final class ReadBufferManager {
     tryEvict();
   }

+  /**
+   * Test method that can clean up the current state of readAhead buffers and
+   * the lists. Will also trigger a fresh init.
+   */
+  @VisibleForTesting
+  void testResetReadBufferManager() {
+    synchronized (this) {
+      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+      for (ReadBuffer buf : completedReadList) {
+        if (buf != null) {
+          completedBuffers.add(buf);
+        }
+      }
+
+      for (ReadBuffer buf : completedBuffers) {
+        evict(buf);
+      }
+
+      readAheadQueue.clear();
+      inProgressList.clear();
+      completedReadList.clear();
+      freeList.clear();
+      for (int i = 0; i < NUM_BUFFERS; i++) {
+        buffers[i] = null;
+      }
+      buffers = null;
+      resetBufferManager();
+    }
+  }
+
+  /**
+   * Reset buffer manager to null.
+   */
+  @VisibleForTesting
+  static void resetBufferManager() {
+    bufferManager = null;
+  }
+
+  /**
+   * Reset readAhead buffer to needed readAhead block size and
+   * thresholdAgeMilliseconds.
+   * @param readAheadBlockSize
+   * @param thresholdAgeMilliseconds
+   */
+  @VisibleForTesting
+  void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
+    setBlockSize(readAheadBlockSize);
+    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+    testResetReadBufferManager();
+  }
+
+  @VisibleForTesting
+  static void setBlockSize(int readAheadBlockSize) {
+    blockSize = readAheadBlockSize;
+  }
+
+  @VisibleForTesting
+  int getReadAheadBlockSize() {
+    return blockSize;
+  }
+
   /**
    * Test method that can mimic no free buffers scenario and also add a ReadBuffer
    * into completedReadList. This readBuffer will get picked up by TryEvict()
@@ -789,6 +789,17 @@ to 100 MB). The default value will be 8388608 (8 MB).
 bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to
 100 MB). The default value will be 4194304 (4 MB).

+`fs.azure.read.alwaysReadBufferSize`: Read request size configured by
+`fs.azure.read.request.size` will be honoured only when the reads done are in
+sequential pattern. When the read pattern is detected to be random, read size
+will be same as the buffer length provided by the calling process.
+This config when set to true will force random reads to also read in same
+request sizes as sequential reads. This is a means to have same read patterns
+as of ADLS Gen1, as it does not differentiate read patterns and always reads by
+the configured read request size. The default value for this config will be
+false, where reads for the provided buffer length is done when random read
+pattern is detected.
+
 `fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
 AbfsInputStream. In case the set value is negative the read ahead queue depth
 will be set as Runtime.getRuntime().availableProcessors(). By default the value

@@ -796,6 +807,11 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
 doing only random reads (non-sequential) or you are seeing throttling, you
 may try setting this value to 0.

+`fs.azure.read.readahead.blocksize`: To set the read buffer size for the read
+aheads. Specify the value in bytes. The value should be between 16384 to
+104857600 both inclusive (16 KB to 100 MB). The default value will be
+4194304 (4 MB).
+
 To run under limited memory situations configure the following. Especially
 when there are too many writes from the same process.
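Taken together, a minimal sketch (not part of this commit) of opting in to the Gen1-style read pattern from client code; the keys are the two documented above, while the URI and values are illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class AlwaysReadBufferSizeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Force random reads to fetch full read-request-sized blocks (default: false).
        conf.set("fs.azure.read.alwaysReadBufferSize", "true");
        // Readahead block size, 16 KB to 100 MB inclusive (default: 4 MB).
        conf.set("fs.azure.read.readahead.blocksize", String.valueOf(4 * 1024 * 1024));
        FileSystem fs = FileSystem.newInstance(
            URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
        // ... reads through fs now follow the Gen1-style pattern ...
      }
    }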
@@ -392,6 +392,14 @@ public abstract class AbstractAbfsIntegrationTest extends
     return path;
   }

+  public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
+    return fs.getAbfsStore();
+  }
+
+  public Path makeQualified(Path path) throws java.io.IOException {
+    return getFileSystem().makeQualified(path);
+  }
+
   /**
    * Create a path under the test path provided by
    * {@link #getTestPath()}.
@@ -21,6 +21,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.UUID;

 import org.junit.Assume;
 import org.junit.Ignore;

@@ -28,6 +29,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;

@@ -37,30 +39,43 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.contract.ContractTestUtils;

+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream;
+
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ETAG;

 /**
  * Test random read operation.
  */
 public class ITestAzureBlobFileSystemRandomRead extends
     AbstractAbfsScaleTest {
+  private static final int BYTE = 1;
+  private static final int THREE_BYTES = 3;
+  private static final int FIVE_BYTES = 5;
+  private static final int TWENTY_BYTES = 20;
+  private static final int THIRTY_BYTES = 30;
   private static final int KILOBYTE = 1024;
   private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int FOUR_MB = 4 * MEGABYTE;
+  private static final int NINE_MB = 9 * MEGABYTE;
   private static final long TEST_FILE_SIZE = 8 * MEGABYTE;
   private static final int MAX_ELAPSEDTIMEMS = 20;
   private static final int SEQUENTIAL_READ_BUFFER_SIZE = 16 * KILOBYTE;
-  private static final int CREATE_BUFFER_SIZE = 26 * KILOBYTE;

   private static final int SEEK_POSITION_ONE = 2* KILOBYTE;
   private static final int SEEK_POSITION_TWO = 5 * KILOBYTE;
   private static final int SEEK_POSITION_THREE = 10 * KILOBYTE;
   private static final int SEEK_POSITION_FOUR = 4100 * KILOBYTE;

-  private static final Path TEST_FILE_PATH = new Path(
-          "/TestRandomRead.txt");
+  private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * MEGABYTE;
+  private static final int DISABLED_READAHEAD_DEPTH = 0;
+
+  private static final String TEST_FILE_PREFIX = "/TestRandomRead";
   private static final String WASB = "WASB";
   private static final String ABFS = "ABFS";
-  private static long testFileLength = 0;

   private static final Logger LOG =
       LoggerFactory.getLogger(ITestAzureBlobFileSystemRandomRead.class);
@@ -71,9 +86,10 @@ public class ITestAzureBlobFileSystemRandomRead extends

   @Test
   public void testBasicRead() throws Exception {
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead");
+    assumeHugeFileExists(testPath);

-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] buffer = new byte[3 * MEGABYTE];

       // forward seek and read a kilobyte into first kilobyte of bufferV2

@@ -99,12 +115,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomRead() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
         this.getFileSystem().getIsNamespaceEnabled());
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead");
+    assumeHugeFileExists(testPath);
+
     try (
         FSDataInputStream inputStreamV1
-            = this.getFileSystem().open(TEST_FILE_PATH);
+            = this.getFileSystem().open(testPath);
         FSDataInputStream inputStreamV2
-            = this.getWasbFileSystem().open(TEST_FILE_PATH);
+            = this.getWasbFileSystem().open(testPath);
     ) {
       final int bufferSize = 4 * KILOBYTE;
       byte[] bufferV1 = new byte[bufferSize];

@@ -156,8 +174,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekToNewSource() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource");
+    assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       assertFalse(inputStream.seekToNewSource(0));
     }
   }

@@ -169,8 +189,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipBounds() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();

       long skipped = inputStream.skip(-1);

@@ -208,8 +230,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testValidateSeekBounds() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();

       inputStream.seek(0);

@@ -257,8 +281,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekAndAvailableAndPosition() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
       byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
       byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};

@@ -321,8 +347,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipAndAvailableAndPosition() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
       byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
       byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
@@ -385,15 +413,16 @@ public class ITestAzureBlobFileSystemRandomRead extends
   @Test
   public void testSequentialReadAfterReverseSeekPerformance()
       throws Exception {
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
+    assumeHugeFileExists(testPath);
     final int maxAttempts = 10;
     final double maxAcceptableRatio = 1.01;
     double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
     double ratio = Double.MAX_VALUE;
     for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
-      beforeSeekElapsedMs = sequentialRead(ABFS,
+      beforeSeekElapsedMs = sequentialRead(ABFS, testPath,
           this.getFileSystem(), false);
-      afterSeekElapsedMs = sequentialRead(ABFS,
+      afterSeekElapsedMs = sequentialRead(ABFS, testPath,
           this.getFileSystem(), true);
       ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
       LOG.info((String.format(

@@ -417,8 +446,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomReadPerformance() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
         this.getFileSystem().getIsNamespaceEnabled());
-    createTestFile();
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
+    assumeHugeFileExists(testPath);

     final AzureBlobFileSystem abFs = this.getFileSystem();
     final NativeAzureFileSystem wasbFs = this.getWasbFileSystem();

@@ -428,8 +457,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
     double v1ElapsedMs = 0, v2ElapsedMs = 0;
     double ratio = Double.MAX_VALUE;
     for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
-      v1ElapsedMs = randomRead(1, wasbFs);
-      v2ElapsedMs = randomRead(2, abFs);
+      v1ElapsedMs = randomRead(1, testPath, wasbFs);
+      v2ElapsedMs = randomRead(2, testPath, abFs);

       ratio = v2ElapsedMs / v1ElapsedMs;

@@ -448,15 +477,112 @@ public class ITestAzureBlobFileSystemRandomRead extends
         ratio < maxAcceptableRatio);
   }

+  /**
+   * With this test we should see a full buffer read being triggered in case
+   * alwaysReadBufferSize is on, else only the requested buffer size.
+   * Hence a seek done few bytes away from last read position will trigger
+   * a network read when alwaysReadBufferSize is off, whereas it will return
+   * from the internal buffer when it is on.
+   * Reading a full buffer size is the Gen1 behaviour.
+   * @throws Throwable
+   */
+  @Test
+  public void testAlwaysReadBufferSizeConfig() throws Throwable {
+    testAlwaysReadBufferSizeConfig(false);
+    testAlwaysReadBufferSizeConfig(true);
+  }
+
+  public void testAlwaysReadBufferSizeConfig(boolean alwaysReadBufferSizeConfigValue)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.readaheadqueue.depth", "0");
+    config.set("fs.azure.read.alwaysReadBufferSize",
+        Boolean.toString(alwaysReadBufferSizeConfigValue));
+
+    final Path testFile = new Path("/FileName_"
+        + UUID.randomUUID().toString());
+
+    final AzureBlobFileSystem fs = createTestFile(testFile, 16 * MEGABYTE,
+        1 * MEGABYTE, config);
+    String eTag = fs.getAbfsClient()
+        .getPathStatus(testFile.toUri().getPath(), false)
+        .getResult()
+        .getResponseHeader(ETAG);
+
+    TestAbfsInputStream testInputStream = new TestAbfsInputStream();
+
+    AbfsInputStream inputStream = testInputStream.getAbfsInputStream(
+        fs.getAbfsClient(),
+        testFile.getName(), ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, eTag,
+        DISABLED_READAHEAD_DEPTH, FOUR_MB,
+        alwaysReadBufferSizeConfigValue, FOUR_MB);
+
+    long connectionsAtStart = fs.getInstrumentationMap()
+        .get(GET_RESPONSES.getStatName());
+
+    long dateSizeReadStatAtStart = fs.getInstrumentationMap()
+        .get(BYTES_RECEIVED.getStatName());
+
+    long newReqCount = 0;
+    long newDataSizeRead = 0;
+
+    byte[] buffer20b = new byte[TWENTY_BYTES];
+    byte[] buffer30b = new byte[THIRTY_BYTES];
+    byte[] byteBuffer5 = new byte[FIVE_BYTES];
+
+    // first read
+    // if alwaysReadBufferSize is off, this is a sequential read
+    inputStream.read(byteBuffer5, 0, FIVE_BYTES);
+    newReqCount++;
+    newDataSizeRead += FOUR_MB;
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
+        fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED,
+        dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+
+    // second read beyond that the buffer holds
+    // if alwaysReadBufferSize is off, this is a random read. Reads only
+    // incoming buffer size
+    // else, reads a buffer size
+    inputStream.seek(NINE_MB);
+    inputStream.read(buffer20b, 0, BYTE);
+    newReqCount++;
+    if (alwaysReadBufferSizeConfigValue) {
+      newDataSizeRead += FOUR_MB;
+    } else {
+      newDataSizeRead += TWENTY_BYTES;
+    }
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED,
+        dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+
+    // third read adjacent to second but not exactly sequential.
+    // if alwaysReadBufferSize is off, this is another random read
+    // else second read would have read this too.
+    inputStream.seek(NINE_MB + TWENTY_BYTES + THREE_BYTES);
+    inputStream.read(buffer30b, 0, THREE_BYTES);
+    if (!alwaysReadBufferSizeConfigValue) {
+      newReqCount++;
+      newDataSizeRead += THIRTY_BYTES;
+    }
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED, dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+  }
+
   private long sequentialRead(String version,
+      Path testPath,
       FileSystem fs,
       boolean afterReverseSeek) throws IOException {
     byte[] buffer = new byte[SEQUENTIAL_READ_BUFFER_SIZE];
     long totalBytesRead = 0;
     long bytesRead = 0;

-    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+    long testFileLength = fs.getFileStatus(testPath).getLen();
+    try(FSDataInputStream inputStream = fs.open(testPath)) {
       if (afterReverseSeek) {
         while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
           bytesRead = inputStream.read(buffer);
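The expected-counter arithmetic in testAlwaysReadBufferSizeConfig above, restated as a sketch (constants as in the test; readahead is disabled there, so every buffer miss is exactly one GET):

    // Sketch of the test's accounting, not executable against a live store.
    boolean alwaysRead = true;                    // the mode under test
    long gets = 0, bytes = 0;
    gets++; bytes += 4 * 1024 * 1024;             // read #1: 5 B widened to 4 MB either way
    gets++;                                       // read #2 after seek(9 MB): always a miss
    bytes += alwaysRead ? 4 * 1024 * 1024 : 20;   // on: a full 4 MB block; off: the 20 B buffer
    if (!alwaysRead) {                            // read #3, 23 B past read #2:
      gets++; bytes += 30;                        // off: another 30 B GET; on: served from buffer
    }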
@@ -487,14 +613,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
     }
   }

-  private long randomRead(int version, FileSystem fs) throws Exception {
-    assumeHugeFileExists();
+  private long randomRead(int version, Path testPath, FileSystem fs) throws Exception {
+    assumeHugeFileExists(testPath);
     final long minBytesToRead = 2 * MEGABYTE;
     Random random = new Random();
     byte[] buffer = new byte[8 * KILOBYTE];
     long totalBytesRead = 0;
     long bytesRead = 0;
-    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+    try(FSDataInputStream inputStream = fs.open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
       do {
         bytesRead = inputStream.read(buffer);
@@ -526,28 +652,48 @@ public class ITestAzureBlobFileSystemRandomRead extends
     return bytes / 1000.0 * 8 / milliseconds;
   }

-  private void createTestFile() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    if (fs.exists(TEST_FILE_PATH)) {
-      FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
-      if (status.getLen() >= TEST_FILE_SIZE) {
-        return;
+  private long createTestFile(Path testPath) throws Exception {
+    createTestFile(testPath,
+        TEST_FILE_SIZE,
+        MEGABYTE,
+        null);
+    return TEST_FILE_SIZE;
+  }
+
+  private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
+      int createBufferSize, Configuration config) throws Exception {
+    AzureBlobFileSystem fs;
+
+    if (config == null) {
+      config = this.getRawConfiguration();
+    }
+
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+        config);
+
+    if (fs.exists(testFilePath)) {
+      FileStatus status = fs.getFileStatus(testFilePath);
+      if (status.getLen() == testFileSize) {
+        return fs;
       }
     }

-    byte[] buffer = new byte[CREATE_BUFFER_SIZE];
+    byte[] buffer = new byte[createBufferSize];
     char character = 'a';
     for (int i = 0; i < buffer.length; i++) {
       buffer[i] = (byte) character;
       character = (character == 'z') ? 'a' : (char) ((int) character + 1);
     }

-    LOG.info(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE));
+    LOG.info(String.format("Creating test file %s of size: %d ", testFilePath, testFileSize));
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();

-    try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+    try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
+      String bufferContents = new String(buffer);
       int bytesWritten = 0;
-      while (bytesWritten < TEST_FILE_SIZE) {
+      while (bytesWritten < testFileSize) {
         outputStream.write(buffer);
         bytesWritten += buffer.length;
       }

@@ -557,18 +703,18 @@ public class ITestAzureBlobFileSystemRandomRead extends
       outputStream.close();
       closeTimer.end("time to close() output stream");
     }
-    timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
-    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+    timer.end("time to write %d KB", testFileSize / 1024);
+    return fs;
   }

-  private void assumeHugeFileExists() throws Exception{
-    createTestFile();
+  private long assumeHugeFileExists(Path testPath) throws Exception{
+    long fileSize = createTestFile(testPath);
     FileSystem fs = this.getFileSystem();
-    ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", TEST_FILE_PATH);
-    FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
-    ContractTestUtils.assertIsFile(TEST_FILE_PATH, status);
-    assertTrue("File " + TEST_FILE_PATH + " is empty", status.getLen() > 0);
+    ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", testPath);
+    FileStatus status = fs.getFileStatus(testPath);
+    ContractTestUtils.assertIsFile(testPath, status);
+    assertTrue("File " + testPath + " is not of expected size " + fileSize + ":actual=" + status.getLen(), status.getLen() == fileSize);
+    return fileSize;
   }

   private void verifyConsistentReads(FSDataInputStream inputStreamV1,
@@ -22,10 +22,17 @@ import java.io.IOException;

 import org.junit.Assert;
 import org.junit.Test;
+import java.util.Arrays;

 import org.assertj.core.api.Assertions;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;

@@ -51,9 +58,17 @@ public class TestAbfsInputStream extends
   private static final int ONE_KB = 1 * 1024;
   private static final int TWO_KB = 2 * 1024;
   private static final int THREE_KB = 3 * 1024;
+  private static final int SIXTEEN_KB = 16 * ONE_KB;
+  private static final int FORTY_EIGHT_KB = 48 * ONE_KB;
+  private static final int ONE_MB = 1 * 1024 * 1024;
+  private static final int FOUR_MB = 4 * ONE_MB;
+  private static final int EIGHT_MB = 8 * ONE_MB;
+  private static final int TEST_READAHEAD_DEPTH_2 = 2;
+  private static final int TEST_READAHEAD_DEPTH_4 = 4;
   private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
   private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
+  private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;

   private AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);

@@ -84,7 +99,7 @@ public class TestAbfsInputStream extends
         null,
         FORWARD_SLASH + fileName,
         THREE_KB,
-        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
+        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
         "eTag");

     inputStream.setCachedSasToken(
@@ -93,6 +108,33 @@ public class TestAbfsInputStream extends
     return inputStream;
   }

+  public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
+      String fileName,
+      int fileSize,
+      String eTag,
+      int readAheadQueueDepth,
+      int readBufferSize,
+      boolean alwaysReadBufferSize,
+      int readAheadBlockSize) {
+    AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
+    // Create AbfsInputStream with the client instance
+    AbfsInputStream inputStream = new AbfsInputStream(
+        abfsClient,
+        null,
+        FORWARD_SLASH + fileName,
+        fileSize,
+        inputStreamContext.withReadBufferSize(readBufferSize)
+            .withReadAheadQueueDepth(readAheadQueueDepth)
+            .withShouldReadBufferSizeAlways(alwaysReadBufferSize)
+            .withReadAheadBlockSize(readAheadBlockSize),
+        eTag);
+
+    inputStream.setCachedSasToken(
+        TestCachedSASToken.getTestCachedSASTokenInstance());
+
+    return inputStream;
+  }
+
   private void queueReadAheads(AbfsInputStream inputStream) {
     // Mimic AbfsInputStream readAhead queue requests
     ReadBufferManager.getBufferManager()
@ -496,4 +538,183 @@ public class TestAbfsInputStream extends
|
||||||
checkEvictedStatus(inputStream, 0, true);
|
checkEvictedStatus(inputStream, 0, true);
|
||||||
}
|
}
|
+
+ /**
+  * Test readahead with different config settings for read request size and
+  * readahead block size.
+  * @throws Exception
+  */
+ @Test
+ public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception {
+   // Set readRequestSize = 4MB and readAheadBlockSize = 8MB
+   resetReadBufferManager(FOUR_MB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+   testReadAheadConfigs(FOUR_MB, TEST_READAHEAD_DEPTH_4, false, EIGHT_MB);
+
+   // Test for readRequestSize = 16KB and readAheadBlockSize = 16KB
+   resetReadBufferManager(SIXTEEN_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+   AbfsInputStream inputStream = testReadAheadConfigs(SIXTEEN_KB,
+       TEST_READAHEAD_DEPTH_2, true, SIXTEEN_KB);
+   testReadAheads(inputStream, SIXTEEN_KB, SIXTEEN_KB);
+
+   // Test for readRequestSize = 16KB and readAheadBlockSize = 48KB
+   resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+   inputStream = testReadAheadConfigs(SIXTEEN_KB, TEST_READAHEAD_DEPTH_2, true,
+       FORTY_EIGHT_KB);
+   testReadAheads(inputStream, SIXTEEN_KB, FORTY_EIGHT_KB);
+
+   // Test for readRequestSize = 48KB and readAheadBlockSize = 16KB
+   resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+   inputStream = testReadAheadConfigs(FORTY_EIGHT_KB, TEST_READAHEAD_DEPTH_2,
+       true, SIXTEEN_KB);
+   testReadAheads(inputStream, FORTY_EIGHT_KB, SIXTEEN_KB);
+ }
+
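+ // Performs two back-to-back reads and verifies both the returned byte counts
+ // and the data against the pattern written by createTestFile().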
+ private void testReadAheads(AbfsInputStream inputStream,
+     int readRequestSize,
+     int readAheadRequestSize)
+     throws Exception {
+   if (readRequestSize > readAheadRequestSize) {
+     readAheadRequestSize = readRequestSize;
+   }
+
+   byte[] firstReadBuffer = new byte[readRequestSize];
+   byte[] secondReadBuffer = new byte[readAheadRequestSize];
+
+   // Get the expected bytes to compare
+   byte[] expectedFirstReadAheadBufferContents = new byte[readRequestSize];
+   byte[] expectedSecondReadAheadBufferContents = new byte[readAheadRequestSize];
+   getExpectedBufferData(0, readRequestSize, expectedFirstReadAheadBufferContents);
+   getExpectedBufferData(readRequestSize, readAheadRequestSize,
+       expectedSecondReadAheadBufferContents);
+
+   Assertions.assertThat(inputStream.read(firstReadBuffer, 0, readRequestSize))
+       .describedAs("Read should be of exact requested size")
+       .isEqualTo(readRequestSize);
+
+   assertTrue("Data mismatch found in RAH1",
+       Arrays.equals(firstReadBuffer, expectedFirstReadAheadBufferContents));
+
+   Assertions.assertThat(inputStream.read(secondReadBuffer, 0, readAheadRequestSize))
+       .describedAs("Read should be of exact requested size")
+       .isEqualTo(readAheadRequestSize);
+
+   assertTrue("Data mismatch found in RAH2",
+       Arrays.equals(secondReadBuffer, expectedSecondReadAheadBufferContents));
+ }
+
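+ // Creates a filesystem instance with the given read settings, opens a stream
+ // over a pre-created 16 MB test file and asserts that each setting took
+ // effect on the stream and on the ReadBufferManager.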
+ public AbfsInputStream testReadAheadConfigs(int readRequestSize,
+     int readAheadQueueDepth,
+     boolean alwaysReadBufferSizeEnabled,
+     int readAheadBlockSize) throws Exception {
+   Configuration config = new Configuration(this.getRawConfiguration());
+   config.set("fs.azure.read.request.size", Integer.toString(readRequestSize));
+   config.set("fs.azure.readaheadqueue.depth",
+       Integer.toString(readAheadQueueDepth));
+   config.set("fs.azure.read.alwaysReadBufferSize",
+       Boolean.toString(alwaysReadBufferSizeEnabled));
+   config.set("fs.azure.read.readahead.blocksize",
+       Integer.toString(readAheadBlockSize));
+   if (readRequestSize > readAheadBlockSize) {
+     readAheadBlockSize = readRequestSize;
+   }
+
+   Path testPath = new Path("/testReadAheadConfigs");
+   final AzureBlobFileSystem fs = createTestFile(testPath,
+       ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
+   byte[] byteBuffer = new byte[ONE_MB];
+   AbfsInputStream inputStream = this.getAbfsStore(fs)
+       .openFileForRead(testPath, null);
+
+   Assertions.assertThat(inputStream.getBufferSize())
+       .describedAs("Unexpected AbfsInputStream buffer size")
+       .isEqualTo(readRequestSize);
+
+   Assertions.assertThat(inputStream.getReadAheadQueueDepth())
+       .describedAs("Unexpected ReadAhead queue depth")
+       .isEqualTo(readAheadQueueDepth);
+
+   Assertions.assertThat(inputStream.shouldAlwaysReadBufferSize())
+       .describedAs("Unexpected AlwaysReadBufferSize setting")
+       .isEqualTo(alwaysReadBufferSizeEnabled);
+
+   Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize())
+       .describedAs("Unexpected readAhead block size")
+       .isEqualTo(readAheadBlockSize);
+
+   return inputStream;
+ }
+
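+ // Fills b with the cyclic 'a'..'z' pattern that createTestFile() writes,
+ // starting from the given file offset.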
+ private void getExpectedBufferData(int offset, int length, byte[] b) {
+   boolean startFillingIn = false;
+   int indexIntoBuffer = 0;
+   char character = 'a';
+
+   for (int i = 0; i < (offset + length); i++) {
+     if (i == offset) {
+       startFillingIn = true;
+     }
+
+     if ((startFillingIn) && (indexIntoBuffer < length)) {
+       b[indexIntoBuffer] = (byte) character;
+       indexIntoBuffer++;
+     }
+
+     character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+   }
+ }
+
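+ // Creates a test file of the requested size filled with a repeating
+ // 'a'..'z' pattern; reuses an existing file when it is already big enough.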
+ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
+     Configuration config) throws Exception {
+   AzureBlobFileSystem fs;
+
+   if (config == null) {
+     fs = this.getFileSystem();
+   } else {
+     final AzureBlobFileSystem currentFs = getFileSystem();
+     fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+         config);
+   }
+
+   if (fs.exists(testFilePath)) {
+     FileStatus status = fs.getFileStatus(testFilePath);
+     if (status.getLen() >= testFileSize) {
+       return fs;
+     }
+   }
+
+   byte[] buffer = new byte[EIGHT_MB];
+   char character = 'a';
+   for (int i = 0; i < buffer.length; i++) {
+     buffer[i] = (byte) character;
+     character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+   }
+
+   try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
+     int bytesWritten = 0;
+     while (bytesWritten < testFileSize) {
+       outputStream.write(buffer);
+       bytesWritten += buffer.length;
+     }
+   }
+
+   Assertions.assertThat(fs.getFileStatus(testFilePath).getLen())
+       .describedAs("File not created of expected size")
+       .isEqualTo(testFileSize);
+
+   return fs;
+ }
+
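+ // Rebuilds ReadBufferManager buffers at the given block size so each test
+ // case starts from a clean readahead state.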
+ private void resetReadBufferManager(int bufferSize, int threshold) {
+   ReadBufferManager.getBufferManager()
+       .testResetReadBufferManager(bufferSize, threshold);
+   // Trigger GC as aggressive recreation of ReadBufferManager buffers
+   // by successive tests can lead to OOM based on the dev VM/machine capacity.
+   System.gc();
+ }
  }
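For reference, the settings this test exercises map onto client configuration
roughly as in the sketch below. The key names match ConfigurationKeys above;
the account URI and the 4 MB sizing are illustrative placeholders, not values
taken from the patch.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Force every remote read to fetch a full buffer and issue readahead
    // range requests at 4 MB granularity (illustrative values).
    Configuration conf = new Configuration();
    conf.set("fs.azure.read.request.size", Integer.toString(4 * 1024 * 1024));
    conf.set("fs.azure.read.alwaysReadBufferSize", "true");
    conf.set("fs.azure.read.readahead.blocksize", Integer.toString(4 * 1024 * 1024));
    conf.set("fs.azure.readaheadqueue.depth", "4");
    // "container" and "account" below are placeholders for a real ABFS account.
    FileSystem fs = FileSystem.newInstance(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);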