HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636)
Part of HADOOP-18103. Contributed by: Mukund Thakur
parent 1b9135e3b5
commit 147a466c6d
@@ -47,7 +47,7 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_ABORTED = "stream_aborted";
 
   /**
-   * Bytes read from an input stream in read() calls.
+   * Bytes read from an input stream in read()/readVectored() calls.
    * Does not include bytes read and then discarded in seek/close etc.
    * These are the bytes returned to the caller.
    * Value: {@value}.
@@ -110,6 +110,34 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_OPERATIONS =
       "stream_read_operations";
 
+  /**
+   * Count of readVectored() operations in an input stream.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_OPERATIONS =
+      "stream_read_vectored_operations";
+
+  /**
+   * Count of bytes discarded during readVectored() operation
+   * in an input stream.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
+      "stream_read_vectored_read_bytes_discarded";
+
+  /**
+   * Count of incoming file ranges during readVectored() operation.
+   * Value: {@value}
+   */
+  public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
+      "stream_read_vectored_incoming_ranges";
+  /**
+   * Count of combined file ranges during readVectored() operation.
+   * Value: {@value}
+   */
+  public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
+      "stream_read_vectored_combined_ranges";
+
   /**
    * Count of incomplete read() operations in an input stream,
    * that is, when the bytes returned were less than that requested.
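Once an instrumented stream publishes these names, a client can read them back through the IOStatistics API. A minimal sketch, not part of this change: the filesystem, path, and ranges are illustrative, and getIOStatistics() may return null on streams without instrumentation.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public final class VectoredStatsDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 100),        // (offset, length); values are illustrative
        FileRange.createFileRange(8192, 100));
    try (FSDataInputStream in = fs.open(new Path("/tmp/vectored-demo.bin"))) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        range.getData().get();                    // block until each range completes
      }
      IOStatistics stats = in.getIOStatistics();
      if (stats != null) {                        // null when the stream is not instrumented
        System.out.println(stats.counters()
            .get(StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS));
        System.out.println(stats.counters()
            .get(StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES));
      }
    }
  }
}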
@@ -84,6 +84,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return allocate;
   }
 
+  public WeakReferencedElasticByteBufferPool getPool() {
+    return pool;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -382,6 +386,13 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return fileRanges;
   }
 
+  protected List<FileRange> getConsecutiveRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(600, 500));
+    return fileRanges;
+  }
+
   /**
    * Validate that exceptions must be thrown during a vectored
    * read operation with specific input ranges.
@@ -963,7 +963,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   @Override
   public void readVectored(List<? extends FileRange> ranges,
                            IntFunction<ByteBuffer> allocate) throws IOException {
-
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
     checkNotClosed();
     if (stopVectoredIOOperations.getAndSet(false)) {
@@ -978,6 +977,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
 
     if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
+      streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
       for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
@@ -987,6 +987,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
       List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
           1, minSeekForVectorReads(),
           maxReadSizeForVectorReads());
+      streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
           ranges.size(), combinedFileRanges.size());
       for (CombinedFileRange combinedFileRange: combinedFileRanges) {
@@ -1088,6 +1089,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
       }
       drainBytes += readCount;
     }
+    streamStatistics.readVectoredBytesDiscarded(drainBytes);
     LOG.debug("{} bytes drained from stream ", drainBytes);
   }
 
@@ -1168,6 +1170,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     } else {
       readByteArray(objectContent, buffer.array(), 0, length);
     }
+    // update io stats.
+    incrementBytesRead(length);
   }
 
   /**
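To see where the two arguments to readVectoredOperationStarted() come from, here is a hedged sketch of the merge step in isolation, assuming the VectoredReadUtils helpers invoked above and CombinedFileRange from org.apache.hadoop.fs.impl; the offsets and seek/size bounds are invented for illustration.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.impl.CombinedFileRange;

import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;

public final class MergeCountDemo {
  public static void main(String[] args) {
    // Already sorted by offset; the gaps between ranges are 6145 and 1948 bytes.
    List<FileRange> sorted = Arrays.asList(
        FileRange.createFileRange(1947, 100),
        FileRange.createFileRange(8192, 100),
        FileRange.createFileRange(10240, 100));
    int minSeek = 4096;                 // merge ranges whose gap is below this
    int maxReadSize = 1024 * 1024;      // upper bound on a merged range
    if (!isOrderedDisjoint(sorted, 1, minSeek)) {
      List<CombinedFileRange> combined =
          mergeSortedRanges(sorted, 1, minSeek, maxReadSize);
      // incoming = 3, combined = 2: only the second and third ranges merge,
      // so the stream would record readVectoredOperationStarted(3, 2).
      System.out.println(sorted.size() + " -> " + combined.size());
    }
  }
}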
@@ -803,6 +803,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     private final AtomicLong readOperations;
     private final AtomicLong readFullyOperations;
     private final AtomicLong seekOperations;
+    private final AtomicLong readVectoredOperations;
+    private final AtomicLong bytesDiscardedInVectoredIO;
+    private final AtomicLong readVectoredIncomingRanges;
+    private final AtomicLong readVectoredCombinedRanges;
 
     /** Bytes read by the application and any when draining streams . */
     private final AtomicLong totalBytesRead;
@@ -836,6 +840,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
           StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
           StreamStatisticNames.STREAM_READ_UNBUFFERED,
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
           StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
           .withDurationTracking(ACTION_HTTP_GET_REQUEST,
@@ -872,6 +880,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE);
       readOperations = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_OPERATIONS);
+      readVectoredOperations = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
+      bytesDiscardedInVectoredIO = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED);
+      readVectoredIncomingRanges = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES);
+      readVectoredCombinedRanges = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES);
       readFullyOperations = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
       seekOperations = st.getCounterReference(
@@ -1017,6 +1033,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       }
     }
 
+    @Override
+    public void readVectoredOperationStarted(int numIncomingRanges,
+        int numCombinedRanges) {
+      readVectoredIncomingRanges.addAndGet(numIncomingRanges);
+      readVectoredCombinedRanges.addAndGet(numCombinedRanges);
+      readVectoredOperations.incrementAndGet();
+    }
+
+    @Override
+    public void readVectoredBytesDiscarded(int discarded) {
+      bytesDiscardedInVectoredIO.addAndGet(discarded);
+    }
+
     /**
      * {@code close()} merges the stream statistics into the filesystem's
      * instrumentation instance.
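The getCounterReference() calls above are what make these callbacks cheap: the returned AtomicLong is the same object backing the named counter inside the IOStatisticsStore, so updates need no map lookup on the hot path and are immediately visible in snapshots. A standalone sketch of that pattern, assuming the iostatisticsStore() builder from IOStatisticsBinding that S3AInstrumentation uses:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class CounterRefDemo {
  public static void main(String[] args) {
    IOStatisticsStore st = iostatisticsStore()
        .withCounters(StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS)
        .build();
    // The reference and the store share one AtomicLong, so every update
    // made through the reference shows up in the store's counters map.
    AtomicLong ops = st.getCounterReference(
        StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
    ops.incrementAndGet();
    System.out.println(st.counters()
        .get(StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS)); // prints 1
  }
}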
@@ -96,6 +96,20 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
    */
   void readOperationCompleted(int requested, int actual);
 
+  /**
+   * A vectored read operation has started.
+   * @param numIncomingRanges number of input ranges.
+   * @param numCombinedRanges number of combined ranges.
+   */
+  void readVectoredOperationStarted(int numIncomingRanges,
+      int numCombinedRanges);
+
+  /**
+   * Number of bytes discarded during vectored read.
+   * @param discarded discarded bytes during vectored read.
+   */
+  void readVectoredBytesDiscarded(int discarded);
+
   @Override
   void close();
 
@@ -195,6 +195,17 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
 
     }
 
+    @Override
+    public void readVectoredOperationStarted(int numIncomingRanges,
+        int numCombinedRanges) {
+
+    }
+
+    @Override
+    public void readVectoredBytesDiscarded(int discarded) {
+
+    }
+
     @Override
     public void close() {
 
@@ -19,28 +19,41 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class);
 
   public ITestS3AContractVectoredRead(String bufferType) {
     super(bufferType);
   }
@@ -156,4 +169,162 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     List<FileRange> fileRanges = getSampleSameRanges();
     verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
   }
+
+  /**
+   * As the minimum seek value is 4*1024, the first three ranges will be
+   * merged into one and the other two will remain as they are.
+   */
+  @Test
+  public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, getAllocate());
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+
+      // the vectored io operation must be tracked
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          1);
+
+      // the vectored io operation is called with 5 input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          5);
+
+      // the 5 input ranges get combined into 3 as some of them are close.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          3);
+
+      // the number of bytes discarded depends on the above input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          5944);
+
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          3);
+
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          1424);
+
+    }
+
+    CompletableFuture<FSDataInputStream> builder1 =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+
+    try (FSDataInputStream in = builder1.get()) {
+      for (FileRange range : fileRanges) {
+        byte[] temp = new byte[range.getLength()];
+        in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+      }
+
+      // audit the statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          0);
+
+      // all other counter values are consistent.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          0);
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          5);
+
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          1424);
+    }
+  }
+
+  @Test
+  public void testMultiVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> ranges1 = getConsecutiveRanges();
+    List<FileRange> ranges2 = getConsecutiveRanges();
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(ranges1, getAllocate());
+      in.readVectored(ranges2, getAllocate());
+      validateVectoredReadResult(ranges1, DATASET);
+      validateVectoredReadResult(ranges2, DATASET);
+      returnBuffersToPoolPostRead(ranges1, getPool());
+      returnBuffersToPoolPostRead(ranges2, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+
+      // 2 vectored io calls are made above.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          2);
+
+      // each vectored io operation is called with 2 input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          4);
+
+      // the 2 ranges are merged into 1 during each vectored io operation.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          2);
+
+      // the number of bytes discarded will be 0 as the ranges are consecutive.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          0);
+      // only 2 http get requests will be made, because the ranges in each
+      // range list are merged into 1 as they are consecutive.
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          2);
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          2000);
+    }
+  }
+
+  private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+    Configuration conf = getFileSystem().getConf();
+    // resetting the min seek and max size values is also important,
+    // as tests in this same suite override those parameters.
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        Constants.READAHEAD_RANGE,
+        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.setInt(Constants.READAHEAD_RANGE, 0);
+    return S3ATestUtils.createTestFileSystem(conf);
+  }
 }
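For reference, the asserted values in testNormalReadVsVectoredReadStatsCollection can be reconstructed from the merge arithmetic, assuming the 4 KB minimum seek mentioned in the Javadoc above:

sorted ranges (offset, length): (1947, 100) (8192, 100) (10240, 100) (14336, 100) (40960, 1024)
gaps between consecutive ranges: 6145, 1948, 3996, 26524
merging where the gap < 4096 yields three combined ranges:
    [1947, 2047)   ->  100 bytes
    [8192, 14436)  -> 6244 bytes (covers three of the small ranges)
    [40960, 41984) -> 1024 bytes
GET requests    = 3 combined ranges                    -> ACTION_HTTP_GET_REQUEST = 3
bytes returned  = 100 + 100 + 100 + 100 + 1024 = 1424  -> STREAM_READ_BYTES = 1424
bytes fetched   = 100 + 6244 + 1024 = 7368
bytes discarded = 7368 - 1424 = 5944                   -> STREAM_READ_VECTORED_READ_BYTES_DISCARDED = 5944

Similarly, in testMultiVectoredReadStatsCollection each call's two consecutive 500-byte ranges ([100, 600) and [600, 1100)) merge into one 1000-byte GET, hence 2 operations, 4 incoming ranges, 2 combined ranges, 2 GETs, 2000 bytes read, and nothing discarded.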
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
@@ -1457,4 +1459,33 @@ public final class S3ATestUtils {
           + " in " + secrets);
     }
   }
+
+
+  /**
+   * Get the input stream statistics of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream.
+   * @param in wrapper
+   * @return the statistics for the inner stream
+   */
+  public static S3AInputStreamStatistics getInputStreamStatistics(
+      FSDataInputStream in) {
+    return getS3AInputStream(in).getS3AStreamStatistics();
+  }
+
+  /**
+   * Get the inner stream of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream.
+   * @param in wrapper
+   * @return the inner stream
+   * @throws AssertionError if the inner stream is of the wrong type
+   */
+  public static S3AInputStream getS3AInputStream(
+      FSDataInputStream in) {
+    InputStream inner = in.getWrappedStream();
+    if (inner instanceof S3AInputStream) {
+      return (S3AInputStream) inner;
+    } else {
+      throw new AssertionError("Not an S3AInputStream: " + inner);
+    }
+  }
 }
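A hedged example of the relocated helpers in use; the probe class and method are invented for illustration.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;

final class StreamStatsProbe {
  private StreamStatsProbe() {
  }

  /**
   * Fetch the S3A statistics of a wrapped stream.
   * Fails with an AssertionError on a non-S3A stream, so this is
   * only meaningful in S3A-specific tests.
   */
  static S3AInputStreamStatistics probe(FSDataInputStream in) {
    return getInputStreamStatistics(in);
  }
}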
@@ -56,6 +56,8 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
@@ -19,19 +19,14 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
-import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
-
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;
 
@@ -154,34 +149,6 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
     return getTestTimeoutSeconds() * 1000;
   }
 
-  /**
-   * Get the input stream statistics of an input stream.
-   * Raises an exception if the inner stream is not an S3A input stream
-   * @param in wrapper
-   * @return the statistics for the inner stream
-   */
-  protected S3AInputStreamStatistics getInputStreamStatistics(
-      FSDataInputStream in) {
-    return getS3AInputStream(in).getS3AStreamStatistics();
-  }
-
-  /**
-   * Get the inner stream of an input stream.
-   * Raises an exception if the inner stream is not an S3A input stream
-   * @param in wrapper
-   * @return the inner stream
-   * @throws AssertionError if the inner stream is of the wrong type
-   */
-  protected S3AInputStream getS3AInputStream(
-      FSDataInputStream in) {
-    InputStream inner = in.getWrappedStream();
-    if (inner instanceof S3AInputStream) {
-      return (S3AInputStream) inner;
-    } else {
-      throw new AssertionError("Not an S3AInputStream: " + inner);
-    }
-  }
-
   /**
    * Get the gauge value of a statistic from the
    * IOStatistics of the filesystem. Raises an assertion if