diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index dd6cdd72174..3c4093d8cd1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -132,7 +132,7 @@ private synchronized void reopen(String reason, long targetPos, long length) throws IOException { if (wrappedStream != null) { - closeStream("reopen(" + reason + ")", contentRangeFinish); + closeStream("reopen(" + reason + ")", contentRangeFinish, false); } contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos, @@ -257,7 +257,7 @@ private void seekInStream(long targetPos, long length) throws IOException { // if the code reaches here, the stream needs to be reopened. // close the stream; if read the object will be opened at the new pos - closeStream("seekInStream()", this.contentRangeFinish); + closeStream("seekInStream()", this.contentRangeFinish, false); pos = targetPos; } @@ -414,7 +414,7 @@ public synchronized void close() throws IOException { closed = true; try { // close or abort the stream - closeStream("close() operation", this.contentRangeFinish); + closeStream("close() operation", this.contentRangeFinish, false); // this is actually a no-op super.close(); } finally { @@ -431,17 +431,17 @@ public synchronized void close() throws IOException { * an abort. * * This does not set the {@link #closed} flag. - * * @param reason reason for stream being closed; used in messages * @param length length of the stream. + * @param forceAbort force an abort; used if explicitly requested. */ - private void closeStream(String reason, long length) { + private void closeStream(String reason, long length, boolean forceAbort) { if (wrappedStream != null) { // if the amount of data remaining in the current request is greater // than the readahead value: abort. long remaining = remainingInCurrentRequest(); - boolean shouldAbort = remaining > readahead; + boolean shouldAbort = forceAbort || remaining > readahead; if (!shouldAbort) { try { // clean close. This will read to the end of the stream, @@ -470,6 +470,27 @@ private void closeStream(String reason, long length) { } } + /** + * Forcibly reset the stream, by aborting the connection. The next + * {@code read()} operation will trigger the opening of a new HTTPS + * connection. + * + * This is potentially very inefficient, and should only be invoked + * in extreme circumstances. It logs at info for this reason. + * @return true if the connection was actually reset. + * @throws IOException if invoked on a closed stream. + */ + @InterfaceStability.Unstable + public synchronized boolean resetConnection() throws IOException { + checkNotClosed(); + boolean connectionOpen = wrappedStream != null; + if (connectionOpen) { + LOG.info("Forced reset of connection to {}", uri); + closeStream("reset()", contentRangeFinish, true); + } + return connectionOpen; + } + @Override public synchronized int available() throws IOException { checkNotClosed(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index dedbfd420d0..aeb8403ff8d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -509,6 +509,7 @@ static int intOption(Configuration conf, String key, int defVal, int min) { Preconditions.checkArgument(v >= min, String.format("Value of %s: %d is below the minimum value %d", key, v, min)); + LOG.debug("Value of {} is {}", key, v); return v; } @@ -529,6 +530,7 @@ static long longOption(Configuration conf, Preconditions.checkArgument(v >= min, String.format("Value of %s: %d is below the minimum value %d", key, v, min)); + LOG.debug("Value of {} is {}", key, v); return v; } @@ -550,6 +552,7 @@ static long longBytesOption(Configuration conf, Preconditions.checkArgument(v >= min, String.format("Value of %s: %d is below the minimum value %d", key, v, min)); + LOG.debug("Value of {} is {}", key, v); return v; } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md new file mode 100644 index 00000000000..d79720e76fc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -0,0 +1,52 @@ + + +# Troubleshooting S3A + +Here are some lower level details and hints on troubleshooting and tuning +the S3A client. + +## Logging at lower levels + +The AWS SDK and the Apache HTTP components can be configured to log at +more detail, as can S3A itself. + +```properties +log4j.logger.org.apache.hadoop.fs.s3a=DEBUG +log4j.logger.com.amazonaws.request=DEBUG +log4j.logger.org.apache.http=DEBUG +log4j.logger.org.apache.http.wire=ERROR +``` + +Be aware that logging HTTP headers may leak sensitive AWS account information, +so should not be shared. + +## Advanced: network performance + +An example of this is covered in [HADOOP-13871](https://issues.apache.org/jira/browse/HADOOP-13871). + +1. For public data, use `curl`: + + curl -O https://landsat-pds.s3.amazonaws.com/scene_list.gz +1. Use `nettop` to monitor a processes connections. + +Consider reducing the connection timeout of the s3a connection. + +```xml + + fs.s3a.connection.timeout + 15000 + +``` +This *may* cause the client to react faster to network pauses. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index cc8187e6276..e36d086ba79 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -35,6 +35,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.util.LineReader; import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -216,12 +217,18 @@ public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { long count = 0; // implicitly rounding down here long blockCount = len / blockSize; + long totalToRead = blockCount * blockSize; + long minimumBandwidth = 128 * 1024; + int maxResetCount = 4; + int resetCount = 0; for (long i = 0; i < blockCount; i++) { int offset = 0; int remaining = blockSize; + long blockId = i + 1; NanoTimer blockTimer = new NanoTimer(); int reads = 0; while (remaining > 0) { + NanoTimer readTimer = new NanoTimer(); int bytesRead = in.read(block, offset, remaining); reads++; if (bytesRead == 1) { @@ -230,14 +237,48 @@ public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { remaining -= bytesRead; offset += bytesRead; count += bytesRead; + readTimer.end(); + if (bytesRead != 0) { + LOG.debug("Bytes in read #{}: {} , block bytes: {}," + + " remaining in block: {}" + + " duration={} nS; ns/byte: {}, bandwidth={} MB/s", + reads, bytesRead, blockSize - remaining, remaining, + readTimer.duration(), + readTimer.nanosPerOperation(bytesRead), + readTimer.bandwidthDescription(bytesRead)); + } else { + LOG.warn("0 bytes returned by read() operation #{}", reads); + } + } + blockTimer.end("Reading block %d in %d reads", blockId, reads); + String bw = blockTimer.bandwidthDescription(blockSize); + LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bw); + if (bandwidth(blockTimer, blockSize) < minimumBandwidth) { + LOG.warn("Bandwidth {} too low on block {}: resetting connection", + bw, blockId); + Assert.assertTrue("Bandwidth of " + bw +" too low after " + + resetCount + " attempts", resetCount <= maxResetCount); + resetCount++; + // reset the connection + getS3AInputStream(in).resetConnection(); } - blockTimer.end("Reading block %d in %d reads", i, reads); } - timer2.end("Time to read %d bytes in %d blocks", len, blockCount); - bandwidth(timer2, count); + timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount); + LOG.info("Overall Bandwidth {} MB/s; reset connections {}", + timer2.bandwidth(totalToRead), resetCount); logStreamStatistics(); } + /** + * Work out the bandwidth in bytes/second. + * @param timer timer measuring the duration + * @param bytes bytes + * @return the number of bytes/second of the recorded operation + */ + public static double bandwidth(NanoTimer timer, long bytes) { + return bytes * 1.0e9 / timer.duration(); + } + @Test public void testLazySeekEnabled() throws Throwable { describe("Verify that seeks do not trigger any IO"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index c4174bfd63b..9da621f872e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; -import org.junit.Assert; import org.junit.Assume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,14 +162,23 @@ protected int getTestTimeoutMillis() { */ protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics( FSDataInputStream in) { + return getS3AInputStream(in).getS3AStreamStatistics(); + } + + /** + * Get the inner stream of an input stream. + * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the inner stream + * @throws AssertionError if the inner stream is of the wrong type + */ + protected S3AInputStream getS3AInputStream( + FSDataInputStream in) { InputStream inner = in.getWrappedStream(); if (inner instanceof S3AInputStream) { - S3AInputStream s3a = (S3AInputStream) inner; - return s3a.getS3AStreamStatistics(); + return (S3AInputStream) inner; } else { - Assert.fail("Not an S3AInputStream: " + inner); - // never reached - return null; + throw new AssertionError("Not an S3AInputStream: " + inner); } }