diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index a809bde6c30..926c23d7c53 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -238,6 +238,9 @@ private int readInternal(final long position, final byte[] b, final int offset, if (receivedBytes > 0) { incrementReadOps(); LOG.debug("Received data from read ahead, not doing remote read"); + if (streamStatistics != null) { + streamStatistics.readAheadBytesRead(receivedBytes); + } return receivedBytes; } @@ -292,6 +295,9 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti throw new IOException(ex); } long bytesRead = op.getResult().getBytesReceived(); + if (streamStatistics != null) { + streamStatistics.remoteBytesRead(bytesRead); + } if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java index 2603394c933..c910a1f75e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java @@ -84,6 +84,18 @@ public interface AbfsInputStreamStatistics { */ void remoteReadOperation(); + /** + * Records the bytes read from readAhead buffer. + * @param bytes the bytes to be incremented. + */ + void readAheadBytesRead(long bytes); + + /** + * Records bytes read remotely after nothing from readAheadBuffer was read. + * @param bytes the bytes to be incremented. + */ + void remoteBytesRead(long bytes); + /** * Makes the string of all the AbfsInputStream statistics. * @return the string with all the statistics. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index fd18910813d..12cc407dcbc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl private long readOperations; private long bytesReadFromBuffer; private long remoteReadOperations; + private long readAheadBytesRead; + private long remoteBytesRead; /** * Seek backwards, incrementing the seek and backward seek counters. @@ -128,6 +130,30 @@ public void readOperationStarted(long pos, long len) { readOperations++; } + /** + * Total bytes read from readAhead buffer during a read operation. + * + * @param bytes the bytes to be incremented. + */ + @Override + public void readAheadBytesRead(long bytes) { + if (bytes > 0) { + readAheadBytesRead += bytes; + } + } + + /** + * Total bytes read remotely after nothing was read from readAhead buffer. + * + * @param bytes the bytes to be incremented. + */ + @Override + public void remoteBytesRead(long bytes) { + if (bytes > 0) { + remoteBytesRead += bytes; + } + } + /** * {@inheritDoc} * @@ -178,6 +204,14 @@ public long getRemoteReadOperations() { return remoteReadOperations; } + public long getReadAheadBytesRead() { + return readAheadBytesRead; + } + + public long getRemoteBytesRead() { + return remoteBytesRead; + } + /** * String operator describes all the current statistics. * Important: there are no guarantees as to the stability @@ -199,6 +233,8 @@ public String toString() { sb.append(", ReadOperations=").append(readOperations); sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer); sb.append(", remoteReadOperations=").append(remoteReadOperations); + sb.append(", readAheadBytesRead=").append(readAheadBytesRead); + sb.append(", remoteBytesRead=").append(remoteBytesRead); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 7a62ecab7f4..8385099a78d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class); private static final int ONE_MB = 1024 * 1024; private static final int ONE_KB = 1024; + private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; + private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE; + private static final int THREAD_SLEEP_10_SECONDS = 10; + private static final int TIMEOUT_30_SECONDS = 30000; private byte[] defBuffer = new byte[ONE_MB]; public ITestAbfsInputStreamStatistics() throws Exception { @@ -75,6 +80,8 @@ public void testInitValues() throws IOException { checkInitValue(stats.getReadOperations(), "readOps"); checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer"); checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps"); + checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead"); + checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead"); } finally { IOUtils.cleanupWithLogger(LOG, outputStream, inputStream); @@ -285,6 +292,94 @@ public void testWithNullStreamStatistics() throws IOException { } } + /** + * Testing readAhead counters in AbfsInputStream with 30 seconds timeout. + */ + @Test(timeout = TIMEOUT_30_SECONDS) + public void testReadAheadCounters() throws IOException, InterruptedException { + describe("Test to check correct values for readAhead counters in " + + "AbfsInputStream"); + + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfss = fs.getAbfsStore(); + Path readAheadCountersPath = path(getMethodName()); + + /* + * Setting the block size for readAhead as 4KB. + */ + abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE); + + AbfsOutputStream out = null; + AbfsInputStream in = null; + + try { + + /* + * Creating a file of 1MB size. + */ + out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath); + out.write(defBuffer); + out.close(); + + in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics()); + + /* + * Reading 1KB after each i * KB positions. Hence the reads are from 0 + * to 1KB, 1KB to 2KB, and so on.. for 5 operations. + */ + for (int i = 0; i < 5; i++) { + in.seek(ONE_KB * i); + in.read(defBuffer, ONE_KB * i, ONE_KB); + } + AbfsInputStreamStatisticsImpl stats = + (AbfsInputStreamStatisticsImpl) in.getStreamStatistics(); + + /* + * Since, readAhead is done in background threads. Sometimes, the + * threads aren't finished in the background and could result in + * inaccurate results. So, we wait till we have the accurate values + * with a limit of 30 seconds as that's when the test times out. + * + */ + while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE + || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) { + Thread.sleep(THREAD_SLEEP_10_SECONDS); + } + + /* + * Verifying the counter values of readAheadBytesRead and remoteBytesRead. + * + * readAheadBytesRead : Since, we read 1KBs 5 times, that means we go + * from 0 to 5KB in the file. The bufferSize is set to 4KB, and since + * we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB + * buffer. Our read is till 5KB, hence readAhead would ideally read 2 + * blocks of 4KB which is equal to 8KB. But, sometimes to get more than + * one block from readAhead buffer we might have to wait for background + * threads to fill the buffer and hence we might do remote read which + * would be faster. Therefore, readAheadBytesRead would be equal to or + * greater than 4KB. + * + * remoteBytesRead : Since, the bufferSize is set to 4KB and the number + * of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4 + * KB buffer on the first read, which is equal to 32KB. But, if we are not + * able to read some bytes that were in the buffer after doing + * readAhead, we might use remote read again. Thus, the bytes read + * remotely could also be greater than 32Kb. + * + */ + Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs( + "Mismatch in readAheadBytesRead counter value") + .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE); + + Assertions.assertThat(stats.getRemoteBytesRead()).describedAs( + "Mismatch in remoteBytesRead counter value") + .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + } + /** * Method to assert the initial values of the statistics. *