HADOOP-13871. ITestS3AInputStreamPerformance.testTimeToOpenAndReadWholeFileBlocks performance awful. Contributed by Steve Loughran

This commit is contained in:
Mingliang Liu 2016-12-12 14:55:34 -08:00
parent f66f61892a
commit c6a3923245
5 changed files with 140 additions and 15 deletions

View File

@ -132,7 +132,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
throws IOException {
if (wrappedStream != null) {
closeStream("reopen(" + reason + ")", contentRangeFinish);
closeStream("reopen(" + reason + ")", contentRangeFinish, false);
}
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@ -257,7 +257,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
// if the code reaches here, the stream needs to be reopened.
// close the stream; if read the object will be opened at the new pos
closeStream("seekInStream()", this.contentRangeFinish);
closeStream("seekInStream()", this.contentRangeFinish, false);
pos = targetPos;
}
@ -414,7 +414,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
closed = true;
try {
// close or abort the stream
closeStream("close() operation", this.contentRangeFinish);
closeStream("close() operation", this.contentRangeFinish, false);
// this is actually a no-op
super.close();
} finally {
@ -431,17 +431,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* an abort.
*
* This does not set the {@link #closed} flag.
*
* @param reason reason for stream being closed; used in messages
* @param length length of the stream.
* @param forceAbort force an abort; used if explicitly requested.
*/
private void closeStream(String reason, long length) {
private void closeStream(String reason, long length, boolean forceAbort) {
if (wrappedStream != null) {
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
boolean shouldAbort = remaining > readahead;
boolean shouldAbort = forceAbort || remaining > readahead;
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
@ -470,6 +470,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
}
}
/**
* Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS
* connection.
*
* This is potentially very inefficient, and should only be invoked
* in extreme circumstances. It logs at info for this reason.
* @return true if the connection was actually reset.
* @throws IOException if invoked on a closed stream.
*/
@InterfaceStability.Unstable
public synchronized boolean resetConnection() throws IOException {
checkNotClosed();
boolean connectionOpen = wrappedStream != null;
if (connectionOpen) {
LOG.info("Forced reset of connection to {}", uri);
closeStream("reset()", contentRangeFinish, true);
}
return connectionOpen;
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();

View File

@ -509,6 +509,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
LOG.debug("Value of {} is {}", key, v);
return v;
}
@ -529,6 +530,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
LOG.debug("Value of {} is {}", key, v);
return v;
}
@ -550,6 +552,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
LOG.debug("Value of {} is {}", key, v);
return v;
}

View File

@ -0,0 +1,52 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Troubleshooting S3A
Here are some lower level details and hints on troubleshooting and tuning
the S3A client.
## Logging at lower levels
The AWS SDK and the Apache HTTP components can be configured to log at
more detail, as can S3A itself.
```properties
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
log4j.logger.com.amazonaws.request=DEBUG
log4j.logger.org.apache.http=DEBUG
log4j.logger.org.apache.http.wire=ERROR
```
Be aware that logging HTTP headers may leak sensitive AWS account information,
so should not be shared.
## Advanced: network performance
An example of this is covered in [HADOOP-13871](https://issues.apache.org/jira/browse/HADOOP-13871).
1. For public data, use `curl`:
curl -O https://landsat-pds.s3.amazonaws.com/scene_list.gz
1. Use `nettop` to monitor a processes connections.
Consider reducing the connection timeout of the s3a connection.
```xml
<property>
<name>fs.s3a.connection.timeout</name>
<value>15000</value>
</property>
```
This *may* cause the client to react faster to network pauses.

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@ -216,12 +217,18 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
long count = 0;
// implicitly rounding down here
long blockCount = len / blockSize;
long totalToRead = blockCount * blockSize;
long minimumBandwidth = 128 * 1024;
int maxResetCount = 4;
int resetCount = 0;
for (long i = 0; i < blockCount; i++) {
int offset = 0;
int remaining = blockSize;
long blockId = i + 1;
NanoTimer blockTimer = new NanoTimer();
int reads = 0;
while (remaining > 0) {
NanoTimer readTimer = new NanoTimer();
int bytesRead = in.read(block, offset, remaining);
reads++;
if (bytesRead == 1) {
@ -230,14 +237,48 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
remaining -= bytesRead;
offset += bytesRead;
count += bytesRead;
readTimer.end();
if (bytesRead != 0) {
LOG.debug("Bytes in read #{}: {} , block bytes: {}," +
" remaining in block: {}" +
" duration={} nS; ns/byte: {}, bandwidth={} MB/s",
reads, bytesRead, blockSize - remaining, remaining,
readTimer.duration(),
readTimer.nanosPerOperation(bytesRead),
readTimer.bandwidthDescription(bytesRead));
} else {
LOG.warn("0 bytes returned by read() operation #{}", reads);
}
blockTimer.end("Reading block %d in %d reads", i, reads);
}
timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
bandwidth(timer2, count);
blockTimer.end("Reading block %d in %d reads", blockId, reads);
String bw = blockTimer.bandwidthDescription(blockSize);
LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bw);
if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
LOG.warn("Bandwidth {} too low on block {}: resetting connection",
bw, blockId);
Assert.assertTrue("Bandwidth of " + bw +" too low after "
+ resetCount + " attempts", resetCount <= maxResetCount);
resetCount++;
// reset the connection
getS3AInputStream(in).resetConnection();
}
}
timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount);
LOG.info("Overall Bandwidth {} MB/s; reset connections {}",
timer2.bandwidth(totalToRead), resetCount);
logStreamStatistics();
}
/**
* Work out the bandwidth in bytes/second.
* @param timer timer measuring the duration
* @param bytes bytes
* @return the number of bytes/second of the recorded operation
*/
public static double bandwidth(NanoTimer timer, long bytes) {
return bytes * 1.0e9 / timer.duration();
}
@Test
public void testLazySeekEnabled() throws Throwable {
describe("Verify that seeks do not trigger any IO");

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.junit.Assert;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -163,14 +162,23 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
*/
protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
FSDataInputStream in) {
return getS3AInputStream(in).getS3AStreamStatistics();
}
/**
* Get the inner stream of an input stream.
* Raises an exception if the inner stream is not an S3A input stream
* @param in wrapper
* @return the inner stream
* @throws AssertionError if the inner stream is of the wrong type
*/
protected S3AInputStream getS3AInputStream(
FSDataInputStream in) {
InputStream inner = in.getWrappedStream();
if (inner instanceof S3AInputStream) {
S3AInputStream s3a = (S3AInputStream) inner;
return s3a.getS3AStreamStatistics();
return (S3AInputStream) inner;
} else {
Assert.fail("Not an S3AInputStream: " + inner);
// never reached
return null;
throw new AssertionError("Not an S3AInputStream: " + inner);
}
}