HADOOP-13871. ITestS3AInputStreamPerformance.testTimeToOpenAndReadWholeFileBlocks performance awful. Contributed by Steve Loughran
(cherry picked from commit c6a3923245
)
This commit is contained in:
parent
69db9eecfa
commit
01b50b36b6
|
@ -132,7 +132,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
if (wrappedStream != null) {
|
if (wrappedStream != null) {
|
||||||
closeStream("reopen(" + reason + ")", contentRangeFinish);
|
closeStream("reopen(" + reason + ")", contentRangeFinish, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
|
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
|
||||||
|
@ -257,7 +257,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
||||||
|
|
||||||
// if the code reaches here, the stream needs to be reopened.
|
// if the code reaches here, the stream needs to be reopened.
|
||||||
// close the stream; if read the object will be opened at the new pos
|
// close the stream; if read the object will be opened at the new pos
|
||||||
closeStream("seekInStream()", this.contentRangeFinish);
|
closeStream("seekInStream()", this.contentRangeFinish, false);
|
||||||
pos = targetPos;
|
pos = targetPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,7 +414,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
||||||
closed = true;
|
closed = true;
|
||||||
try {
|
try {
|
||||||
// close or abort the stream
|
// close or abort the stream
|
||||||
closeStream("close() operation", this.contentRangeFinish);
|
closeStream("close() operation", this.contentRangeFinish, false);
|
||||||
// this is actually a no-op
|
// this is actually a no-op
|
||||||
super.close();
|
super.close();
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -431,17 +431,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
||||||
* an abort.
|
* an abort.
|
||||||
*
|
*
|
||||||
* This does not set the {@link #closed} flag.
|
* This does not set the {@link #closed} flag.
|
||||||
*
|
|
||||||
* @param reason reason for stream being closed; used in messages
|
* @param reason reason for stream being closed; used in messages
|
||||||
* @param length length of the stream.
|
* @param length length of the stream.
|
||||||
|
* @param forceAbort force an abort; used if explicitly requested.
|
||||||
*/
|
*/
|
||||||
private void closeStream(String reason, long length) {
|
private void closeStream(String reason, long length, boolean forceAbort) {
|
||||||
if (wrappedStream != null) {
|
if (wrappedStream != null) {
|
||||||
|
|
||||||
// if the amount of data remaining in the current request is greater
|
// if the amount of data remaining in the current request is greater
|
||||||
// than the readahead value: abort.
|
// than the readahead value: abort.
|
||||||
long remaining = remainingInCurrentRequest();
|
long remaining = remainingInCurrentRequest();
|
||||||
boolean shouldAbort = remaining > readahead;
|
boolean shouldAbort = forceAbort || remaining > readahead;
|
||||||
if (!shouldAbort) {
|
if (!shouldAbort) {
|
||||||
try {
|
try {
|
||||||
// clean close. This will read to the end of the stream,
|
// clean close. This will read to the end of the stream,
|
||||||
|
@ -470,6 +470,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forcibly reset the stream, by aborting the connection. The next
|
||||||
|
* {@code read()} operation will trigger the opening of a new HTTPS
|
||||||
|
* connection.
|
||||||
|
*
|
||||||
|
* This is potentially very inefficient, and should only be invoked
|
||||||
|
* in extreme circumstances. It logs at info for this reason.
|
||||||
|
* @return true if the connection was actually reset.
|
||||||
|
* @throws IOException if invoked on a closed stream.
|
||||||
|
*/
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public synchronized boolean resetConnection() throws IOException {
|
||||||
|
checkNotClosed();
|
||||||
|
boolean connectionOpen = wrappedStream != null;
|
||||||
|
if (connectionOpen) {
|
||||||
|
LOG.info("Forced reset of connection to {}", uri);
|
||||||
|
closeStream("reset()", contentRangeFinish, true);
|
||||||
|
}
|
||||||
|
return connectionOpen;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int available() throws IOException {
|
public synchronized int available() throws IOException {
|
||||||
checkNotClosed();
|
checkNotClosed();
|
||||||
|
|
|
@ -509,6 +509,7 @@ public final class S3AUtils {
|
||||||
Preconditions.checkArgument(v >= min,
|
Preconditions.checkArgument(v >= min,
|
||||||
String.format("Value of %s: %d is below the minimum value %d",
|
String.format("Value of %s: %d is below the minimum value %d",
|
||||||
key, v, min));
|
key, v, min));
|
||||||
|
LOG.debug("Value of {} is {}", key, v);
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,6 +530,7 @@ public final class S3AUtils {
|
||||||
Preconditions.checkArgument(v >= min,
|
Preconditions.checkArgument(v >= min,
|
||||||
String.format("Value of %s: %d is below the minimum value %d",
|
String.format("Value of %s: %d is below the minimum value %d",
|
||||||
key, v, min));
|
key, v, min));
|
||||||
|
LOG.debug("Value of {} is {}", key, v);
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -550,6 +552,7 @@ public final class S3AUtils {
|
||||||
Preconditions.checkArgument(v >= min,
|
Preconditions.checkArgument(v >= min,
|
||||||
String.format("Value of %s: %d is below the minimum value %d",
|
String.format("Value of %s: %d is below the minimum value %d",
|
||||||
key, v, min));
|
key, v, min));
|
||||||
|
LOG.debug("Value of {} is {}", key, v);
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
<!---
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
|
||||||
|
# Troubleshooting S3A
|
||||||
|
|
||||||
|
Here are some lower level details and hints on troubleshooting and tuning
|
||||||
|
the S3A client.
|
||||||
|
|
||||||
|
## Logging at lower levels
|
||||||
|
|
||||||
|
The AWS SDK and the Apache HTTP components can be configured to log at
|
||||||
|
more detail, as can S3A itself.
|
||||||
|
|
||||||
|
```properties
|
||||||
|
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
|
||||||
|
log4j.logger.com.amazonaws.request=DEBUG
|
||||||
|
log4j.logger.org.apache.http=DEBUG
|
||||||
|
log4j.logger.org.apache.http.wire=ERROR
|
||||||
|
```
|
||||||
|
|
||||||
|
Be aware that logging HTTP headers may leak sensitive AWS account information,
|
||||||
|
so should not be shared.
|
||||||
|
|
||||||
|
## Advanced: network performance
|
||||||
|
|
||||||
|
An example of this is covered in [HADOOP-13871](https://issues.apache.org/jira/browse/HADOOP-13871).
|
||||||
|
|
||||||
|
1. For public data, use `curl`:
|
||||||
|
|
||||||
|
curl -O https://landsat-pds.s3.amazonaws.com/scene_list.gz
|
||||||
|
1. Use `nettop` to monitor a processes connections.
|
||||||
|
|
||||||
|
Consider reducing the connection timeout of the s3a connection.
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.connection.timeout</name>
|
||||||
|
<value>15000</value>
|
||||||
|
</property>
|
||||||
|
```
|
||||||
|
This *may* cause the client to react faster to network pauses.
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||||
import org.apache.hadoop.util.LineReader;
|
import org.apache.hadoop.util.LineReader;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -216,12 +217,18 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
|
||||||
long count = 0;
|
long count = 0;
|
||||||
// implicitly rounding down here
|
// implicitly rounding down here
|
||||||
long blockCount = len / blockSize;
|
long blockCount = len / blockSize;
|
||||||
|
long totalToRead = blockCount * blockSize;
|
||||||
|
long minimumBandwidth = 128 * 1024;
|
||||||
|
int maxResetCount = 4;
|
||||||
|
int resetCount = 0;
|
||||||
for (long i = 0; i < blockCount; i++) {
|
for (long i = 0; i < blockCount; i++) {
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
int remaining = blockSize;
|
int remaining = blockSize;
|
||||||
|
long blockId = i + 1;
|
||||||
NanoTimer blockTimer = new NanoTimer();
|
NanoTimer blockTimer = new NanoTimer();
|
||||||
int reads = 0;
|
int reads = 0;
|
||||||
while (remaining > 0) {
|
while (remaining > 0) {
|
||||||
|
NanoTimer readTimer = new NanoTimer();
|
||||||
int bytesRead = in.read(block, offset, remaining);
|
int bytesRead = in.read(block, offset, remaining);
|
||||||
reads++;
|
reads++;
|
||||||
if (bytesRead == 1) {
|
if (bytesRead == 1) {
|
||||||
|
@ -230,14 +237,48 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
|
||||||
remaining -= bytesRead;
|
remaining -= bytesRead;
|
||||||
offset += bytesRead;
|
offset += bytesRead;
|
||||||
count += bytesRead;
|
count += bytesRead;
|
||||||
|
readTimer.end();
|
||||||
|
if (bytesRead != 0) {
|
||||||
|
LOG.debug("Bytes in read #{}: {} , block bytes: {}," +
|
||||||
|
" remaining in block: {}" +
|
||||||
|
" duration={} nS; ns/byte: {}, bandwidth={} MB/s",
|
||||||
|
reads, bytesRead, blockSize - remaining, remaining,
|
||||||
|
readTimer.duration(),
|
||||||
|
readTimer.nanosPerOperation(bytesRead),
|
||||||
|
readTimer.bandwidthDescription(bytesRead));
|
||||||
|
} else {
|
||||||
|
LOG.warn("0 bytes returned by read() operation #{}", reads);
|
||||||
}
|
}
|
||||||
blockTimer.end("Reading block %d in %d reads", i, reads);
|
|
||||||
}
|
}
|
||||||
timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
|
blockTimer.end("Reading block %d in %d reads", blockId, reads);
|
||||||
bandwidth(timer2, count);
|
String bw = blockTimer.bandwidthDescription(blockSize);
|
||||||
|
LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bw);
|
||||||
|
if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
|
||||||
|
LOG.warn("Bandwidth {} too low on block {}: resetting connection",
|
||||||
|
bw, blockId);
|
||||||
|
Assert.assertTrue("Bandwidth of " + bw +" too low after "
|
||||||
|
+ resetCount + " attempts", resetCount <= maxResetCount);
|
||||||
|
resetCount++;
|
||||||
|
// reset the connection
|
||||||
|
getS3AInputStream(in).resetConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount);
|
||||||
|
LOG.info("Overall Bandwidth {} MB/s; reset connections {}",
|
||||||
|
timer2.bandwidth(totalToRead), resetCount);
|
||||||
logStreamStatistics();
|
logStreamStatistics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Work out the bandwidth in bytes/second.
|
||||||
|
* @param timer timer measuring the duration
|
||||||
|
* @param bytes bytes
|
||||||
|
* @return the number of bytes/second of the recorded operation
|
||||||
|
*/
|
||||||
|
public static double bandwidth(NanoTimer timer, long bytes) {
|
||||||
|
return bytes * 1.0e9 / timer.duration();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLazySeekEnabled() throws Throwable {
|
public void testLazySeekEnabled() throws Throwable {
|
||||||
describe("Verify that seeks do not trigger any IO");
|
describe("Verify that seeks do not trigger any IO");
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.S3ATestConstants;
|
||||||
import org.apache.hadoop.fs.s3a.Statistic;
|
import org.apache.hadoop.fs.s3a.Statistic;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -163,14 +162,23 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
|
||||||
*/
|
*/
|
||||||
protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
|
protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
|
||||||
FSDataInputStream in) {
|
FSDataInputStream in) {
|
||||||
|
return getS3AInputStream(in).getS3AStreamStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the inner stream of an input stream.
|
||||||
|
* Raises an exception if the inner stream is not an S3A input stream
|
||||||
|
* @param in wrapper
|
||||||
|
* @return the inner stream
|
||||||
|
* @throws AssertionError if the inner stream is of the wrong type
|
||||||
|
*/
|
||||||
|
protected S3AInputStream getS3AInputStream(
|
||||||
|
FSDataInputStream in) {
|
||||||
InputStream inner = in.getWrappedStream();
|
InputStream inner = in.getWrappedStream();
|
||||||
if (inner instanceof S3AInputStream) {
|
if (inner instanceof S3AInputStream) {
|
||||||
S3AInputStream s3a = (S3AInputStream) inner;
|
return (S3AInputStream) inner;
|
||||||
return s3a.getS3AStreamStatistics();
|
|
||||||
} else {
|
} else {
|
||||||
Assert.fail("Not an S3AInputStream: " + inner);
|
throw new AssertionError("Not an S3AInputStream: " + inner);
|
||||||
// never reached
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue