diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 120a15139d5..e36214dc1fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1564,8 +1564,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, CompletableFuture result = new CompletableFuture<>(); unboundedThreadPool.submit(() -> LambdaUtils.eval(result, () -> { + LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId()); try (AuditSpan span = auditSpan.activate()) { return operation.apply(); + } finally { + LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId()); } })); return result; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index c20c3a04863..7604178b2f1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -51,6 +51,8 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.VectoredReadUtils; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; +import org.apache.hadoop.fs.s3a.impl.InternalConstants; +import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -65,7 +67,6 @@ import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; import static 
org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -97,10 +98,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, public static final String OPERATION_OPEN = "open"; public static final String OPERATION_REOPEN = "re-open"; - /** - * size of a buffer to create when draining the stream. - */ - private static final int DRAIN_BUFFER_SIZE = 16384; /** * This is the maximum temporary buffer size we use while * populating the data in direct byte buffers during a vectored IO @@ -242,6 +239,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, streamStatistics.inputPolicySet(inputPolicy.ordinal()); } + /** + * Get the current input policy. + * @return input policy. + */ + @VisibleForTesting + public S3AInputPolicy getInputPolicy() { + return inputPolicy; + } + /** * Opens up the stream at specified target position and for given length. * @@ -604,7 +610,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, try { stopVectoredIOOperations.set(true); // close or abort the stream; blocking - awaitFuture(closeStream("close() operation", false, true)); + closeStream("close() operation", false, true); // end the client+audit span. client.close(); // this is actually a no-op @@ -664,18 +670,25 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, forceAbort ? 
"abort" : "soft"); boolean shouldAbort = forceAbort || remaining > readahead; CompletableFuture operation; + SDKStreamDrainer drainer = new SDKStreamDrainer( + uri, + object, + wrappedStream, + shouldAbort, + (int) remaining, + streamStatistics, + reason); if (blocking || shouldAbort || remaining <= asyncDrainThreshold) { - // don't bother with async io. - operation = CompletableFuture.completedFuture( - drain(shouldAbort, reason, remaining, object, wrappedStream)); + // don't bother with async IO if the caller plans to wait for + // the result, there's an abort (which is fast), or + // there is not much data to read. + operation = CompletableFuture.completedFuture(drainer.apply()); } else { LOG.debug("initiating asynchronous drain of {} bytes", remaining); - // schedule an async drain/abort with references to the fields so they - // can be reused - operation = client.submit( - () -> drain(false, reason, remaining, object, wrappedStream)); + // schedule an async drain/abort + operation = client.submit(drainer); } // either the stream is closed in the blocking call or the async call is @@ -685,117 +698,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, return operation; } - /** - * drain the stream. This method is intended to be - * used directly or asynchronously, and measures the - * duration of the operation in the stream statistics. - * @param shouldAbort force an abort; used if explicitly requested. - * @param reason reason for stream being closed; used in messages - * @param remaining remaining bytes - * @param requestObject http request object; needed to avoid GC issues. - * @param inner stream to close. - * @return was the stream aborted? 
- */ - private boolean drain( - final boolean shouldAbort, - final String reason, - final long remaining, - final S3Object requestObject, - final S3ObjectInputStream inner) { - - try { - return invokeTrackingDuration( - streamStatistics.initiateInnerStreamClose(shouldAbort), - () -> drainOrAbortHttpStream( - shouldAbort, - reason, - remaining, - requestObject, - inner)); - } catch (IOException e) { - // this is only here because invokeTrackingDuration() has it in its - // signature - return shouldAbort; - } - } - - /** - * Drain or abort the inner stream. - * Exceptions are swallowed. - * If a close() is attempted and fails, the operation escalates to - * an abort. - * - * This does not set the {@link #closed} flag. - * - * A reference to the stream is passed in so that the instance - * {@link #wrappedStream} field can be reused as soon as this - * method is submitted; - * @param shouldAbort force an abort; used if explicitly requested. - * @param reason reason for stream being closed; used in messages - * @param remaining remaining bytes - * @param requestObject http request object; needed to avoid GC issues. - * @param inner stream to close. - * @return was the stream aborted? - */ - private boolean drainOrAbortHttpStream( - boolean shouldAbort, - final String reason, - final long remaining, - final S3Object requestObject, - final S3ObjectInputStream inner) { - // force a use of the request object so IDEs don't warn of - // lack of use. - requireNonNull(requestObject); - - if (!shouldAbort) { - try { - // clean close. 
This will read to the end of the stream, - // so, while cleaner, can be pathological on a multi-GB object - - // explicitly drain the stream - long drained = 0; - byte[] buffer = new byte[DRAIN_BUFFER_SIZE]; - while (true) { - final int count = inner.read(buffer); - if (count < 0) { - // no more data is left - break; - } - drained += count; - } - LOG.debug("Drained stream of {} bytes", drained); - - // now close it - inner.close(); - // this MUST come after the close, so that if the IO operations fail - // and an abort is triggered, the initial attempt's statistics - // aren't collected. - streamStatistics.streamClose(false, drained); - } catch (Exception e) { - // exception escalates to an abort - LOG.debug("When closing {} stream for {}, will abort the stream", - uri, reason, e); - shouldAbort = true; - } - } - if (shouldAbort) { - // Abort, rather than just close, the underlying stream. Otherwise, the - // remaining object payload is read from S3 while closing the stream. - LOG.debug("Aborting stream {}", uri); - try { - inner.abort(); - } catch (Exception e) { - LOG.warn("When aborting {} stream after failing to close it for {}", - uri, reason, e); - } - streamStatistics.streamClose(true, remaining); - } - LOG.debug("Stream {} {}: {}; remaining={}", - uri, (shouldAbort ? "aborted" : "closed"), reason, - remaining); - return shouldAbort; - } - /** * Forcibly reset the stream, by aborting the connection. 
The next * {@code read()} operation will trigger the opening of a new HTTPS @@ -1080,8 +982,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, int drainBytes = 0; int readCount; while (drainBytes < drainQuantity) { - if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) { - byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE]; + if (drainBytes + InternalConstants.DRAIN_BUFFER_SIZE <= drainQuantity) { + byte[] drainBuffer = new byte[InternalConstants.DRAIN_BUFFER_SIZE]; readCount = objectContent.read(drainBuffer); } else { byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)]; @@ -1345,6 +1247,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, closeStream("unbuffer()", false, false); } finally { streamStatistics.unbuffered(); + if (inputPolicy.isAdaptive()) { + S3AInputPolicy policy = S3AInputPolicy.Random; + LOG.debug("Switching to seek policy {} after unbuffer() invoked", policy); + setInputPolicy(policy); + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 2f2acff9cab..ace36744085 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -376,6 +376,7 @@ public final class S3AUtils { } else { String name = innerCause.getClass().getName(); if (name.endsWith(".ConnectTimeoutException") + || name.endsWith(".ConnectionPoolTimeoutException") || name.endsWith("$ConnectTimeoutException")) { // TCP connection http timeout from the shaded or unshaded filenames // com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 2dc88eeb85e..f5f88fd8010 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -378,6 +378,10 @@ public enum Statistic { StreamStatisticNames.STREAM_READ_TOTAL_BYTES, "Total count of bytes read from an input stream", TYPE_COUNTER), + STREAM_READ_UNBUFFERED( + StreamStatisticNames.STREAM_READ_UNBUFFERED, + "Total count of input stream unbuffering operations", + TYPE_COUNTER), /* Stream Write statistics */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 9a2ee086251..364966e3082 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -51,6 +51,11 @@ public final class InternalConstants { */ public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true; + /** + * size of a buffer to create when draining the stream. + */ + public static final int DRAIN_BUFFER_SIZE = 16384; + private InternalConstants() { } @@ -97,6 +102,7 @@ public final class InternalConstants { static { Set keys = Stream.of( + Constants.ASYNC_DRAIN_THRESHOLD, Constants.INPUT_FADVISE, Constants.READAHEAD_RANGE) .collect(Collectors.toSet()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java new file mode 100644 index 00000000000..b566f9ad427 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nullable; + +import com.amazonaws.internal.SdkFilterInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.util.functional.CallableRaisingIOE; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Drains/aborts s3 or other AWS SDK streams. + * It is callable so can be passed directly to a submitter + * for async invocation. + * A request object may be passed in; it will be implicitly + * cached until this object is GCd. + * This is because in some versions of the AWS SDK, the S3Object + * has a finalize() method which releases the http connection, + * even when the stream is still open. + * See HADOOP-17338 for details. 
+ */ +public class SDKStreamDrainer implements CallableRaisingIOE { + + private static final Logger LOG = LoggerFactory.getLogger( + SDKStreamDrainer.class); + + /** + * URI for log messages. + */ + private final String uri; + + /** + * Request object; usually S3Object + * Never used, but needed to keep the http connection + * open long enough for draining to take place. + */ + @Nullable + private final Closeable requestObject; + + /** + * Stream from the {@link #requestObject} for draining and closing. + */ + private final SdkFilterInputStream sdkStream; + + /** + * Should the request be aborted? + */ + private final boolean shouldAbort; + + /** + * How many bytes remaining? + * This is decremented as the stream is + * drained; + * If the stream finished before the expected + * remaining value was read, this will show how many + * bytes were still expected. + */ + private int remaining; + + /** + * Statistics to update with the duration. + */ + private final S3AInputStreamStatistics streamStatistics; + + /** + * Reason? for log messages. + */ + private final String reason; + + /** + * Has the operation executed yet? + */ + private final AtomicBoolean executed = new AtomicBoolean(false); + + /** + * Any exception caught during execution. + */ + private Exception thrown; + + /** + * Was the stream aborted? + */ + private boolean aborted; + + /** + * how many bytes were drained? + */ + private int drained = 0; + + /** + * Prepare to drain the stream. + * @param uri URI for messages + * @param requestObject http request object; needed to avoid GC issues. + * @param sdkStream stream to close. + * @param shouldAbort force an abort; used if explicitly requested. 
+ * @param streamStatistics stats to update + * @param reason reason for stream being closed; used in messages + * @param remaining remaining bytes + */ + public SDKStreamDrainer(final String uri, + @Nullable final Closeable requestObject, + final SdkFilterInputStream sdkStream, + final boolean shouldAbort, + final int remaining, + final S3AInputStreamStatistics streamStatistics, + final String reason) { + this.uri = uri; + this.requestObject = requestObject; + this.sdkStream = requireNonNull(sdkStream); + this.shouldAbort = shouldAbort; + this.remaining = remaining; + this.streamStatistics = requireNonNull(streamStatistics); + this.reason = reason; + } + + /** + * drain the stream. This method is intended to be + * used directly or asynchronously, and measures the + * duration of the operation in the stream statistics. + * @return was the stream aborted? + */ + @Override + public Boolean apply() { + try { + Boolean outcome = invokeTrackingDuration( + streamStatistics.initiateInnerStreamClose(shouldAbort), + this::drainOrAbortHttpStream); + aborted = outcome; + return outcome; + } catch (Exception e) { + thrown = e; + return aborted; + } + } + + /** + * Apply, raising any exception. + * For testing. + * @return the outcome. + * @throws Exception anything raised. + */ + @VisibleForTesting + boolean applyRaisingException() throws Exception { + Boolean outcome = apply(); + if (thrown != null) { + throw thrown; + } + return outcome; + } + + /** + * Drain or abort the inner stream. + * Exceptions are saved then swallowed. + * If a close() is attempted and fails, the operation escalates to + * an abort. + * @return true if the stream was aborted. 
+ */ + private boolean drainOrAbortHttpStream() { + if (executed.getAndSet(true)) { + throw new IllegalStateException( + "duplicate invocation of drain operation"); + } + boolean executeAbort = shouldAbort; + LOG.debug("drain or abort reason {} remaining={} abort={}", + reason, remaining, executeAbort); + + if (!executeAbort) { + try { + // clean close. This will read to the end of the stream, + // so, while cleaner, can be pathological on a multi-GB object + + if (remaining > 0) { + // explicitly drain the stream + LOG.debug("draining {} bytes", remaining); + drained = 0; + int size = Math.min(remaining, DRAIN_BUFFER_SIZE); + byte[] buffer = new byte[size]; + // read the data; bail out early if + // the connection breaks. + // this may be a bit overaggressive on buffer underflow. + while (remaining > 0) { + final int count = sdkStream.read(buffer); + LOG.debug("read {} bytes", count); + if (count <= 0) { + // no more data is left + break; + } + drained += count; + remaining -= count; + } + LOG.debug("Drained stream of {} bytes", drained); + } + + if (remaining != 0) { + // fewer bytes than expected came back; not treating as a + // reason to escalate to an abort(). + // just log. + LOG.debug("drained fewer bytes than expected; {} remaining", + remaining); + } + + // now close it. + // if there is still data in the stream, the SDK + // will warn and escalate to an abort itself. + LOG.debug("Closing stream"); + sdkStream.close(); + + cleanupWithLogger(LOG, requestObject); + // this MUST come after the close, so that if the IO operations fail + // and an abort is triggered, the initial attempt's statistics + // aren't collected. + streamStatistics.streamClose(false, drained); + return false; + } catch (Exception e) { + // exception escalates to an abort + LOG.debug("When closing {} stream for {}, will abort the stream", + uri, reason, e); + thrown = e; + } + } + // Abort, rather than just close, the underlying stream. 
Otherwise, the + // remaining object payload is read from S3 while closing the stream. + LOG.debug("Aborting stream {}", uri); + try { + sdkStream.abort(); + } catch (Exception e) { + LOG.warn("When aborting {} stream after failing to close it for {}", + uri, reason, e); + thrown = e; + } finally { + cleanupWithLogger(LOG, requestObject); + } + + streamStatistics.streamClose(true, remaining); + LOG.debug("Stream {} aborted: {}; remaining={}", + uri, reason, remaining); + return true; + } + + public String getUri() { + return uri; + } + + public Object getRequestObject() { + return requestObject; + } + + public SdkFilterInputStream getSdkStream() { + return sdkStream; + } + + public boolean shouldAbort() { + return shouldAbort; + } + + public int getRemaining() { + return remaining; + } + + public S3AInputStreamStatistics getStreamStatistics() { + return streamStatistics; + } + + public String getReason() { + return reason; + } + + public boolean executed() { + return executed.get(); + } + + public Exception getThrown() { + return thrown; + } + + public int getDrained() { + return drained; + } + + public boolean aborted() { + return aborted; + } + + @Override + public String toString() { + return "SDKStreamDrainer{" + + "uri='" + uri + '\'' + + ", reason='" + reason + '\'' + + ", shouldAbort=" + shouldAbort + + ", remaining=" + remaining + + ", executed=" + executed.get() + + ", aborted=" + aborted + + ", inner=" + sdkStream + + ", thrown=" + thrown + + '}'; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java new file mode 100644 index 00000000000..33a44a9ad78 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import com.amazonaws.internal.SdkFilterInputStream; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for stream draining. + */ +public class TestSDKStreamDrainer extends HadoopTestBase { + + public static final int BYTES = 100; + + /** + * Aborting does as asked. + */ + @Test + public void testDrainerAborted() throws Throwable { + assertAborted(drainer(BYTES, true, stream())); + } + + /** + * Create a stream of the default length. + * @return a stream. + */ + private static FakeSDKInputStream stream() { + return new FakeSDKInputStream(BYTES); + } + + /** + * a normal drain; all bytes are read. No abort. + */ + @Test + public void testDrainerDrained() throws Throwable { + assertBytesReadNotAborted( + drainer(BYTES, false, stream()), + BYTES); + } + + /** + * Empty streams are fine. 
+ */ + @Test + public void testEmptyStream() throws Throwable { + int size = 0; + assertBytesReadNotAborted( + drainer(size, false, new FakeSDKInputStream(size)), + size); + } + + /** + * Single char read; just a safety check on the test stream more than + * the production code. + */ + @Test + public void testSingleChar() throws Throwable { + int size = 1; + assertBytesReadNotAborted( + drainer(size, false, new FakeSDKInputStream(size)), + size); + } + + /** + * a read spanning multiple buffers. + */ + @Test + public void testMultipleBuffers() throws Throwable { + int size = DRAIN_BUFFER_SIZE + 1; + assertBytesReadNotAborted( + drainer(size, false, new FakeSDKInputStream(size)), + size); + } + + /** + * Read of exactly one buffer. + */ + @Test + public void testExactlyOneBuffer() throws Throwable { + int size = DRAIN_BUFFER_SIZE; + assertBytesReadNotAborted( + drainer(size, false, new FakeSDKInputStream(size)), + size); + } + + /** + * Less data than expected came back. not escalated. + */ + @Test + public void testStreamUnderflow() throws Throwable { + int size = 50; + assertBytesReadNotAborted( + drainer(BYTES, false, new FakeSDKInputStream(size)), + size); + } + + /** + * Test a drain where a read triggers an IOE; this must escalate + * to an abort. + */ + @Test + public void testReadFailure() throws Throwable { + int threshold = 50; + SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/", + null, + new FakeSDKInputStream(BYTES, threshold), + false, + BYTES, + EMPTY_INPUT_STREAM_STATISTICS, "test"); + intercept(IOException.class, "", () -> + drainer.applyRaisingException()); + + assertAborted(drainer); + } + + /** + * abort does not read(), so the exception will not surface. 
+ */ + @Test + public void testReadFailureDoesNotSurfaceInAbort() throws Throwable { + int threshold = 50; + SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/", + null, + new FakeSDKInputStream(BYTES, threshold), + true, + BYTES, + EMPTY_INPUT_STREAM_STATISTICS, "test"); + drainer.applyRaisingException(); + + assertAborted(drainer); + } + + /** + * make sure the underlying stream read code works. + */ + @Test + public void testFakeStreamRead() throws Throwable { + FakeSDKInputStream stream = stream(); + int count = 0; + while (stream.read() > 0) { + count++; + } + Assertions.assertThat(count) + .describedAs("bytes read from %s", stream) + .isEqualTo(BYTES); + } + + /** + * Create a drainer and invoke it, rethrowing any exception + * which occurred during the draining. + * @param remaining bytes remaining in the stream + * @param shouldAbort should we abort? + * @param in input stream. + * @return the drainer + * @throws Throwable something went wrong + */ + private SDKStreamDrainer drainer(int remaining, + boolean shouldAbort, + FakeSDKInputStream in) throws Throwable { + SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/", + null, + in, + shouldAbort, + remaining, + EMPTY_INPUT_STREAM_STATISTICS, "test"); + drainer.applyRaisingException(); + return drainer; + } + + + /** + * The draining aborted. + * @param drainer drainer to assert on. + * @return the drainer. + */ + private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) { + Assertions.assertThat(drainer) + .matches(SDKStreamDrainer::aborted, "aborted"); + return drainer; + } + + /** + * The draining was not aborted. + * @param drainer drainer to assert on. + * @return the drainer. + */ + private SDKStreamDrainer assertNotAborted(SDKStreamDrainer drainer) { + Assertions.assertThat(drainer) + .matches(d -> !d.aborted(), "is not aborted"); + return drainer; + } + + /** + * The draining was not aborted and {@code bytes} were read. + * @param drainer drainer to assert on. 
+ * @param bytes expected byte count + * @return the drainer. + */ + private SDKStreamDrainer assertBytesReadNotAborted(SDKStreamDrainer drainer, + int bytes) { + return assertBytesRead(assertNotAborted(drainer), bytes); + } + + /** + * Assert {@code bytes} were read. + * @param drainer drainer to assert on. + * @param bytes expected byte count + * @return the drainer. + */ + private static SDKStreamDrainer assertBytesRead(final SDKStreamDrainer drainer, + final int bytes) { + Assertions.assertThat(drainer) + .describedAs("bytes read by %s", drainer) + .extracting(SDKStreamDrainer::getDrained) + .isEqualTo(bytes); + return drainer; + } + + + /** + * Fake stream; generates data dynamically. + * Only overrides the methods used in stream draining. + */ + private static final class FakeSDKInputStream extends SdkFilterInputStream { + + private final int capacity; + + private final int readToRaiseIOE; + + private int bytesRead; + + private boolean closed; + + private boolean aborted; + + /** + * read up to the capacity; optionally trigger an IOE. + * @param capacity total capacity. + * @param readToRaiseIOE position to raise an IOE, or -1 + */ + private FakeSDKInputStream(final int capacity, final int readToRaiseIOE) { + super(null); + this.capacity = capacity; + this.readToRaiseIOE = readToRaiseIOE; + } + + /** + * read up to the capacity. + * @param capacity total capacity. 
+ */ + private FakeSDKInputStream(final int capacity) { + this(capacity, -1); + } + + @Override + public void abort() { + aborted = true; + } + + @Override + protected boolean isAborted() { + return aborted; + } + + @Override + public int read() throws IOException { + if (bytesRead >= capacity) { + // EOF + return -1; + } + bytesRead++; + if (readToRaiseIOE > 0 && bytesRead >= readToRaiseIOE) { + throw new IOException("IOE triggered on reading byte " + bytesRead); + } + return (int) '0' + (bytesRead % 10); + } + + @Override + public int read(final byte[] bytes, final int off, final int len) + throws IOException { + int count = 0; + + try { + while (count < len) { + int r = read(); + if (r < 0) { + break; + } + bytes[off + count] = (byte) r; + count++; + } + } catch (IOException e) { + if (count == 0) { + // first byte + throw e; + } + // otherwise break loop + } + return count; + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public String toString() { + return "FakeSDKInputStream{" + + "capacity=" + capacity + + ", readToRaiseIOE=" + readToRaiseIOE + + ", bytesRead=" + bytesRead + + ", closed=" + closed + + ", aborted=" + aborted + + "} " + super.toString(); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java new file mode 100644 index 00000000000..0295c07e567 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.performance; + +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.io.IOUtils; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import 
static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ABORTED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
+
+/**
+ * Test stream unbuffer performance/behavior with stream draining
+ * and aborting.
+ */
+public class ITestUnbufferDraining extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestUnbufferDraining.class);
+
+  /**
+   * Readahead range to use, sets drain threshold too.
+   */
+  public static final int READAHEAD = 1000;
+
+  /**
+   * How big a file to create?
+   */
+  public static final int FILE_SIZE = 50_000;
+
+  /**
+   * Number of attempts to unbuffer on each stream.
+   */
+  public static final int ATTEMPTS = 10;
+
+  /**
+   * Test FS with a tiny connection pool and
+   * no recovery.
+   */
+  private FileSystem brittleFS;
+
+  /**
+   * Create with markers kept, always.
+   * NOTE(review): the text says markers are kept, yet {@code false}
+   * is what gets passed up; confirm against the superclass constructor.
+   */
+  public ITestUnbufferDraining() {
+    super(false);
+  }
+
+  /**
+   * Build the test configuration.
+   * Every option which {@link #setup()} goes on to set is passed to
+   * {@code removeBaseAndBucketOverrides()}, presumably so that values
+   * in the test site configuration cannot interfere.
+   * @return the configuration for the test filesystem.
+   */
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        ASYNC_DRAIN_THRESHOLD,
+        ESTABLISH_TIMEOUT,
+        INPUT_FADVISE,
+        MAX_ERROR_RETRIES,
+        MAXIMUM_CONNECTIONS,
+        READAHEAD_RANGE,
+        REQUEST_TIMEOUT,
+        RETRY_LIMIT,
+        SOCKET_TIMEOUT);
+
+    return conf;
+  }
+
+  /**
+   * After the superclass setup, create the second "brittle"
+   * filesystem instance which the test cases read through.
+   * @throws Exception failure of the superclass setup or FS creation.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // now create a new FS with minimal http capacity and recovery
+    // a separate one is used to avoid test teardown suffering
+    // from the lack of http connections and short timeouts.
+    Configuration conf = getConfiguration();
+    // kick off async drain for any data
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
+    conf.setInt(MAXIMUM_CONNECTIONS, 2);
+    conf.setInt(MAX_ERROR_RETRIES, 1);
+    conf.setInt(ESTABLISH_TIMEOUT, 1000);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.setInt(RETRY_LIMIT, 1);
+
+    brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
+  }
+
+  /**
+   * Tear down the superclass, then aggregate the statistics of the
+   * brittle filesystem and close it.
+   * The brittle filesystem is released in a {@code finally} clause so
+   * that it is not leaked when the superclass teardown fails.
+   * @throws Exception failure of the superclass teardown.
+   */
+  @Override
+  public void teardown() throws Exception {
+    try {
+      super.teardown();
+    } finally {
+      FileSystem bfs = getBrittleFS();
+      // null if setup() failed before the instance was created
+      if (bfs != null) {
+        FILESYSTEM_IOSTATS.aggregate(retrieveIOStatistics(bfs));
+        IOUtils.cleanupWithLogger(LOG, bfs);
+      }
+    }
+  }
+
+  /**
+   * Get the filesystem created in {@link #setup()}.
+   * @return the brittle filesystem; null if setup did not complete.
+   */
+  public FileSystem getBrittleFS() {
+    return brittleFS;
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferDraining() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+    IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
+    long originalUnbuffered = lookupCounter(brittleStats,
+        STREAM_READ_UNBUFFERED);
+
+    // seek target which leaves READAHEAD - 1 bytes before the end of the
+    // file; presumably little enough that unbuffer() drains the stream.
+    // The STREAM_READ_ABORTED assertion below checks no abort took place.
+    int offset = FILE_SIZE - READAHEAD + 1;
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .build().get()) {
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.seek(offset);
+        in.read();
+        in.unbuffer();
+      }
+      // verify the policy switched.
+      assertReadPolicy(in, S3AInputPolicy.Random);
+      // assert that the statistics are as expected
+      IOStatistics stats = in.getIOStatistics();
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_UNBUFFERED,
+          ATTEMPTS);
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_ABORTED,
+          0);
+      // there's always a policy of 1, so
+      // this value must be 1 + 1
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_SEEK_POLICY_CHANGED,
+          2);
+    }
+    // filesystem statistic propagation
+    verifyStatisticCounterValue(brittleStats,
+        STREAM_READ_UNBUFFERED,
+        ATTEMPTS + originalUnbuffered);
+  }
+
+  /**
+   * Lookup a counter, returning 0 if it is not defined.
+   * @param statistics stats to probe
+   * @param key counter key
+   * @return the value or 0
+   */
+  private static long lookupCounter(
+      final IOStatistics statistics,
+      final String key) {
+    Long counter = statistics.counters().get(key);
+    return counter == null ? 0 : counter;
+  }
+
+  /**
+   * Assert that the read policy is as expected.
+   * The wrapped stream is cast to {@link S3AInputStream}, so the
+   * stream must have been opened on an S3A filesystem.
+   * @param in input stream
+   * @param policy read policy.
+   */
+  private static void assertReadPolicy(final FSDataInputStream in,
+      final S3AInputPolicy policy) {
+    S3AInputStream inner = (S3AInputStream) in.getWrappedStream();
+    Assertions.assertThat(inner.getInputPolicy())
+        .describedAs("input policy of %s", inner)
+        .isEqualTo(policy);
+  }
+
+  /**
+   * Test stream close performance/behavior with unbuffer
+   * aborting rather than draining.
+   */
+  @Test
+  public void testUnbufferAborting() throws Throwable {
+
+    describe("unbuffer aborting");
+    FileStatus st = createTestFile();
+    IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
+    long originalUnbuffered =
+        lookupCounter(brittleStats, STREAM_READ_UNBUFFERED);
+    long originalAborted =
+        lookupCounter(brittleStats, STREAM_READ_ABORTED);
+
+    // open the file at the beginning with a whole file read policy,
+    // so even with s3a switching to random on unbuffer,
+    // this always does a full GET
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .must(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
+      assertReadPolicy(in, S3AInputPolicy.Sequential);
+
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.read();
+        in.unbuffer();
+        // because the read policy is sequential, it doesn't change
+        assertReadPolicy(in, S3AInputPolicy.Sequential);
+      }
+
+      // assert that the statistics are as expected
+      IOStatistics stats = in.getIOStatistics();
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_UNBUFFERED,
+          ATTEMPTS);
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_ABORTED,
+          ATTEMPTS);
+      // there's always a policy of 1.
+      verifyStatisticCounterValue(stats,
+          STREAM_READ_SEEK_POLICY_CHANGED,
+          1);
+    }
+    // look at FS statistics
+    verifyStatisticCounterValue(brittleStats,
+        STREAM_READ_UNBUFFERED,
+        ATTEMPTS + originalUnbuffered);
+    verifyStatisticCounterValue(brittleStats,
+        STREAM_READ_ABORTED,
+        ATTEMPTS + originalAborted);
+  }
+
+  /**
+   * Create a file of {@link #FILE_SIZE} bytes at the method path,
+   * overwriting any existing file.
+   * The file is written through the main test filesystem, not the
+   * brittle one.
+   * @return the status of the new file.
+   * @throws IOException failure to create or stat the file.
+   */
+  private FileStatus createTestFile() throws IOException {
+    byte[] data = dataset(FILE_SIZE, '0', 10);
+    S3AFileSystem fs = getFileSystem();
+
+    Path path = methodPath();
+    ContractTestUtils.createFile(fs, path, true, data);
+    return fs.getFileStatus(path);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
index 600ea3a178d..15334772296 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
@@ -184,12 +184,16 @@ true + + fs.s3a.connection.request.timeout + 10s + +