HADOOP-18410. S3AInputStream.unbuffer() does not release http connections (#4766)

HADOOP-16202 "Enhance openFile()" added asynchronous draining of the 
remaining bytes of an S3 HTTP input stream for those operations
(unbuffer, seek) where it could avoid blocking the active
thread.

This patch fixes the asynchronous stream draining so that it works
and the HTTP connection is returned to the pool. Without this fix,
whenever unbuffer() or seek() was called on a stream and an
asynchronous drain was triggered, the connection was never returned;
eventually the pool would be empty and subsequent S3 requests would
fail with the message "Timeout waiting for connection from pool".

The root cause was that although drain() received the stream and request
object as method parameters, the lambda expression passed to submit()
referenced the instance fields directly:

operation = client.submit(
    () -> drain(false, reason, remaining,
        object, wrappedStream));   /* here */

Those fields were only read during the asynchronous execution, by which
point they could already have been set to null, or reassigned by a
subsequent read.
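
A minimal sketch of the failure mode, with hypothetical class and field
names rather than the real S3AInputStream internals: the lambda captures
"this", so the field is only read when the executor runs the task, by
which time the caller has already cleared or reused it.

  import java.io.InputStream;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Hypothetical reduction of the bug; not the actual S3AInputStream code.
  class LambdaCaptureBug {
    private InputStream wrappedStream;   // mutable instance field
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> unbuffer() {
      // the lambda captures "this", not the current value of wrappedStream;
      // the field is read only when the task actually runs
      CompletableFuture<Void> drained =
          CompletableFuture.runAsync(() -> drain(wrappedStream), pool);
      // the field is cleared immediately after submission ...
      wrappedStream = null;
      // ... so the queued drain sees null (or the stream of a later GET)
      // and the HTTP connection is never returned to the pool
      return drained;
    }

    private void drain(InputStream in) {
      // would read to EOF and close, returning the connection to the pool
    }
  }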

A new SDKStreamDrainer class performs the draining; this is a Callable
and can be submitted directly to the executor pool.
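
A simplified sketch of that pattern (illustrative names; the statistics
tracking and the close()-to-abort() escalation of the real SDKStreamDrainer
are omitted): the stream and request object are copied into final fields
when the drainer is constructed, so the queued task no longer reads the
input stream's mutable fields.

  import java.io.Closeable;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.concurrent.Callable;

  // Simplified sketch only; not the actual SDKStreamDrainer implementation.
  class StreamDrainerSketch implements Callable<Boolean> {
    private final Closeable requestObject;  // kept alive until draining ends
    private final InputStream stream;       // snapshot taken at construction

    StreamDrainerSketch(Closeable requestObject, InputStream stream) {
      this.requestObject = requestObject;
      this.stream = stream;
    }

    @Override
    public Boolean call() throws IOException {
      byte[] buffer = new byte[16384];
      while (stream.read(buffer) >= 0) {
        // drain to EOF so the underlying HTTP connection can be recycled
      }
      stream.close();
      requestObject.close();
      return Boolean.FALSE;   // the real drainer returns whether it aborted
    }
  }

Because the drainer holds its own references, the input stream can clear
its wrappedStream field as soon as the drainer has been constructed and
handed to the executor.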

The class is used in both the classic and prefetching s3a input streams.

Also, calling unbuffer() switches the S3AInputStream from adaptive
to random IO mode; that is, it is considered a cue that future
IO will not be sequential, whole-file reads.
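
Callers that genuinely want sequential, whole-file reads to continue after
unbuffer() can pin the read policy when opening the file, as the new
ITestUnbufferDraining test below does. A sketch using the openFile()
builder options shown in the diff (the helper class and method names are
illustrative):

  import java.io.IOException;

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
  import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
  import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

  final class WholeFileOpen {
    private WholeFileOpen() {
    }

    // an explicit whole-file read policy keeps the stream in sequential
    // mode, so a later unbuffer() will not switch it to random IO
    static FSDataInputStream openWholeFile(FileSystem fs, Path path)
        throws IOException {
      return awaitFuture(fs.openFile(path)
          .must(FS_OPTION_OPENFILE_READ_POLICY,
              FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
          .build());
    }
  }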

Contributed by Steve Loughran.
Steve Loughran 2022-08-31 11:16:52 +01:00 committed by GitHub
parent c334ba89ad
commit c69e16b297
10 changed files with 1023 additions and 221 deletions

View File

@ -1619,8 +1619,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
CompletableFuture<T> result = new CompletableFuture<>();
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, () -> {
LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
try (AuditSpan span = auditSpan.activate()) {
return operation.apply();
} finally {
LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
}
}));
return result;

View File

@ -51,6 +51,8 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
@ -65,7 +67,6 @@ import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
@ -97,10 +98,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";
/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
@ -242,6 +239,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.inputPolicySet(inputPolicy.ordinal());
}
/**
* Get the current input policy.
* @return input policy.
*/
@VisibleForTesting
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
/**
* Opens up the stream at specified target position and for given length.
*
@ -604,7 +610,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
try {
stopVectoredIOOperations.set(true);
// close or abort the stream; blocking
awaitFuture(closeStream("close() operation", false, true));
closeStream("close() operation", false, true);
// end the client+audit span.
client.close();
// this is actually a no-op
@ -664,18 +670,25 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
object,
wrappedStream,
shouldAbort,
(int) remaining,
streamStatistics,
reason);
if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async io.
operation = CompletableFuture.completedFuture(
drain(shouldAbort, reason, remaining, object, wrappedStream));
// don't bother with async IO if the caller plans to wait for
// the result, there's an abort (which is fast), or
// there is not much data to read.
operation = CompletableFuture.completedFuture(drainer.apply());
} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort with references to the fields so they
// can be reused
operation = client.submit(
() -> drain(false, reason, remaining, object, wrappedStream));
// schedule an async drain/abort
operation = client.submit(drainer);
}
// either the stream is closed in the blocking call or the async call is
@ -685,117 +698,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
return operation;
}
/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {
try {
return invokeTrackingDuration(
streamStatistics.initiateInnerStreamClose(shouldAbort),
() -> drainOrAbortHttpStream(
shouldAbort,
reason,
remaining,
requestObject,
inner));
} catch (IOException e) {
// this is only here because invokeTrackingDuration() has it in its
// signature
return shouldAbort;
}
}
/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* This does not set the {@link #closed} flag.
*
* A reference to the stream is passed in so that the instance
* {@link #wrappedStream} field can be reused as soon as this
* method is submitted;
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {
// force a use of the request object so IDEs don't warn of
// lack of use.
requireNonNull(requestObject);
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object
// explicitly drain the stream
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inner.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
// now close it
inner.close();
// this MUST come after the close, so that if the IO operations fail
// and an abort is triggered, the initial attempt's statistics
// aren't collected.
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream {}", uri);
try {
inner.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
/**
* Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS
@ -1080,8 +982,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
int drainBytes = 0;
int readCount;
while (drainBytes < drainQuantity) {
if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
if (drainBytes + InternalConstants.DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[InternalConstants.DRAIN_BUFFER_SIZE];
readCount = objectContent.read(drainBuffer);
} else {
byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
@ -1345,6 +1247,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
closeStream("unbuffer()", false, false);
} finally {
streamStatistics.unbuffered();
if (inputPolicy.isAdaptive()) {
S3AInputPolicy policy = S3AInputPolicy.Random;
LOG.debug("Switching to seek policy {} after unbuffer() invoked", policy);
setInputPolicy(policy);
}
}
}

View File

@ -379,6 +379,7 @@ public final class S3AUtils {
} else {
String name = innerCause.getClass().getName();
if (name.endsWith(".ConnectTimeoutException")
|| name.endsWith(".ConnectionPoolTimeoutException")
|| name.endsWith("$ConnectTimeoutException")) {
// TCP connection http timeout from the shaded or unshaded filenames
// com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException

View File

@ -378,6 +378,10 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
"Total count of bytes read from an input stream",
TYPE_COUNTER),
STREAM_READ_UNBUFFERED(
StreamStatisticNames.STREAM_READ_UNBUFFERED,
"Total count of input stream unbuffering operations",
TYPE_COUNTER),
STREAM_READ_BLOCKS_IN_FILE_CACHE(
StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
"Gauge of blocks in disk cache",

View File

@ -51,6 +51,11 @@ public final class InternalConstants {
*/
public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true;
/**
* size of a buffer to create when draining the stream.
*/
public static final int DRAIN_BUFFER_SIZE = 16384;
private InternalConstants() {
}
@ -97,6 +102,7 @@ public final class InternalConstants {
static {
Set<String> keys = Stream.of(
Constants.ASYNC_DRAIN_THRESHOLD,
Constants.INPUT_FADVISE,
Constants.READAHEAD_RANGE)
.collect(Collectors.toSet());

View File

@ -0,0 +1,325 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import com.amazonaws.internal.SdkFilterInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Drains/aborts s3 or other AWS SDK streams.
* It is callable so can be passed directly to a submitter
* for async invocation.
* A request object may be passed in; it will be implicitly
* cached until this object is GCd.
* This is because in some versions of the AWS SDK, the S3Object
* has a finalize() method which releases the http connection,
* even when the stream is still open.
* See HADOOP-17338 for details.
*/
public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(
SDKStreamDrainer.class);
/**
* URI for log messages.
*/
private final String uri;
/**
* Request object; usually S3Object
* Never used, but needed to keep the http connection
* open long enough for draining to take place.
*/
@Nullable
private final Closeable requestObject;
/**
* Stream from the {@link #requestObject} for draining and closing.
*/
private final SdkFilterInputStream sdkStream;
/**
* Should the request be aborted?
*/
private final boolean shouldAbort;
/**
* How many bytes remaining?
* This is decremented as the stream is
* drained;
* If the stream finished before the expected
* remaining value was read, this will show how many
* bytes were still expected.
*/
private int remaining;
/**
* Statistics to update with the duration.
*/
private final S3AInputStreamStatistics streamStatistics;
/**
* Reason? for log messages.
*/
private final String reason;
/**
* Has the operation executed yet?
*/
private final AtomicBoolean executed = new AtomicBoolean(false);
/**
* Any exception caught during execution.
*/
private Exception thrown;
/**
* Was the stream aborted?
*/
private boolean aborted;
/**
* how many bytes were drained?
*/
private int drained = 0;
/**
* Prepare to drain the stream.
* @param uri URI for messages
* @param requestObject http request object; needed to avoid GC issues.
* @param sdkStream stream to close.
* @param shouldAbort force an abort; used if explicitly requested.
* @param streamStatistics stats to update
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
*/
public SDKStreamDrainer(final String uri,
@Nullable final Closeable requestObject,
final SdkFilterInputStream sdkStream,
final boolean shouldAbort,
final int remaining,
final S3AInputStreamStatistics streamStatistics,
final String reason) {
this.uri = uri;
this.requestObject = requestObject;
this.sdkStream = requireNonNull(sdkStream);
this.shouldAbort = shouldAbort;
this.remaining = remaining;
this.streamStatistics = requireNonNull(streamStatistics);
this.reason = reason;
}
/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
* @return was the stream aborted?
*/
@Override
public Boolean apply() {
try {
Boolean outcome = invokeTrackingDuration(
streamStatistics.initiateInnerStreamClose(shouldAbort),
this::drainOrAbortHttpStream);
aborted = outcome;
return outcome;
} catch (Exception e) {
thrown = e;
return aborted;
}
}
/**
* Apply, raising any exception.
* For testing.
* @return the outcome.
* @throws Exception anything raised.
*/
@VisibleForTesting
boolean applyRaisingException() throws Exception {
Boolean outcome = apply();
if (thrown != null) {
throw thrown;
}
return outcome;
}
/**
* Drain or abort the inner stream.
* Exceptions are saved then swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
* @return true if the stream was aborted.
*/
private boolean drainOrAbortHttpStream() {
if (executed.getAndSet(true)) {
throw new IllegalStateException(
"duplicate invocation of drain operation");
}
boolean executeAbort = shouldAbort;
LOG.debug("drain or abort reason {} remaining={} abort={}",
reason, remaining, executeAbort);
if (!executeAbort) {
try {
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object
if (remaining > 0) {
// explicitly drain the stream
LOG.debug("draining {} bytes", remaining);
drained = 0;
int size = Math.min(remaining, DRAIN_BUFFER_SIZE);
byte[] buffer = new byte[size];
// read the data; bail out early if
// the connection breaks.
// this may be a bit overaggressive on buffer underflow.
while (remaining > 0) {
final int count = sdkStream.read(buffer);
LOG.debug("read {} bytes", count);
if (count <= 0) {
// no more data is left
break;
}
drained += count;
remaining -= count;
}
LOG.debug("Drained stream of {} bytes", drained);
}
if (remaining != 0) {
// fewer bytes than expected came back; not treating as a
// reason to escalate to an abort().
// just log.
LOG.debug("drained fewer bytes than expected; {} remaining",
remaining);
}
// now close it.
// if there is still data in the stream, the SDK
// will warn and escalate to an abort itself.
LOG.debug("Closing stream");
sdkStream.close();
cleanupWithLogger(LOG, requestObject);
// this MUST come after the close, so that if the IO operations fail
// and an abort is triggered, the initial attempt's statistics
// aren't collected.
streamStatistics.streamClose(false, drained);
return false;
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
thrown = e;
}
}
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream {}", uri);
try {
sdkStream.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
thrown = e;
} finally {
cleanupWithLogger(LOG, requestObject);
}
streamStatistics.streamClose(true, remaining);
LOG.debug("Stream {} aborted: {}; remaining={}",
uri, reason, remaining);
return true;
}
public String getUri() {
return uri;
}
public Object getRequestObject() {
return requestObject;
}
public SdkFilterInputStream getSdkStream() {
return sdkStream;
}
public boolean shouldAbort() {
return shouldAbort;
}
public int getRemaining() {
return remaining;
}
public S3AInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
public String getReason() {
return reason;
}
public boolean executed() {
return executed.get();
}
public Exception getThrown() {
return thrown;
}
public int getDrained() {
return drained;
}
public boolean aborted() {
return aborted;
}
@Override
public String toString() {
return "SDKStreamDrainer{" +
"uri='" + uri + '\'' +
", reason='" + reason + '\'' +
", shouldAbort=" + shouldAbort +
", remaining=" + remaining +
", executed=" + executed.get() +
", aborted=" + aborted +
", inner=" + sdkStream +
", thrown=" + thrown +
'}';
}
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,12 +37,10 @@ import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Encapsulates low level interactions with S3 object on AWS.
*/
@ -79,7 +78,7 @@ public class S3ARemoteObject {
* Maps a stream returned by openForRead() to the associated S3 object.
* That allows us to close the object when closing the stream.
*/
private Map<InputStream, S3Object> s3Objects;
private final Map<InputStream, S3Object> s3Objects;
/**
* uri of the object being read.
@ -225,104 +224,27 @@ public class S3ARemoteObject {
void close(InputStream inputStream, int numRemainingBytes) {
S3Object obj;
synchronized (s3Objects) {
obj = s3Objects.get(inputStream);
obj = s3Objects.remove(inputStream);
if (obj == null) {
throw new IllegalArgumentException("inputStream not found");
}
s3Objects.remove(inputStream);
}
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
obj,
(S3ObjectInputStream)inputStream,
false,
numRemainingBytes,
streamStatistics,
"close() operation");
if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
// don't bother with async io.
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
drainer.apply();
} else {
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
// schedule an async drain/abort with references to the fields so they
// can be reused
client.submit(
() -> drain(false, "close() operation", numRemainingBytes, obj,
inputStream));
// schedule an async drain/abort
client.submit(drainer);
}
}
/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object;
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {
try {
return invokeTrackingDuration(
streamStatistics.initiateInnerStreamClose(shouldAbort),
() -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
requestObject, inputStream));
} catch (IOException e) {
// this is only here because invokeTrackingDuration() has it in its
// signature
return shouldAbort;
}
}
/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {
if (!shouldAbort && remaining > 0) {
try {
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inputStream.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream", uri,
reason, e);
shouldAbort = true;
}
}
cleanupWithLogger(LOG, inputStream);
cleanupWithLogger(LOG, requestObject);
streamStatistics.streamClose(shouldAbort, remaining);
LOG.debug("Stream {} {}: {}; remaining={}", uri,
(shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
}

View File

@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import com.amazonaws.internal.SdkFilterInputStream;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit tests for stream draining.
*/
public class TestSDKStreamDrainer extends HadoopTestBase {
public static final int BYTES = 100;
/**
* Aborting does as asked.
*/
@Test
public void testDrainerAborted() throws Throwable {
assertAborted(drainer(BYTES, true, stream()));
}
/**
* Create a stream of the default length.
* @return a stream.
*/
private static FakeSDKInputStream stream() {
return new FakeSDKInputStream(BYTES);
}
/**
* a normal drain; all bytes are read. No abort.
*/
@Test
public void testDrainerDrained() throws Throwable {
assertBytesReadNotAborted(
drainer(BYTES, false, stream()),
BYTES);
}
/**
* Empty streams are fine.
*/
@Test
public void testEmptyStream() throws Throwable {
int size = 0;
assertBytesReadNotAborted(
drainer(size, false, new FakeSDKInputStream(size)),
size);
}
/**
* Single char read; just a safety check on the test stream more than
* the production code.
*/
@Test
public void testSingleChar() throws Throwable {
int size = 1;
assertBytesReadNotAborted(
drainer(size, false, new FakeSDKInputStream(size)),
size);
}
/**
* a read spanning multiple buffers.
*/
@Test
public void testMultipleBuffers() throws Throwable {
int size = DRAIN_BUFFER_SIZE + 1;
assertBytesReadNotAborted(
drainer(size, false, new FakeSDKInputStream(size)),
size);
}
/**
* Read of exactly one buffer.
*/
@Test
public void testExactlyOneBuffer() throws Throwable {
int size = DRAIN_BUFFER_SIZE;
assertBytesReadNotAborted(
drainer(size, false, new FakeSDKInputStream(size)),
size);
}
/**
* Less data than expected came back. not escalated.
*/
@Test
public void testStreamUnderflow() throws Throwable {
int size = 50;
assertBytesReadNotAborted(
drainer(BYTES, false, new FakeSDKInputStream(size)),
size);
}
/**
* Test a drain where a read triggers an IOE; this must escalate
* to an abort.
*/
@Test
public void testReadFailure() throws Throwable {
int threshold = 50;
SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
null,
new FakeSDKInputStream(BYTES, threshold),
false,
BYTES,
EMPTY_INPUT_STREAM_STATISTICS, "test");
intercept(IOException.class, "", () ->
drainer.applyRaisingException());
assertAborted(drainer);
}
/**
* abort does not read(), so the exception will not surface.
*/
@Test
public void testReadFailureDoesNotSurfaceInAbort() throws Throwable {
int threshold = 50;
SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
null,
new FakeSDKInputStream(BYTES, threshold),
true,
BYTES,
EMPTY_INPUT_STREAM_STATISTICS, "test");
drainer.applyRaisingException();
assertAborted(drainer);
}
/**
* make sure the underlying stream read code works.
*/
@Test
public void testFakeStreamRead() throws Throwable {
FakeSDKInputStream stream = stream();
int count = 0;
while (stream.read() > 0) {
count++;
}
Assertions.assertThat(count)
.describedAs("bytes read from %s", stream)
.isEqualTo(BYTES);
}
/**
* Create a drainer and invoke it, rethrowing any exception
* which occurred during the draining.
* @param remaining bytes remaining in the stream
* @param shouldAbort should we abort?
* @param in input stream.
* @return the drainer
* @throws Throwable something went wrong
*/
private SDKStreamDrainer drainer(int remaining,
boolean shouldAbort,
FakeSDKInputStream in) throws Throwable {
SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
null,
in,
shouldAbort,
remaining,
EMPTY_INPUT_STREAM_STATISTICS, "test");
drainer.applyRaisingException();
return drainer;
}
/**
* The draining aborted.
* @param drainer drainer to assert on.
* @return the drainer.
*/
private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
Assertions.assertThat(drainer)
.matches(SDKStreamDrainer::aborted, "aborted");
return drainer;
}
/**
* The draining was not aborted.
* @param drainer drainer to assert on.
* @return the drainer.
*/
private SDKStreamDrainer assertNotAborted(SDKStreamDrainer drainer) {
Assertions.assertThat(drainer)
.matches(d -> !d.aborted(), "is not aborted");
return drainer;
}
/**
* The draining was not aborted and {@code bytes} were read.
* @param drainer drainer to assert on.
* @param bytes expected byte count
* @return the drainer.
*/
private SDKStreamDrainer assertBytesReadNotAborted(SDKStreamDrainer drainer,
int bytes) {
return assertBytesRead(assertNotAborted(drainer), bytes);
}
/**
* Assert {@code bytes} were read.
* @param drainer drainer to assert on.
* @param bytes expected byte count
* @return the drainer.
*/
private static SDKStreamDrainer assertBytesRead(final SDKStreamDrainer drainer,
final int bytes) {
Assertions.assertThat(drainer)
.describedAs("bytes read by %s", drainer)
.extracting(SDKStreamDrainer::getDrained)
.isEqualTo(bytes);
return drainer;
}
/**
* Fake stream; generates data dynamically.
* Only overrides the methods used in stream draining.
*/
private static final class FakeSDKInputStream extends SdkFilterInputStream {
private final int capacity;
private final int readToRaiseIOE;
private int bytesRead;
private boolean closed;
private boolean aborted;
/**
* read up to the capacity; optionally trigger an IOE.
* @param capacity total capacity.
* @param readToRaiseIOE position to raise an IOE, or -1
*/
private FakeSDKInputStream(final int capacity, final int readToRaiseIOE) {
super(null);
this.capacity = capacity;
this.readToRaiseIOE = readToRaiseIOE;
}
/**
* read up to the capacity.
* @param capacity total capacity.
*/
private FakeSDKInputStream(final int capacity) {
this(capacity, -1);
}
@Override
public void abort() {
aborted = true;
}
@Override
protected boolean isAborted() {
return aborted;
}
@Override
public int read() throws IOException {
if (bytesRead >= capacity) {
// EOF
return -1;
}
bytesRead++;
if (readToRaiseIOE > 0 && bytesRead >= readToRaiseIOE) {
throw new IOException("IOE triggered on reading byte " + bytesRead);
}
return (int) '0' + (bytesRead % 10);
}
@Override
public int read(final byte[] bytes, final int off, final int len)
throws IOException {
int count = 0;
try {
while (count < len) {
int r = read();
if (r < 0) {
break;
}
bytes[off + count] = (byte) r;
count++;
}
} catch (IOException e) {
if (count == 0) {
// first byte
throw e;
}
// otherwise break loop
}
return count;
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public String toString() {
return "FakeSDKInputStream{" +
"capacity=" + capacity +
", readToRaiseIOE=" + readToRaiseIOE +
", bytesRead=" + bytesRead +
", closed=" + closed +
", aborted=" + aborted +
"} " + super.toString();
}
}
}

View File

@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ABORTED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
/**
* Test stream unbuffer performance/behavior with stream draining
* and aborting.
*/
public class ITestUnbufferDraining extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestUnbufferDraining.class);
/**
* Readahead range to use, sets drain threshold too.
*/
public static final int READAHEAD = 1000;
/**
* How big a file to create?
*/
public static final int FILE_SIZE = 50_000;
/**
* Number of attempts to unbuffer on each stream.
*/
public static final int ATTEMPTS = 10;
/**
* Test FS with a tiny connection pool and
* no recovery.
*/
private FileSystem brittleFS;
/**
* Create with markers kept, always.
*/
public ITestUnbufferDraining() {
super(false);
}
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
ASYNC_DRAIN_THRESHOLD,
ESTABLISH_TIMEOUT,
INPUT_FADVISE,
MAX_ERROR_RETRIES,
MAXIMUM_CONNECTIONS,
PREFETCH_ENABLED_KEY,
READAHEAD_RANGE,
REQUEST_TIMEOUT,
RETRY_LIMIT,
SOCKET_TIMEOUT);
return conf;
}
@Override
public void setup() throws Exception {
super.setup();
// now create a new FS with minimal http capacity and recovery
// a separate one is used to avoid test teardown suffering
// from the lack of http connections and short timeouts.
Configuration conf = getConfiguration();
// kick off async drain for any data
conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
conf.setInt(MAXIMUM_CONNECTIONS, 2);
conf.setInt(MAX_ERROR_RETRIES, 1);
conf.setInt(ESTABLISH_TIMEOUT, 1000);
conf.setInt(READAHEAD_RANGE, READAHEAD);
conf.setInt(RETRY_LIMIT, 1);
brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
}
@Override
public void teardown() throws Exception {
super.teardown();
FileSystem bfs = getBrittleFS();
FILESYSTEM_IOSTATS.aggregate(retrieveIOStatistics(bfs));
IOUtils.cleanupWithLogger(LOG, bfs);
}
public FileSystem getBrittleFS() {
return brittleFS;
}
/**
* Test stream close performance/behavior with stream draining
* and unbuffer.
*/
@Test
public void testUnbufferDraining() throws Throwable {
describe("unbuffer draining");
FileStatus st = createTestFile();
IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
long originalUnbuffered = lookupCounter(brittleStats,
STREAM_READ_UNBUFFERED);
int offset = FILE_SIZE - READAHEAD + 1;
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
.must(ASYNC_DRAIN_THRESHOLD, 1)
.build().get()) {
describe("Initiating unbuffer with async drain\n");
for (int i = 0; i < ATTEMPTS; i++) {
describe("Starting read/unbuffer #%d", i);
in.seek(offset);
in.read();
in.unbuffer();
}
// verify the policy switched.
assertReadPolicy(in, S3AInputPolicy.Random);
// assert that the statistics are as expected
IOStatistics stats = in.getIOStatistics();
verifyStatisticCounterValue(stats,
STREAM_READ_UNBUFFERED,
ATTEMPTS);
verifyStatisticCounterValue(stats,
STREAM_READ_ABORTED,
0);
// there's always a policy of 1, so
// this value must be 1 + 1
verifyStatisticCounterValue(stats,
STREAM_READ_SEEK_POLICY_CHANGED,
2);
}
// filesystem statistic propagation
verifyStatisticCounterValue(brittleStats,
STREAM_READ_UNBUFFERED,
ATTEMPTS + originalUnbuffered);
}
/**
* Lookup a counter, returning 0 if it is not defined.
* @param statistics stats to probe
* @param key counter key
* @return the value or 0
*/
private static long lookupCounter(
final IOStatistics statistics,
final String key) {
Long counter = statistics.counters().get(key);
return counter == null ? 0 : counter;
}
/**
* Assert that the read policy is as expected.
* @param in input stream
* @param policy read policy.
*/
private static void assertReadPolicy(final FSDataInputStream in,
final S3AInputPolicy policy) {
S3AInputStream inner = (S3AInputStream) in.getWrappedStream();
Assertions.assertThat(inner.getInputPolicy())
.describedAs("input policy of %s", inner)
.isEqualTo(policy);
}
/**
* Test stream close performance/behavior with unbuffer
* aborting rather than draining.
*/
@Test
public void testUnbufferAborting() throws Throwable {
describe("unbuffer aborting");
FileStatus st = createTestFile();
IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
long originalUnbuffered =
lookupCounter(brittleStats, STREAM_READ_UNBUFFERED);
long originalAborted =
lookupCounter(brittleStats, STREAM_READ_ABORTED);
// open the file at the beginning with a whole file read policy,
// so even with s3a switching to random on unbuffer,
// this always does a full GET
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
.must(ASYNC_DRAIN_THRESHOLD, 1)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build().get()) {
assertReadPolicy(in, S3AInputPolicy.Sequential);
describe("Initiating unbuffer with async drain\n");
for (int i = 0; i < ATTEMPTS; i++) {
describe("Starting read/unbuffer #%d", i);
in.read();
in.unbuffer();
// because the read policy is sequential, it doesn't change
assertReadPolicy(in, S3AInputPolicy.Sequential);
}
// assert that the statistics are as expected
IOStatistics stats = in.getIOStatistics();
verifyStatisticCounterValue(stats,
STREAM_READ_UNBUFFERED,
ATTEMPTS);
verifyStatisticCounterValue(stats,
STREAM_READ_ABORTED,
ATTEMPTS);
// there's always a policy of 1.
verifyStatisticCounterValue(stats,
STREAM_READ_SEEK_POLICY_CHANGED,
1);
}
// look at FS statistics
verifyStatisticCounterValue(brittleStats,
STREAM_READ_UNBUFFERED,
ATTEMPTS + originalUnbuffered);
verifyStatisticCounterValue(brittleStats,
STREAM_READ_ABORTED,
ATTEMPTS + originalAborted);
}
private FileStatus createTestFile() throws IOException {
byte[] data = dataset(FILE_SIZE, '0', 10);
S3AFileSystem fs = getFileSystem();
Path path = methodPath();
ContractTestUtils.createFile(fs, path, true, data);
return fs.getFileStatus(path);
}
}

View File

@ -184,12 +184,16 @@
<value>true</value>
</property>
<property>
<name>fs.s3a.connection.request.timeout</name>
<value>10s</value>
</property>
<!--
To run these tests.
# Create a file auth-keys.xml - DO NOT ADD TO REVISION CONTROL
# add the property test.fs.s3n.name to point to an S3 filesystem URL
# Add the credentials for the service you are testing against
# see testing.md for details on what to set.
-->
<include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
<fallback/>