diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 94d7701cd61..1172a318b94 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -84,7 +84,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
-  private final S3AInputPolicy inputPolicy;
+  private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
   /**
@@ -124,10 +124,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.serverSideEncryptionAlgorithm =
         s3Attributes.getServerSideEncryptionAlgorithm();
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
-    this.inputPolicy = inputPolicy;
+    setInputPolicy(inputPolicy);
     setReadahead(readahead);
   }
 
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
   /**
    * Opens up the stream at specified target position and for given length.
    *
@@ -146,8 +156,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
         length, contentLength, readahead);
     LOG.debug("reopen({}) for {} range[{}-{}], length={},"
-        + " streamPosition={}, nextReadPosition={}",
-        uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos);
+        + " streamPosition={}, nextReadPosition={}, policy={}",
+        uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
+        inputPolicy);
 
     streamStatistics.streamOpened();
     try {
@@ -258,6 +269,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     } else if (diff < 0) {
       // backwards seek
       streamStatistics.seekBackwards(diff);
+      // if the stream is in "Normal" mode, switch to random IO at this
+      // point, as it is indicative of columnar format IO
+      if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+        LOG.info("Switching to Random IO seek policy");
+        setInputPolicy(S3AInputPolicy.Random);
+      }
     } else {
       // targetPos == pos
       if (remainingInCurrentRequest() > 0) {
@@ -427,6 +444,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       try {
         // close or abort the stream
         closeStream("close() operation", this.contentRangeFinish, false);
+        LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
         // this is actually a no-op
         super.close();
       } finally {
@@ -697,6 +715,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       break;
 
     case Normal:
+      // normal is considered sequential until a backwards seek switches
+      // it to 'Random'
     default:
       rangeLimit = contentLength;
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index da1fc5a807c..90aec12635c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -553,6 +553,8 @@ public class S3AInstrumentation {
     public long readsIncomplete;
     public long bytesReadInClose;
     public long bytesDiscardedInAbort;
+    public long policySetCount;
+    public long inputPolicy;
 
     private InputStreamStatistics() {
     }
@@ -665,6 +667,15 @@ public class S3AInstrumentation {
       mergeInputStreamStatistics(this);
     }
 
+    /**
+     * The input policy has been switched.
+     * @param updatedPolicy enum value of new policy.
+     */
+    public void inputPolicySet(int updatedPolicy) {
+      policySetCount++;
+      inputPolicy = updatedPolicy;
+    }
+
     /**
      * String operator describes all the current statistics.
      * Important: there are no guarantees as to the stability
@@ -696,6 +707,8 @@ public class S3AInstrumentation {
       sb.append(", ReadsIncomplete=").append(readsIncomplete);
       sb.append(", BytesReadInClose=").append(bytesReadInClose);
       sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
+      sb.append(", InputPolicy=").append(inputPolicy);
+      sb.append(", InputPolicySetCount=").append(policySetCount);
       sb.append('}');
       return sb.toString();
     }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 75c638f5831..ea38db69c63 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1393,7 +1393,18 @@ backward seeks.
 
 *"normal" (default)*
 
-This is currently the same as "sequential", though it may evolve in future.
+The "Normal" policy starts off reading a file in "sequential" mode,
+but if the caller seeks backwards in the stream, it switches from
+sequential to "random".
+
+This policy effectively recognizes the initial read pattern of columnar
+storage formats (e.g. Apache ORC and Apache Parquet), which seek to the end
+of a file, read in index data and then seek backwards to selectively read
+columns. The first seeks may be expensive compared to the random policy;
+however, the overall process is much less expensive than either sequentially
+reading through a file with the "random" policy, or reading columnar data
+with the "sequential" policy. When the exact format/recommended seek policy
+for the data is known in advance, it may be better to select that policy explicitly.
 
 *"random"*
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index 83ab2102bf6..efd96c4e738 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -427,7 +427,11 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
     long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
     executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
     assertEquals("streams aborted in " + streamStatistics,
-        4, streamStatistics.aborted);
+        1, streamStatistics.aborted);
+    assertEquals("policy changes in " + streamStatistics,
+        2, streamStatistics.policySetCount);
+    assertEquals("input policy in " + streamStatistics,
+        S3AInputPolicy.Random.ordinal(), streamStatistics.inputPolicy);
   }
 
   /**
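For context, a minimal sketch (not part of the patch) of the columnar-format read pattern the updated "normal" policy adapts to, assuming the standard `fs.s3a.experimental.input.fadvise` option; the bucket, object name, and buffer sizes are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NormalPolicyReadPattern {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "normal" is the default policy; set it explicitly for clarity
    conf.set("fs.s3a.experimental.input.fadvise", "normal");

    // hypothetical bucket and object, for illustration only
    Path path = new Path("s3a://example-bucket/data/part-00000.orc");
    try (FileSystem fs = FileSystem.get(path.toUri(), conf);
         FSDataInputStream in = fs.open(path)) {
      long len = fs.getFileStatus(path).getLen();

      // 1. read the index/footer at the tail of the file; the stream
      //    is still in sequential mode at this point
      byte[] footer = new byte[16 * 1024];
      in.seek(len - footer.length);
      in.readFully(footer);

      // 2. seek backwards to a column chunk; with this patch the stream
      //    logs "Switching to Random IO seek policy" and subsequent
      //    reopens use bounded range requests rather than reading to
      //    the end of the file
      byte[] column = new byte[8 * 1024];
      in.seek(0);
      in.readFully(column);
    }
  }
}
```

Under the old behaviour the same sequence would abort a long-range GET on each backwards seek; the new assertions in `ITestS3AInputStreamPerformance` (one abort, two policy changes, final policy `Random`) reflect that switch.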