From 515cba7d2ee1ca6e6ff763e91e3261fe4c1282a9 Mon Sep 17 00:00:00 2001 From: ahmarsuhail Date: Fri, 15 Jul 2022 13:51:19 +0100 Subject: [PATCH] HADOOP-18231. S3A prefetching: fix failing tests & drain stream async. (#4386) * adds in new test for prefetching input stream * creates streamStats before opening stream * updates numBlocks calculation method * fixes ITestS3AOpenCost.testOpenFileLongerLength * drains stream async * fixes failing unit test Contributed by Ahmar Suhail --- .../hadoop/fs/common/CachingBlockManager.java | 4 +- .../java/org/apache/hadoop/fs/common/Io.java | 45 ----- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 3 +- .../fs/s3a/read/S3CachingInputStream.java | 7 +- .../org/apache/hadoop/fs/s3a/read/S3File.java | 154 +++++++++++++--- .../fs/s3a/read/S3InMemoryInputStream.java | 7 +- .../hadoop/fs/s3a/read/S3InputStream.java | 18 +- .../fs/s3a/read/S3PrefetchingInputStream.java | 9 +- .../apache/hadoop/fs/s3a/read/S3Reader.java | 11 +- .../apache/hadoop/fs/common/TestIoClass.java | 62 ------- .../fs/s3a/ITestS3PrefetchingInputStream.java | 169 ++++++++++++++++++ .../org/apache/hadoop/fs/s3a/read/Fakes.java | 23 ++- .../apache/hadoop/fs/s3a/read/MockS3File.java | 2 +- .../hadoop/fs/s3a/read/TestS3InputStream.java | 24 +-- .../scale/ITestS3AInputStreamPerformance.java | 9 + 15 files changed, 373 insertions(+), 174 deletions(-) delete mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 44c2df29138..1bb439a9997 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -31,6 +31,8 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + /** * Provides read access to the underlying file one block at a time. * Improve read performance by prefetching and locall caching blocks. @@ -204,7 +206,7 @@ public abstract class CachingBlockManager extends BlockManager { // Cancel any prefetches in progress. this.cancelPrefetches(); - Io.closeIgnoringIoException(this.cache); + cleanupWithLogger(LOG, this.cache); this.ops.end(op); LOG.info(this.ops.getSummary(false)); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java deleted file mode 100644 index fe11cdccf87..00000000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.fs.common; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Provides misc functionality related to IO. - */ -public final class Io { - private Io() {} - - /** - * Closes the given resource and ignores any IOException if thrown. - * - * @param resource the resource to close. - */ - public static void closeIgnoringIoException(Closeable resource) { - try { - if (resource != null) { - resource.close(); - } - } catch (IOException e) { - // Ignored on purpose as there is not much we can do here. - } - } -} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0e74e2e3afd..b984d547c2a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1524,7 +1524,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, new S3PrefetchingInputStream( readContext.build(), createObjectAttributes(path, fileStatus), - createInputStreamCallbacks(auditSpan))); + createInputStreamCallbacks(auditSpan), + inputStreamStats)); } else { return new FSDataInputStream( new S3AInputStream( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index a1a9a22448a..b6c6bf39988 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; /** * Provides an {@code InputStream} that allows reading from an S3 file. @@ -53,6 +54,7 @@ public class S3CachingInputStream extends S3InputStream { * @param context read-specific operation context. * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. + * @param streamStatistics statistics for this stream. * * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. @@ -61,8 +63,9 @@ public class S3CachingInputStream extends S3InputStream { public S3CachingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { - super(context, s3Attributes, client); + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { + super(context, s3Attributes, client, streamStatistics); this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); int bufferPoolSize = this.numBlocksToPrefetch + 1; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java index 88854b87c81..ac22976e5bc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java @@ -19,18 +19,17 @@ package org.apache.hadoop.fs.s3a.read; -import java.io.Closeable; + import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.IdentityHashMap; -import java.util.List; import java.util.Map; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3Object; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.common.Io; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.S3AInputStream; @@ -40,30 +39,56 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + /** * Encapsulates low level interactions with S3 object on AWS. */ -public class S3File implements Closeable { +public class S3File { + private static final Logger LOG = LoggerFactory.getLogger(S3File.class); - // Read-specific operation context. + /** + * Read-specific operation context. + */ private final S3AReadOpContext context; - // S3 object attributes. + /** + * S3 object attributes. + */ private final S3ObjectAttributes s3Attributes; - // Callbacks used for interacting with the underlying S3 client. + /** + * Callbacks used for interacting with the underlying S3 client. + */ private final S3AInputStream.InputStreamCallbacks client; - // Used for reporting input stream access statistics. + /** + * Used for reporting input stream access statistics. + */ private final S3AInputStreamStatistics streamStatistics; - // Enforces change tracking related policies. + /** + * Enforces change tracking related policies. + */ private final ChangeTracker changeTracker; - // Maps a stream returned by openForRead() to the associated S3 object. - // That allows us to close the object when closing the stream. + /** + * Maps a stream returned by openForRead() to the associated S3 object. + * That allows us to close the object when closing the stream. + */ private Map s3Objects; + /** + * uri of the object being read. + */ + private final String uri; + + /** + * size of a buffer to create when draining the stream. + */ + private static final int DRAIN_BUFFER_SIZE = 16384; + /** * Initializes a new instance of the {@code S3File} class. * @@ -97,7 +122,8 @@ public class S3File implements Closeable { this.client = client; this.streamStatistics = streamStatistics; this.changeTracker = changeTracker; - this.s3Objects = new IdentityHashMap(); + this.s3Objects = new IdentityHashMap<>(); + this.uri = this.getPath(); } /** @@ -169,7 +195,6 @@ public class S3File implements Closeable { .withRange(offset, offset + size - 1); this.changeTracker.maybeApplyConstraint(request); - String uri = this.getPath(); String operation = String.format( "%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset); DurationTracker tracker = streamStatistics.initiateGetRequest(); @@ -193,18 +218,7 @@ public class S3File implements Closeable { return stream; } - /** - * Closes this stream and releases all acquired resources. - */ - @Override - public synchronized void close() { - List streams = new ArrayList(this.s3Objects.keySet()); - for (InputStream stream : streams) { - this.close(stream); - } - } - - void close(InputStream inputStream) { + void close(InputStream inputStream, int numRemainingBytes) { S3Object obj; synchronized (this.s3Objects) { obj = this.s3Objects.get(inputStream); @@ -214,7 +228,91 @@ public class S3File implements Closeable { this.s3Objects.remove(inputStream); } - Io.closeIgnoringIoException(inputStream); - Io.closeIgnoringIoException(obj); + if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) { + // don't bother with async io. + drain(false, "close() operation", numRemainingBytes, obj, inputStream); + } else { + LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes); + // schedule an async drain/abort with references to the fields so they + // can be reused + client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream)); + } + } + + /** + * drain the stream. This method is intended to be + * used directly or asynchronously, and measures the + * duration of the operation in the stream statistics. + * + * @param shouldAbort force an abort; used if explicitly requested. + * @param reason reason for stream being closed; used in messages + * @param remaining remaining bytes + * @param requestObject http request object; + * @param inputStream stream to close. + * @return was the stream aborted? + */ + private boolean drain( + final boolean shouldAbort, + final String reason, + final long remaining, + final S3Object requestObject, + final InputStream inputStream) { + + try { + return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort), + () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream)); + } catch (IOException e) { + // this is only here because invokeTrackingDuration() has it in its + // signature + return shouldAbort; + } + } + + /** + * Drain or abort the inner stream. + * Exceptions are swallowed. + * If a close() is attempted and fails, the operation escalates to + * an abort. + * + * @param shouldAbort force an abort; used if explicitly requested. + * @param reason reason for stream being closed; used in messages + * @param remaining remaining bytes + * @param requestObject http request object + * @param inputStream stream to close. + * @return was the stream aborted? + */ + private boolean drainOrAbortHttpStream( + boolean shouldAbort, + final String reason, + final long remaining, + final S3Object requestObject, + final InputStream inputStream) { + + if (!shouldAbort && remaining > 0) { + try { + long drained = 0; + byte[] buffer = new byte[DRAIN_BUFFER_SIZE]; + while (true) { + final int count = inputStream.read(buffer); + if (count < 0) { + // no more data is left + break; + } + drained += count; + } + LOG.debug("Drained stream of {} bytes", drained); + } catch (Exception e) { + // exception escalates to an abort + LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e); + shouldAbort = true; + } + } + cleanupWithLogger(LOG, inputStream); + cleanupWithLogger(LOG, requestObject); + streamStatistics.streamClose(shouldAbort, remaining); + + LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason, + remaining); + return shouldAbort; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java index 2be2eaa98f7..c97cf38669b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; /** * Provides an {@code InputStream} that allows reading from an S3 file. @@ -48,6 +49,7 @@ public class S3InMemoryInputStream extends S3InputStream { * @param context read-specific operation context. * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. + * @param streamStatistics statistics for this stream. * * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. @@ -56,8 +58,9 @@ public class S3InMemoryInputStream extends S3InputStream { public S3InMemoryInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { - super(context, s3Attributes, client); + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { + super(context, s3Attributes, client, streamStatistics); int fileSize = (int) s3Attributes.getLen(); this.buffer = ByteBuffer.allocate(fileSize); LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java index 00d5fbc367d..bbc9008c73a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java @@ -44,6 +44,8 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import static java.util.Objects.requireNonNull; + /** * Provides an {@link InputStream} that allows reading from an S3 file. */ @@ -96,6 +98,7 @@ public abstract class S3InputStream * @param context read-specific operation context. * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. + * @param streamStatistics statistics for this stream. * * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. @@ -104,16 +107,13 @@ public abstract class S3InputStream public S3InputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { - Validate.checkNotNull(context, "context"); - Validate.checkNotNull(s3Attributes, "s3Attributes"); - Validate.checkNotNull(client, "client"); - - this.context = context; - this.s3Attributes = s3Attributes; - this.client = client; - this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics(); + this.context = requireNonNull(context); + this.s3Attributes = requireNonNull(s3Attributes); + this.client = requireNonNull(client); + this.streamStatistics = requireNonNull(streamStatistics); this.ioStatistics = streamStatistics.getIOStatistics(); this.name = S3File.getPath(s3Attributes); this.changeTracker = new ChangeTracker( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java index 0f5834da4cc..66dd7c2a375 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java @@ -58,6 +58,7 @@ public class S3PrefetchingInputStream * @param context read-specific operation context. * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. + * @param streamStatistics statistics for this stream. * * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. @@ -66,7 +67,8 @@ public class S3PrefetchingInputStream public S3PrefetchingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { Validate.checkNotNull(context, "context"); Validate.checkNotNull(s3Attributes, "s3Attributes"); @@ -74,12 +76,13 @@ public class S3PrefetchingInputStream Validate.checkNotNullAndNotEmpty(s3Attributes.getKey(), "s3Attributes.getKey()"); Validate.checkNotNegative(s3Attributes.getLen(), "s3Attributes.getLen()"); Validate.checkNotNull(client, "client"); + Validate.checkNotNull(streamStatistics, "streamStatistics"); long fileSize = s3Attributes.getLen(); if (fileSize <= context.getPrefetchBlockSize()) { - this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client); + this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client, streamStatistics); } else { - this.inputStream = new S3CachingInputStream(context, s3Attributes, client); + this.inputStream = new S3CachingInputStream(context, s3Attributes, client, streamStatistics); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java index d9ed7810da6..89e3618be53 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java @@ -98,7 +98,7 @@ public class S3Reader implements Closeable { this.s3File.getStatistics().readOperationStarted(offset, size); Invoker invoker = this.s3File.getReadInvoker(); - invoker.retry( + int invokerResponse = invoker.retry( "read", this.s3File.getPath(), true, () -> { try { @@ -119,7 +119,12 @@ public class S3Reader implements Closeable { int numBytesRead = buffer.position(); buffer.limit(numBytesRead); this.s3File.getStatistics().readOperationCompleted(size, numBytesRead); - return numBytesRead; + + if (invokerResponse < 0) { + return invokerResponse; + } else { + return numBytesRead; + } } private void readOneBlock(ByteBuffer buffer, long offset, int size) throws IOException { @@ -153,7 +158,7 @@ public class S3Reader implements Closeable { } while (!this.closed && (numRemainingBytes > 0)); } finally { - s3File.close(inputStream); + s3File.close(inputStream, numRemainingBytes); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java deleted file mode 100644 index d1238fc007c..00000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.fs.common; - -import java.io.Closeable; -import java.io.IOException; - -import org.junit.Test; - -import org.apache.hadoop.test.AbstractHadoopTestBase; - -import static org.junit.Assert.assertTrue; - -public class TestIoClass extends AbstractHadoopTestBase { - - private static class StubResource implements Closeable { - private boolean isOpen = true; - - @Override - public void close() throws IOException { - this.isOpen = false; - throw new IOException("foo"); - } - - public boolean isOpen() { - return this.isOpen; - } - } - - @Test - public void verifyCloseIgnoringIoException() throws Exception { - ExceptionAsserts.assertThrows( - IOException.class, - "foo", - () -> { - (new StubResource()).close(); - }); - - // Should not throw. - StubResource resource = new StubResource(); - assertTrue(resource.isOpen()); - Io.closeIgnoringIoException(resource); - assertTrue(!resource.isOpen()); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java new file mode 100644 index 00000000000..9b831cb3b84 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.net.URI; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; + +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Test the prefetching input stream, validates that the underlying S3CachingInputStream and + * S3InMemoryInputStream are working as expected. + */ +public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest { + + public ITestS3PrefetchingInputStream() { + super(true); + } + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3PrefetchingInputStream.class); + + private static final int S_1K = 1024; + private static final int S_1M = S_1K * S_1K; + // Path for file which should have length > block size so S3CachingInputStream is used + private Path largeFile; + private FileSystem largeFileFS; + private int numBlocks; + private int blockSize; + private long largeFileSize; + // Size should be < block size so S3InMemoryInputStream is used + private static final int SMALL_FILE_SIZE = S_1K * 16; + + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + conf.setBoolean(PREFETCH_ENABLED_KEY, true); + return conf; + } + + @Override + public void teardown() throws Exception { + super.teardown(); + cleanupWithLogger(LOG, largeFileFS); + largeFileFS = null; + } + + private void openFS() throws Exception { + Configuration conf = getConfiguration(); + + largeFile = new Path(DEFAULT_CSVTEST_FILE); + blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + largeFileFS = new S3AFileSystem(); + largeFileFS.initialize(new URI(DEFAULT_CSVTEST_FILE), getConfiguration()); + FileStatus fileStatus = largeFileFS.getFileStatus(largeFile); + largeFileSize = fileStatus.getLen(); + numBlocks = calculateNumBlocks(largeFileSize, blockSize); + } + + private static int calculateNumBlocks(long largeFileSize, int blockSize) { + if (largeFileSize == 0) { + return 0; + } else { + return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0); + } + } + + @Test + public void testReadLargeFileFully() throws Throwable { + describe("read a large file fully, uses S3CachingInputStream"); + openFS(); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + IOStatistics ioStats = in.getIOStatistics(); + + byte[] buffer = new byte[S_1M * 10]; + long bytesRead = 0; + + while (bytesRead < largeFileSize) { + in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead)); + bytesRead += buffer.length; + } + + verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); + } + } + + @Test + public void testRandomReadLargeFile() throws Throwable { + describe("random read on a large file, uses S3CachingInputStream"); + openFS(); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + IOStatistics ioStats = in.getIOStatistics(); + + byte[] buffer = new byte[blockSize]; + + // Don't read the block completely so it gets cached on seek + in.read(buffer, 0, blockSize - S_1K * 10); + in.seek(blockSize + S_1K * 10); + // Backwards seek, will use cached block + in.seek(S_1K * 5); + in.read(); + + verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2); + } + } + + @Test + public void testRandomReadSmallFile() throws Throwable { + describe("random read on a small file, uses S3InMemoryInputStream"); + + byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26); + Path smallFile = path("randomReadSmallFile"); + ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true); + + try (FSDataInputStream in = getFileSystem().open(smallFile)) { + IOStatistics ioStats = in.getIOStatistics(); + + byte[] buffer = new byte[SMALL_FILE_SIZE]; + + in.read(buffer, 0, S_1K * 4); + in.seek(S_1K * 12); + in.read(buffer, 0, S_1K * 4); + + verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index f394119df80..7e91b6830d5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; @@ -132,7 +133,11 @@ public final class Fakes { fileStatus, futurePool, prefetchBlockSize, - prefetchBlockCount); + prefetchBlockCount) + .withChangeDetectionPolicy( + ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None, + ChangeDetectionPolicy.Source.ETag, false)) + .withInputPolicy(S3AInputPolicy.Normal); } public static URI createUri(String bucket, String key) { @@ -217,11 +222,13 @@ public final class Fakes { prefetchBlockCount); S3AInputStream.InputStreamCallbacks callbacks = createInputStreamCallbacks(bucket, key); + S3AInputStreamStatistics stats = + s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(); if (clazz == TestS3InMemoryInputStream.class) { - return new TestS3InMemoryInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks); + return new TestS3InMemoryInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks, stats); } else if (clazz == TestS3CachingInputStream.class) { - return new TestS3CachingInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks); + return new TestS3CachingInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks, stats); } throw new RuntimeException("Unsupported class: " + clazz); @@ -259,8 +266,9 @@ public final class Fakes { public TestS3InMemoryInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { - super(context, s3Attributes, client); + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { + super(context, s3Attributes, client, streamStatistics); } @Override @@ -350,8 +358,9 @@ public final class Fakes { public TestS3CachingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client) { - super(context, s3Attributes, client); + S3AInputStream.InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics) { + super(context, s3Attributes, client, streamStatistics); } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java index b265eea931d..82b7a10d40c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java @@ -79,7 +79,7 @@ class MockS3File extends S3File { } @Override - public void close(InputStream inputStream) { + public void close(InputStream inputStream, int numRemainingBytes) { // do nothing since we do not use a real S3 stream. } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index e3c6c002bff..cf3ad400afe 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -51,24 +52,27 @@ public class TestS3InputStream extends AbstractHadoopTestBase { public void testArgChecks() throws Exception { S3AReadOpContext readContext = Fakes.createReadContext(futurePool, "key", 10, 10, 1); S3ObjectAttributes attrs = Fakes.createObjectAttributes("bucket", "key", 10); + S3AInputStreamStatistics stats = + readContext.getS3AStatisticsContext().newInputStreamStatistics(); // Should not throw. - new S3CachingInputStream(readContext, attrs, client); + new S3CachingInputStream(readContext, attrs, client, stats); ExceptionAsserts.assertThrows( - IllegalArgumentException.class, - "'context' must not be null", - () -> new S3CachingInputStream(null, attrs, client)); + NullPointerException.class, + () -> new S3CachingInputStream(null, attrs, client, stats)); ExceptionAsserts.assertThrows( - IllegalArgumentException.class, - "'s3Attributes' must not be null", - () -> new S3CachingInputStream(readContext, null, client)); + NullPointerException.class, + () -> new S3CachingInputStream(readContext, null, client, stats)); ExceptionAsserts.assertThrows( - IllegalArgumentException.class, - "'client' must not be null", - () -> new S3CachingInputStream(readContext, attrs, null)); + NullPointerException.class, + () -> new S3CachingInputStream(readContext, attrs, null, stats)); + + ExceptionAsserts.assertThrows( + NullPointerException.class, + () -> new S3CachingInputStream(readContext, attrs, client, null)); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index d73a938bcce..77ff32d0162 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; @@ -92,6 +93,14 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase { private boolean testDataAvailable = true; private String assumptionMessage = "test file"; + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + conf.setBoolean(PREFETCH_ENABLED_KEY, false); + return conf; + } + /** * Open the FS and the test data. The input stream is always set up here. * @throws IOException IO Problems.