From 4c8cd61961c567b7469ac7730244d67370d4f3f4 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Wed, 27 Jul 2022 01:11:22 +0530 Subject: [PATCH] HADOOP-17461. Collect thread-level IOStatistics. (#4352) This adds a thread-level collector of IOStatistics, IOStatisticsContext, which can be: * Retrieved for a thread and cached for access from other threads. * Reset, through reset(), to record new statistics. * Queried for live statistics through the IOStatisticsSource.getIOStatistics() method. * Queried for a statistics aggregator for use in instrumented classes. * Asked to create a serializable copy in snapshot(). The goal is to make it possible for applications with multiple threads performing different work items simultaneously to be able to collect statistics on the individual threads, and so generate aggregate reports on the total work performed for a specific job, query or similar unit of work. Some changes in IOStatistics-gathering classes are needed for this feature: * Caching the active context's aggregator in the object's constructor. * Updating it in close(). Slightly more work is needed in multithreaded code, such as the S3A committers, which collect statistics across all threads used in task and job commit operations. Currently the IOStatisticsContext-aware classes are: * The S3A input stream, output stream and list iterators. * RawLocalFileSystem's input and output streams. * The S3A committers. * The TaskPool class in hadoop-common, which propagates the active context into scheduled worker threads. Collection of statistics in the IOStatisticsContext is switched on or off process-wide by a single option; in this patch its default, THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT, is true, so collection is enabled unless the option is set to "false". 
To enable the collection, set the option fs.thread.level.iostatistics.enabled to "true" in core-site.xml; Contributed by Mehakmeet Singh and Steve Loughran --- .../hadoop/fs/CommonConfigurationKeys.java | 14 + .../apache/hadoop/fs/RawLocalFileSystem.java | 43 +- .../apache/hadoop/fs/StreamCapabilities.java | 6 + .../fs/impl/WeakReferenceThreadMap.java | 13 +- .../fs/statistics/IOStatisticsContext.java | 83 +++ .../impl/EmptyIOStatisticsContextImpl.java | 81 +++ .../impl/IOStatisticsContextImpl.java | 128 +++++ .../impl/IOStatisticsContextIntegration.java | 167 ++++++ .../hadoop/util/functional/TaskPool.java | 131 +++-- .../org/apache/hadoop/fs/s3a/Listing.java | 24 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 47 ++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 8 +- .../apache/hadoop/fs/s3a/S3AInputStream.java | 22 +- .../hadoop/fs/s3a/S3AReadOpContext.java | 19 +- .../fs/s3a/commit/AbstractS3ACommitter.java | 40 +- .../hadoop/fs/s3a/commit/CommitConstants.java | 20 + .../fs/s3a/commit/impl/CommitContext.java | 71 ++- .../fs/s3a/commit/impl/CommitOperations.java | 11 +- .../commit/magic/MagicS3GuardCommitter.java | 7 + .../s3a/commit/staging/StagingCommitter.java | 7 + .../hadoop/fs/s3a/AbstractS3ATestBase.java | 63 +++ .../fs/s3a/ITestS3AIOStatisticsContext.java | 487 ++++++++++++++++++ .../fs/s3a/TestS3ABlockOutputStream.java | 7 +- .../fs/s3a/commit/AbstractCommitITest.java | 25 +- .../TestStagingDirectoryOutputCommitter.java | 4 +- .../ITestDirectoryCommitProtocol.java | 9 + 26 files changed, 1458 insertions(+), 79 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java create mode 100644 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 16144206eee..7c54b32dc3b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -475,4 +475,18 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { * default hadoop temp dir on local system: {@value}. */ public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; + + /** + * Thread-level IOStats Support. + * {@value} + */ + public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED = + "fs.thread.level.iostatistics.enabled"; + + /** + * Default value for Thread-level IOStats Support is true. 
+ */ + public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT = + true; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index f525c3cba78..d9ceab9a054 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -57,6 +57,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; @@ -156,11 +158,19 @@ public class RawLocalFileSystem extends FileSystem { /** Reference to the bytes read counter for slightly faster counting. */ private final AtomicLong bytesRead; + /** + * Thread level IOStatistics aggregator to update in close(). 
+ */ + private final IOStatisticsAggregator + ioStatisticsAggregator; + public LocalFSFileInputStream(Path f) throws IOException { name = pathToFile(f); fis = new FileInputStream(name); bytesRead = ioStatistics.getCounterReference( STREAM_READ_BYTES); + ioStatisticsAggregator = + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(); } @Override @@ -193,9 +203,13 @@ public class RawLocalFileSystem extends FileSystem { @Override public void close() throws IOException { - fis.close(); - if (asyncChannel != null) { - asyncChannel.close(); + try { + fis.close(); + if (asyncChannel != null) { + asyncChannel.close(); + } + } finally { + ioStatisticsAggregator.aggregate(ioStatistics); } } @@ -278,6 +292,7 @@ public class RawLocalFileSystem extends FileSystem { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: case StreamCapabilities.VECTOREDIO: return true; default: @@ -407,9 +422,19 @@ public class RawLocalFileSystem extends FileSystem { STREAM_WRITE_EXCEPTIONS) .build(); + /** + * Thread level IOStatistics aggregator to update in close(). + */ + private final IOStatisticsAggregator + ioStatisticsAggregator; + private LocalFSFileOutputStream(Path f, boolean append, FsPermission permission) throws IOException { File file = pathToFile(f); + // store the aggregator before attempting any IO. + ioStatisticsAggregator = + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(); + if (!append && permission == null) { permission = FsPermission.getFileDefault(); } @@ -436,10 +461,17 @@ public class RawLocalFileSystem extends FileSystem { } /* - * Just forward to the fos + * Close the fos; update the IOStatisticsContext. 
*/ @Override - public void close() throws IOException { fos.close(); } + public void close() throws IOException { + try { + fos.close(); + } finally { + ioStatisticsAggregator.aggregate(ioStatistics); + } + } + @Override public void flush() throws IOException { fos.flush(); } @Override @@ -485,6 +517,7 @@ public class RawLocalFileSystem extends FileSystem { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: return true; default: return StoreImplementationUtils.isProbeForSyncable(capability); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index d68ef505dc3..c925e50889d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -93,6 +93,12 @@ public interface StreamCapabilities { */ String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM; + /** + * Streams that support IOStatistics context and capture thread-level + * IOStatistics. + */ + String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported"; + /** * Capabilities that a stream can support and be queried for. 
*/ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java index b24bef2a816..16fe0da7c5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.impl; +import java.lang.ref.WeakReference; import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.Nullable; @@ -48,7 +49,17 @@ public class WeakReferenceThreadMap extends WeakReferenceMap { } public V setForCurrentThread(V newVal) { - return put(currentThreadId(), newVal); + long id = currentThreadId(); + + // if the same object is already in the map, just return it. + WeakReference ref = lookup(id); + // Reference value could be set to null. Thus, ref.get() could return + // null. Should be handled accordingly while using the returned value. + if (ref != null && ref.get() == newVal) { + return ref.get(); + } + + return put(id, newVal); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java new file mode 100644 index 00000000000..fb10b93848f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics; + +import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration; + +/** + * An interface defined to capture thread-level IOStatistics by using per + * thread context. + *

<p> + * Statistics-generating classes should collect the aggregator in their constructor, + * so that they have a single aggregator to update across all threads. + * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics. + * <p>

+ * The {@link #reset()} call resets the statistics in the context so + * that later snapshots will get the incremental data. + */ +public interface IOStatisticsContext extends IOStatisticsSource { + + /** + * Get the IOStatisticsAggregator for the context. + * + * @return return the aggregator for the context. + */ + IOStatisticsAggregator getAggregator(); + + /** + * Capture the snapshot of the context's IOStatistics. + * + * @return IOStatisticsSnapshot for the context. + */ + IOStatisticsSnapshot snapshot(); + + /** + * Get a unique ID for this context, for logging + * purposes. + * + * @return an ID unique for all contexts in this process. + */ + long getID(); + + /** + * Reset the context's IOStatistics. + */ + void reset(); + + /** + * Get the context's IOStatisticsContext. + * + * @return instance of IOStatisticsContext for the context. + */ + static IOStatisticsContext getCurrentIOStatisticsContext() { + return IOStatisticsContextIntegration.getCurrentIOStatisticsContext(); + } + + /** + * Set the IOStatisticsContext for the current thread. + * @param statisticsContext IOStatistics context instance for the + * current thread. If null, the context is reset. + */ + static void setThreadIOStatisticsContext( + IOStatisticsContext statisticsContext) { + IOStatisticsContextIntegration.setThreadIOStatisticsContext( + statisticsContext); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java new file mode 100644 index 00000000000..b672f6639cb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +/** + * Empty IOStatistics context which serves no-op for all the operations and + * returns an empty Snapshot if asked. + * + */ +final class EmptyIOStatisticsContextImpl implements IOStatisticsContext { + + private static final IOStatisticsContext EMPTY_CONTEXT = new EmptyIOStatisticsContextImpl(); + + private EmptyIOStatisticsContextImpl() { + } + + /** + * Create a new empty snapshot. + * A new one is always created for isolation. + * + * @return a statistics snapshot + */ + @Override + public IOStatisticsSnapshot snapshot() { + return new IOStatisticsSnapshot(); + } + + @Override + public IOStatisticsAggregator getAggregator() { + return EmptyIOStatisticsStore.getInstance(); + } + + @Override + public IOStatistics getIOStatistics() { + return EmptyIOStatistics.getInstance(); + } + + @Override + public void reset() {} + + /** + * The ID is always 0. 
+ * As the real context implementation counter starts at 1, + * we are guaranteed to have unique IDs even between them and + * the empty context. + * @return 0 + */ + @Override + public long getID() { + return 0; + } + + /** + * Get the single instance. + * @return an instance. + */ + static IOStatisticsContext getInstance() { + return EMPTY_CONTEXT; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java new file mode 100644 index 00000000000..97a85281c4f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +/** + * Implementing the IOStatisticsContext. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public final class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + + /** + * Thread ID. + */ + private final long threadId; + + /** + * Unique ID. + */ + private final long id; + + /** + * IOStatistics to aggregate. + */ + private final IOStatisticsSnapshot ioStatistics = new IOStatisticsSnapshot(); + + /** + * Constructor. + * @param threadId thread ID + * @param id instance ID. + */ + public IOStatisticsContextImpl(final long threadId, final long id) { + this.threadId = threadId; + this.id = id; + } + + @Override + public String toString() { + return "IOStatisticsContextImpl{" + + "id=" + id + + ", threadId=" + threadId + + ", ioStatistics=" + ioStatistics + + '}'; + } + + /** + * Get the IOStatisticsAggregator of the context. + * @return the instance of IOStatisticsAggregator for this context. + */ + @Override + public IOStatisticsAggregator getAggregator() { + return ioStatistics; + } + + /** + * Returns a snapshot of the current thread's IOStatistics. + * + * @return IOStatisticsSnapshot of the context. 
+ */ + @Override + public IOStatisticsSnapshot snapshot() { + LOG.debug("Taking snapshot of IOStatisticsContext id {}", id); + return new IOStatisticsSnapshot(ioStatistics); + } + + /** + * Reset the statistics collected in this context. + */ + @Override + public void reset() { + LOG.debug("clearing IOStatisticsContext id {}", id); + ioStatistics.clear(); + } + + @Override + public IOStatistics getIOStatistics() { + return ioStatistics; + } + + /** + * ID of this context. + * @return ID. + */ + @Override + public long getID() { + return id; + } + + /** + * Get the thread ID. + * @return thread ID. + */ + public long getThreadID() { + return threadId; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java new file mode 100644 index 00000000000..483d1e4570f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * A Utility class for IOStatisticsContext, which helps in creating and + * getting the current active context. Static methods in this class allows to + * get the current context to start aggregating the IOStatistics. + * + * Static initializer is used to work out if the feature to collect + * thread-level IOStatistics is enabled or not and the corresponding + * implementation class is called for it. + * + * Weak Reference thread map to be used to keep track of different context's + * to avoid long-lived memory leakages as these references would be cleaned + * up at GC. + */ +public final class IOStatisticsContextIntegration { + + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextIntegration.class); + + /** + * Is thread-level IO Statistics enabled? + */ + private static boolean isThreadIOStatsEnabled; + + /** + * ID for next instance to create. + */ + public static final AtomicLong INSTANCE_ID = new AtomicLong(1); + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. 
+ */ + private static final WeakReferenceThreadMap + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>( + IOStatisticsContextIntegration::createNewInstance, + IOStatisticsContextIntegration::referenceLostContext + ); + + static { + // Work out if the current context has thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + isThreadIOStatsEnabled = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Private constructor for a utility class to be used in IOStatisticsContext. + */ + private IOStatisticsContextIntegration() {} + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement()); + } + + /** + * In case of reference loss for IOStatisticsContext. + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * Get the current thread's IOStatisticsContext instance. If no instance is + * present for this thread ID, create one using the factory. + * @return instance of IOStatisticsContext. + */ + public static IOStatisticsContext getCurrentIOStatisticsContext() { + return isThreadIOStatsEnabled + ? ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() + : EmptyIOStatisticsContextImpl.getInstance(); + } + + /** + * Set the IOStatisticsContext for the current thread. + * @param statisticsContext IOStatistics context instance for the + * current thread. If null, the context is reset. 
+   */
+  public static void setThreadIOStatisticsContext(
+      IOStatisticsContext statisticsContext) {
+    if (isThreadIOStatsEnabled) {
+      if (statisticsContext == null) {
+        // A null context unbinds the thread: remove its entry and do
+        // nothing more. Falling through to getForCurrentThread() would
+        // create a fresh context for the entry just removed and then
+        // store a null value over it.
+        ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
+      } else {
+        // setForCurrentThread() leaves the map alone when this exact
+        // context is already bound to the thread, so no extra identity
+        // check (and no context-creating lookup) is needed here.
+        ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
+      }
+    }
+  }
+
+  /**
+   * Get thread ID specific IOStatistics values if
+   * statistics are enabled and the thread ID is in the map.
+   * @param testThreadId thread ID.
+   * @return IOStatisticsContext if found in the map; null if thread-level
+   * statistics are disabled, the thread ID has no entry, or the weak
+   * reference held in the entry has been cleared.
+   */
+  @VisibleForTesting
+  public static IOStatisticsContext getThreadSpecificIOStatisticsContext(long testThreadId) {
+    LOG.debug("IOStatsContext thread ID required: {}", testThreadId);
+
+    if (!isThreadIOStatsEnabled) {
+      return null;
+    }
+    // lookup the weakRef IOStatisticsContext for the thread ID in the
+    // ThreadMap.
+    WeakReference<IOStatisticsContext> ioStatisticsSnapshotWeakReference =
+        ACTIVE_IOSTATS_CONTEXT.lookup(testThreadId);
+    if (ioStatisticsSnapshotWeakReference != null) {
+      // get() may itself return null once the referent has been collected.
+      return ioStatisticsSnapshotWeakReference.get();
+    }
+    return null;
+  }
+
+  /**
+   * A method to enable IOStatisticsContext to override if set otherwise in
+   * the configurations for tests.
+ */ + @VisibleForTesting + public static void enableIOStatisticsContext() { + if (!isThreadIOStatsEnabled) { + LOG.info("Enabling Thread IOStatistics.."); + isThreadIOStatsEnabled = true; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java index 0abaab211de..c9e6d0b78ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable; @@ -136,6 +137,15 @@ public final class TaskPool { private boolean stopAbortsOnFailure = false; private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION; + /** + * IOStatisticsContext to switch to in all threads + * taking part in the commit operation. + * This ensures that the IOStatistics collected in the + * worker threads will be aggregated into the total statistics + * of the thread calling the committer commit/abort methods. + */ + private IOStatisticsContext ioStatisticsContext = null; + /** * Create the builder. * @param items items to process @@ -242,7 +252,7 @@ public final class TaskPool { * @param value new value * @return the builder */ - public Builder sleepInterval(final int value) { + public Builder sleepInterval(final int value) { sleepInterval = value; return this; } @@ -364,6 +374,8 @@ public final class TaskPool { /** * Parallel execution. 
+ * All tasks run within the same IOStatisticsContext as the + * thread calling this method. * @param task task to execute * @param exception which may be raised in execution. * @return true if the operation executed successfully @@ -379,64 +391,70 @@ public final class TaskPool { final AtomicBoolean revertFailed = new AtomicBoolean(false); List> futures = new ArrayList<>(); + ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext(); IOException iteratorIOE = null; final RemoteIterator iterator = this.items; try { - while(iterator.hasNext()) { + while (iterator.hasNext()) { final I item = iterator.next(); // submit a task for each item that will either run or abort the task futures.add(service.submit(() -> { - if (!(stopOnFailure && taskFailed.get())) { - // run the task - boolean threw = true; - try { - LOG.debug("Executing task"); - task.run(item); - succeeded.add(item); - LOG.debug("Task succeeded"); + setStatisticsContext(); + try { + if (!(stopOnFailure && taskFailed.get())) { + // prepare and run the task + boolean threw = true; + try { + LOG.debug("Executing task"); + task.run(item); + succeeded.add(item); + LOG.debug("Task succeeded"); - threw = false; + threw = false; - } catch (Exception e) { - taskFailed.set(true); - exceptions.add(e); - LOG.info("Task failed {}", e.toString()); - LOG.debug("Task failed", e); + } catch (Exception e) { + taskFailed.set(true); + exceptions.add(e); + LOG.info("Task failed {}", e.toString()); + LOG.debug("Task failed", e); - if (onFailure != null) { - try { - onFailure.run(item, e); - } catch (Exception failException) { - LOG.warn("Failed to clean up on failure", e); - // swallow the exception + if (onFailure != null) { + try { + onFailure.run(item, e); + } catch (Exception failException) { + LOG.warn("Failed to clean up on failure", e); + // swallow the exception + } + } + } finally { + if (threw) { + taskFailed.set(true); } } - } finally { - if (threw) { - taskFailed.set(true); - } - } - - } else if 
(abortTask != null) { - // abort the task instead of running it - if (stopAbortsOnFailure && abortFailed.get()) { - return; - } - - boolean failed = true; - try { - LOG.info("Aborting task"); - abortTask.run(item); - failed = false; - } catch (Exception e) { - LOG.error("Failed to abort task", e); - // swallow the exception - } finally { - if (failed) { - abortFailed.set(true); + + } else if (abortTask != null) { + // abort the task instead of running it + if (stopAbortsOnFailure && abortFailed.get()) { + return; + } + + boolean failed = true; + try { + LOG.info("Aborting task"); + abortTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to abort task", e); + // swallow the exception + } finally { + if (failed) { + abortFailed.set(true); + } } } + } finally { + resetStatisticsContext(); } })); } @@ -447,7 +465,6 @@ public final class TaskPool { // mark as a task failure so all submitted tasks will halt/abort taskFailed.set(true); } - // let the above tasks complete (or abort) waitFor(futures, sleepInterval); int futureCount = futures.size(); @@ -464,6 +481,7 @@ public final class TaskPool { } boolean failed = true; + setStatisticsContext(); try { revertTask.run(item); failed = false; @@ -474,6 +492,7 @@ public final class TaskPool { if (failed) { revertFailed.set(true); } + resetStatisticsContext(); } })); } @@ -498,6 +517,26 @@ public final class TaskPool { // return true if all tasks succeeded. return !taskFailed.get(); } + + /** + * Set the statistics context for this thread. + */ + private void setStatisticsContext() { + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + } + + /** + * Reset the statistics context if it was set earlier. + * This unbinds the current thread from any statistics + * context. 
+ */ + private void resetStatisticsContext() { + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(null); + } + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index a1dd4d8df02..6c39cc4b642 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -31,7 +31,9 @@ import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.util.functional.RemoteIterators; @@ -333,7 +335,7 @@ public class Listing extends AbstractStoreOperation { * Thread safety: None. */ class FileStatusListingIterator - implements RemoteIterator, IOStatisticsSource { + implements RemoteIterator, IOStatisticsSource, Closeable { /** Source of objects. */ private final ObjectListingIterator source; @@ -403,6 +405,14 @@ public class Listing extends AbstractStoreOperation { return status; } + /** + * Close, if called, will update + * the thread statistics context with the value. + */ + @Override + public void close() { + source.close(); + } /** * Try to retrieve another batch. * Note that for the initial batch, @@ -545,6 +555,11 @@ public class Listing extends AbstractStoreOperation { private final AuditSpan span; + /** + * Context statistics aggregator. + */ + private final IOStatisticsAggregator aggregator; + /** The most recent listing results. 
*/ private S3ListResult objects; @@ -601,6 +616,8 @@ public class Listing extends AbstractStoreOperation { this.span = span; this.s3ListResultFuture = listingOperationCallbacks .listObjectsAsync(request, iostats, span); + this.aggregator = IOStatisticsContext.getCurrentIOStatisticsContext() + .getAggregator(); } /** @@ -693,11 +710,12 @@ public class Listing extends AbstractStoreOperation { } /** - * Close, if actually called, will close the span - * this listing was created with. + * Close, if called, will update + * the thread statistics context with the value. */ @Override public void close() { + aggregator.aggregate(getIOStatistics()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 8b1865c77c9..19943ff2f70 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -41,6 +41,7 @@ import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.UploadPartRequest; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; @@ -165,6 +166,9 @@ class S3ABlockOutputStream extends OutputStream implements /** is client side encryption enabled? */ private final boolean isCSEEnabled; + /** Thread level IOStatistics Aggregator. 
*/ + private final IOStatisticsAggregator threadIOStatisticsAggregator; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -201,6 +205,7 @@ class S3ABlockOutputStream extends OutputStream implements initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; + this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; } /** @@ -454,11 +459,23 @@ class S3ABlockOutputStream extends OutputStream implements */ private synchronized void cleanupOnClose() { cleanupWithLogger(LOG, getActiveBlock(), blockFactory); + mergeThreadIOStatistics(statistics.getIOStatistics()); LOG.debug("Statistics: {}", statistics); cleanupWithLogger(LOG, statistics); clearActiveBlock(); } + /** + * Merging the current thread's IOStatistics with the current IOStatistics + * context. + * + * @param streamStatistics Stream statistics to be merged into thread + * statistics aggregator. + */ + private void mergeThreadIOStatistics(IOStatistics streamStatistics) { + getThreadIOStatistics().aggregate(streamStatistics); + } + /** * Best effort abort of the multipart upload; sets * the field to null afterwards. @@ -662,6 +679,10 @@ class S3ABlockOutputStream extends OutputStream implements case StreamCapabilities.ABORTABLE_STREAM: return true; + // IOStatistics context support for thread-level IOStatistics. + case StreamCapabilities.IOSTATISTICS_CONTEXT: + return true; + default: return false; } @@ -701,6 +722,14 @@ class S3ABlockOutputStream extends OutputStream implements return iostatistics; } + /** + * Get the IOStatistics aggregator passed in the builder. + * @return an aggregator + */ + protected IOStatisticsAggregator getThreadIOStatistics() { + return threadIOStatisticsAggregator; + } + /** * Multiple partition upload. */ @@ -1092,6 +1121,11 @@ class S3ABlockOutputStream extends OutputStream implements */ private PutObjectOptions putOptions; + /** + * thread-level IOStatistics Aggregator. 
+ */ + private IOStatisticsAggregator ioStatisticsAggregator; + private BlockOutputStreamBuilder() { } @@ -1108,6 +1142,7 @@ class S3ABlockOutputStream extends OutputStream implements requireNonNull(putOptions, "null putOptions"); Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, "Block size is too small: %s", blockSize); + requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator"); } /** @@ -1229,5 +1264,17 @@ class S3ABlockOutputStream extends OutputStream implements putOptions = value; return this; } + + /** + * Set builder value. + * + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withIOStatisticsAggregator( + final IOStatisticsAggregator value) { + ioStatisticsAggregator = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 406f40bff95..c49c368bbbe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -131,6 +131,7 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.store.audit.AuditEntryPoint; import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource; @@ -1576,7 +1577,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, statistics, statisticsContext, fileStatus, - vectoredIOContext) + vectoredIOContext, + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) .withAuditSpan(auditSpan); 
openFileHelper.applyDefaultOptions(roc); return roc.build(); @@ -1743,7 +1745,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, DOWNGRADE_SYNCABLE_EXCEPTIONS, DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)) .withCSEEnabled(isCSEEnabled) - .withPutOptions(putOptions); + .withPutOptions(putOptions) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3069f172891..178a807733a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -37,6 +37,7 @@ import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; @@ -53,9 +54,9 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.functional.CallableRaisingIOE; import static java.util.Objects.requireNonNull; @@ -187,6 +188,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private long asyncDrainThreshold; + /** Aggregator used to aggregate per thread 
IOStatistics. */ + private final IOStatisticsAggregator threadIOStatistics; + /** * Create the stream. * This does not attempt to open it; that is only done on the first @@ -225,6 +229,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); this.unboundedThreadPool = unboundedThreadPool; this.vectoredIOContext = context.getVectoredIOContext(); + this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator()); } /** @@ -600,7 +605,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, stopVectoredIOOperations.set(true); // close or abort the stream; blocking awaitFuture(closeStream("close() operation", false, true)); - LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); // end the client+audit span. client.close(); // this is actually a no-op @@ -608,10 +612,23 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, } finally { // merge the statistics back into the FS statistics. streamStatistics.close(); + // Collect ThreadLevel IOStats + mergeThreadIOStatistics(streamStatistics.getIOStatistics()); } } } + /** + * Merging the current thread's IOStatistics with the current IOStatistics + * context. + * + * @param streamIOStats Stream statistics to be merged into thread + * statistics aggregator. + */ + private void mergeThreadIOStatistics(IOStatistics streamIOStats) { + threadIOStatistics.aggregate(streamIOStats); + } + /** * Close a stream: decide whether to abort or close, based on * the length of the stream and the current position. 
@@ -1331,6 +1348,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, public boolean hasCapability(String capability) { switch (toLowerCase(capability)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: case StreamCapabilities.READAHEAD: case StreamCapabilities.UNBUFFER: case StreamCapabilities.VECTOREDIO: diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index 803b7757d25..bbd86ef5ac2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.store.audit.AuditSpan; import javax.annotation.Nullable; @@ -70,6 +71,9 @@ public class S3AReadOpContext extends S3AOpContext { */ private final VectoredIOContext vectoredIOContext; + /** Thread-level IOStatistics aggregator. **/ + private final IOStatisticsAggregator ioStatisticsAggregator; + /** * Instantiate. * @param path path of read @@ -78,6 +82,7 @@ public class S3AReadOpContext extends S3AOpContext { * @param instrumentation statistics context * @param dstFileStatus target file status * @param vectoredIOContext context for vectored read operation. + * @param ioStatisticsAggregator IOStatistics aggregator for each thread. 
*/ public S3AReadOpContext( final Path path, @@ -85,11 +90,13 @@ public class S3AReadOpContext extends S3AOpContext { @Nullable FileSystem.Statistics stats, S3AStatisticsContext instrumentation, FileStatus dstFileStatus, - VectoredIOContext vectoredIOContext) { + VectoredIOContext vectoredIOContext, + IOStatisticsAggregator ioStatisticsAggregator) { super(invoker, stats, instrumentation, dstFileStatus); this.path = requireNonNull(path); this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext"); + this.ioStatisticsAggregator = ioStatisticsAggregator; } /** @@ -105,6 +112,7 @@ public class S3AReadOpContext extends S3AOpContext { "invalid readahead %d", readahead); Preconditions.checkArgument(asyncDrainThreshold >= 0, "invalid drainThreshold %d", asyncDrainThreshold); + requireNonNull(ioStatisticsAggregator, "ioStatisticsAggregator"); return this; } @@ -215,6 +223,15 @@ public class S3AReadOpContext extends S3AOpContext { return vectoredIOContext; } + /** + * Return the IOStatistics aggregator. + * + * @return instance of IOStatisticsAggregator. 
+ */ + public IOStatisticsAggregator getIOStatisticsAggregator() { + return ioStatisticsAggregator; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 78b687cc6f1..d6044edde29 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.audit.AuditConstants; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -467,8 +468,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter * * While the classic committers create a 0-byte file, the S3A committers * PUT up a the contents of a {@link SuccessData} file. 
- * - * @param context job context + * @param commitContext commit context * @param pending the pending commits * * @return the success data, even if the marker wasn't created @@ -476,7 +476,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter * @throws IOException IO failure */ protected SuccessData maybeCreateSuccessMarkerFromCommits( - JobContext context, + final CommitContext commitContext, ActiveCommit pending) throws IOException { List filenames = new ArrayList<>(pending.size()); // The list of committed objects in pending is size limited in @@ -488,7 +488,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter // and the current statistics snapshot.aggregate(getIOStatistics()); - return maybeCreateSuccessMarker(context, filenames, snapshot); + // and include the context statistics if enabled + if (commitContext.isCollectIOStatistics()) { + snapshot.aggregate(commitContext.getIOStatisticsContext() + .getIOStatistics()); + } + + return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot); } /** @@ -729,6 +735,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter final FileStatus status) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try (DurationInfo ignored = new DurationInfo(LOG, "Loading and committing files in pendingset %s", path)) { @@ -775,6 +782,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter final FileStatus status) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Committing %s", path)) { PendingSet pendingSet = PersistentCommitData.load( @@ -806,6 +814,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter final boolean deleteRemoteFiles) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try 
(DurationInfo ignored = new DurationInfo(LOG, false, "Aborting %s", path)) { PendingSet pendingSet = PersistentCommitData.load( @@ -832,6 +841,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter /** * Start the final job commit/abort commit operations. + * If configured to collect statistics, + * The IO StatisticsContext is reset. * @param context job context * @return a commit context through which the operations can be invoked. * @throws IOException failure. @@ -840,14 +851,22 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter final JobContext context) throws IOException { - return getCommitOperations().createCommitContext( + IOStatisticsContext ioStatisticsContext = + IOStatisticsContext.getCurrentIOStatisticsContext(); + CommitContext commitContext = getCommitOperations().createCommitContext( context, getOutputPath(), - getJobCommitThreadCount(context)); + getJobCommitThreadCount(context), + ioStatisticsContext); + commitContext.maybeResetIOStatisticsContext(); + return commitContext; } + /** * Start a ask commit/abort commit operations. * This may have a different thread count. + * If configured to collect statistics, + * The IO StatisticsContext is reset. * @param context job or task context * @return a commit context through which the operations can be invoked. * @throws IOException failure. 
@@ -856,10 +875,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter final JobContext context) throws IOException { - return getCommitOperations().createCommitContext( + CommitContext commitContext = getCommitOperations().createCommitContext( context, getOutputPath(), - getTaskCommitThreadCount(context)); + getTaskCommitThreadCount(context), + IOStatisticsContext.getCurrentIOStatisticsContext()); + commitContext.maybeResetIOStatisticsContext(); + return commitContext; } /** @@ -1014,7 +1036,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter stage = "completed"; jobCompleted(true); stage = "marker"; - successData = maybeCreateSuccessMarkerFromCommits(context, pending); + successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending); stage = "cleanup"; cleanup(commitContext, false); } catch (IOException e) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index b85ce276504..6e2a5d8c9fb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -354,4 +354,24 @@ public final class CommitConstants { public static final String OPT_SUMMARY_REPORT_DIR = OPT_PREFIX + "summary.report.directory"; + /** + * Experimental feature to collect thread level IO statistics. + * When set the committers will reset the statistics in + * task setup and propagate to the job committer. + * The job committer will include those and its own statistics. + * Do not use if the execution engine is collecting statistics, + * as the multiple reset() operations will result in incomplete + * statistics. + * Value: {@value}.
+ */ + public static final String S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS = + OPT_PREFIX + "experimental.collect.iostatistics"; + + /** + * Default value for {@link #S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS}. + * Value: {@value}. + */ + public static final boolean S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT = + false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java index 8bff165f2e8..8ac3dcb231d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.commit.impl; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -47,6 +49,8 @@ import org.apache.hadoop.util.functional.TaskPool; import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS; import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.THREAD_PREFIX; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS; +import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.THREAD_KEEP_ALIVE_TIME; /** @@ -123,24 +127,43 @@ public final class CommitContext implements Closeable { */ private final int committerThreads; + /** + * Should IOStatistics be collected by the committer? + */ + private final boolean collectIOStatistics; + + /** + * IOStatisticsContext to switch to in all threads + * taking part in the commit operation. + * This ensures that the IOStatistics collected in the + * worker threads will be aggregated into the total statistics + * of the thread calling the committer commit/abort methods. + */ + private final IOStatisticsContext ioStatisticsContext; + /** * Create. * @param commitOperations commit callbacks * @param jobContext job context * @param committerThreads number of commit threads + * @param ioStatisticsContext IOStatistics context of current thread */ public CommitContext( final CommitOperations commitOperations, final JobContext jobContext, - final int committerThreads) { + final int committerThreads, + final IOStatisticsContext ioStatisticsContext) { this.commitOperations = commitOperations; this.jobContext = jobContext; this.conf = jobContext.getConfiguration(); this.jobId = jobContext.getJobID().toString(); + this.collectIOStatistics = conf.getBoolean( + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT); + this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext); this.auditContextUpdater = new AuditContextUpdater(jobContext); this.auditContextUpdater.updateCurrentAuditContext(); this.committerThreads = committerThreads; - buildSubmitters(); } @@ -152,15 +175,19 @@ public final class CommitContext implements Closeable { * @param conf job conf * @param jobId ID * @param committerThreads number of commit threads + * @param ioStatisticsContext IOStatistics context of 
current thread */ public CommitContext(final CommitOperations commitOperations, final Configuration conf, final String jobId, - final int committerThreads) { + final int committerThreads, + final IOStatisticsContext ioStatisticsContext) { this.commitOperations = commitOperations; this.jobContext = null; this.conf = conf; this.jobId = jobId; + this.collectIOStatistics = false; + this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext); this.auditContextUpdater = new AuditContextUpdater(jobId); this.auditContextUpdater.updateCurrentAuditContext(); this.committerThreads = committerThreads; @@ -358,6 +385,44 @@ public final class CommitContext implements Closeable { return jobId; } + /** + * Collecting thread level IO statistics? + * @return true if thread level IO stats should be collected. + */ + public boolean isCollectIOStatistics() { + return collectIOStatistics; + } + + /** + * IOStatistics context of the created thread. + * @return the IOStatistics. + */ + public IOStatisticsContext getIOStatisticsContext() { + return ioStatisticsContext; + } + + /** + * Switch to the context IOStatistics context, + * if needed. + */ + public void switchToIOStatisticsContext() { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + + /** + * Reset the IOStatistics context if statistics are being + * collected. + * Logs at info. + */ + public void maybeResetIOStatisticsContext() { + if (collectIOStatistics) { + + LOG.info("Resetting IO statistics context {}", + ioStatisticsContext.getID()); + ioStatisticsContext.reset(); + } + } + /** * Submitter for a given thread pool. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java index 0772e143f69..eb23f299a02 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Preconditions; @@ -639,15 +640,18 @@ public class CommitOperations extends AbstractStoreOperation * @param context job context * @param path path for all work. * @param committerThreads thread pool size + * @param ioStatisticsContext IOStatistics context of current thread * @return the commit context to pass in. * @throws IOException failure. 
*/ public CommitContext createCommitContext( JobContext context, Path path, - int committerThreads) throws IOException { + int committerThreads, + IOStatisticsContext ioStatisticsContext) throws IOException { return new CommitContext(this, context, - committerThreads); + committerThreads, + ioStatisticsContext); } /** @@ -668,7 +672,8 @@ public class CommitOperations extends AbstractStoreOperation return new CommitContext(this, getStoreContext().getConfiguration(), id, - committerThreads); + committerThreads, + IOStatisticsContext.getCurrentIOStatisticsContext()); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 007e9b37096..9ded64eedc0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -219,6 +219,13 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { } pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId); pendingSet.setJobId(jobId); + // add in the IOStatistics of all the file loading + if (commitContext.isCollectIOStatistics()) { + pendingSet.getIOStatistics() + .aggregate( + commitContext.getIOStatisticsContext().getIOStatistics()); + } + Path jobAttemptPath = getJobAttemptPath(context); TaskAttemptID taskAttemptID = context.getTaskAttemptID(); Path taskOutcomePath = new Path(jobAttemptPath, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 36eae012dea..d764055c45b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -696,6 +696,13 @@ public class StagingCommitter extends AbstractS3ACommitter { pendingCommits.add(commit); } + // maybe add in the IOStatistics of the thread + if (commitContext.isCollectIOStatistics()) { + pendingCommits.getIOStatistics().aggregate( + commitContext.getIOStatisticsContext() + .getIOStatistics()); + } + // save the data // overwrite any existing file, so whichever task attempt // committed last wins. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java index 213f94432c4..e90ad8b73ef 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.s3a.tools.MarkerTool; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.io.IOUtils; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; @@ -66,6 +68,15 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase */ private AuditSpanSource spanSource; + /** + * Atomic references to be used to re-throw an Exception or an ASE + * caught inside a lambda function.
+ */ + private static final AtomicReference<Exception> FUTURE_EXCEPTION = + new AtomicReference<>(); + private static final AtomicReference<AssertionError> FUTURE_ASE = + new AtomicReference<>(); + /** * Get the source. * @return span source @@ -99,6 +110,9 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase S3AFileSystem.initializeClass(); super.setup(); setSpanSource(getFileSystem()); + // Reset the current context's thread IOStatistics. + // this ensures that the context stats will always be from the test case + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); } @Override @@ -263,4 +277,53 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase Assume.assumeTrue("Skipping test if CSE is enabled", !getFileSystem().isCSEEnabled()); } + + /** + * If an exception is caught while doing some work in a Lambda function, + * store it in an atomic reference to be thrown later on. + * @param exception Exception caught. + */ + public static void setFutureException(Exception exception) { + FUTURE_EXCEPTION.set(exception); + } + + /** + * If an Assertion is caught while doing some work in a Lambda function, + * store it in an atomic reference to be thrown later on. + * + * @param ase Assertion Error caught. + */ + public static void setFutureAse(AssertionError ase) { + FUTURE_ASE.set(ase); + } + + /** + * throw the caught exception from the atomic reference and also clear the + * atomic reference so that we don't rethrow in another test. + * + * @throws Exception the exception caught. + */ + public static void maybeReThrowFutureException() throws Exception { + if (FUTURE_EXCEPTION.get() != null) { + Exception exceptionToThrow = FUTURE_EXCEPTION.get(); + // reset the atomic ref before throwing. + setFutureException(null); + throw exceptionToThrow; + } + } + + /** + * throw the Assertion error from the atomic reference and also clear the + * atomic reference so that we don't rethrow in another test.
+ * + * @throws Exception Assertion error caught. + */ + public static void maybeReThrowFutureASE() throws Exception { + if (FUTURE_ASE.get() != null) { + AssertionError aseToThrow = FUTURE_ASE.get(); + // reset the atomic ref before throwing. + setFutureAse(null); + throw aseToThrow; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java new file mode 100644 index 00000000000..19c40c64661 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextImpl; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter; +import org.apache.hadoop.util.functional.TaskPool; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.enableIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getCurrentIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getThreadSpecificIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.setThreadIOStatisticsContext; +import 
static org.apache.hadoop.util.functional.RemoteIterators.foreach; + +/** + * Tests to verify the Thread-level IOStatistics. + */ +public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { + + private static final int SMALL_THREADS = 2; + private static final int BYTES_BIG = 100; + private static final int BYTES_SMALL = 50; + private static final String[] IOSTATISTICS_CONTEXT_CAPABILITY = + new String[] {StreamCapabilities.IOSTATISTICS_CONTEXT}; + private ExecutorService executor; + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + enableIOStatisticsContext(); + return configuration; + } + + @Override + public void setup() throws Exception { + super.setup(); + executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + } + + @Override + public void teardown() throws Exception { + if (executor != null) { + executor.shutdown(); + } + super.teardown(); + } + + /** + * Verify that S3AInputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3AInputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = dataset(256, 'a', 'z'); + byte[] readDataFirst = new byte[BYTES_BIG]; + byte[] readDataSecond = new byte[BYTES_SMALL]; + writeDataset(fs, path, data, data.length, 1024, true); + + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + // get the thread context and reset + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + try (FSDataInputStream in = fs.open(path)) { + // Assert the InputStream's stream capability to support + // IOStatisticsContext. + assertCapabilities(in, IOSTATISTICS_CONTEXT_CAPABILITY, null); + in.seek(50); + in.read(readDataFirst); + } + assertContextBytesRead(context, BYTES_BIG); + // Stream is closed for a thread. 
Re-open and do more operations. + try (FSDataInputStream in = fs.open(path)) { + in.seek(100); + in.read(readDataSecond); + } + assertContextBytesRead(context, BYTES_BIG + BYTES_SMALL); + + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Exception or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * get the thread context and reset. + * @return thread context + */ + private static IOStatisticsContext getAndResetThreadStatisticsContext() { + IOStatisticsContext context = + IOStatisticsContext.getCurrentIOStatisticsContext(); + context.reset(); + return context; + } + + /** + * Verify that S3ABlockOutputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3ABlockOutputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] writeDataFirst = new byte[BYTES_BIG]; + byte[] writeDataSecond = new byte[BYTES_SMALL]; + + final ExecutorService executorService = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executorService.submit(() -> { + try { + // get the thread context and reset + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + try (FSDataOutputStream out = fs.create(path)) { + // Assert the OutputStream's stream capability to support + // IOStatisticsContext. 
+ assertCapabilities(out, IOSTATISTICS_CONTEXT_CAPABILITY, null); + out.write(writeDataFirst); + } + assertContextBytesWrite(context, BYTES_BIG); + + // Stream is closed for a thread. Re-open and do more operations. + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataSecond); + } + assertContextBytesWrite(context, BYTES_BIG + BYTES_SMALL); + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executorService.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + } + + /** + * Verify stats collection and aggregation for constructor thread, Junit + * thread and a worker thread. + */ + @Test + public void testThreadIOStatisticsForDifferentThreads() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + Assertions.assertThat(((IOStatisticsContextImpl)context).getThreadID()) + .describedAs("Thread ID of %s", context) + .isEqualTo(threadIdForTest); + Assertions.assertThat(((IOStatisticsContextImpl)context).getID()) + .describedAs("ID of %s", context) + .isGreaterThan(0); + + // Write in the Junit thread. + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread work and wait for it to finish. 
+ TestWorkerThread workerThread = new TestWorkerThread(path, null); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG); + assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL); + } + + /** + * Verify that a worker thread which switches to the Junit thread's + * IOStatisticsContext adds its statistics to that shared context. + */ + @Test + public void testThreadSharingIOStatistics() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + + + // Write in the Junit thread. + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread will share the same context. + TestWorkerThread workerThread = new TestWorkerThread(path, context); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG + BYTES_SMALL); + + } + + /** + * Verify that setting the current IOStatisticsContext to null removes the + * current context and creates a new instance of it. + */ + @Test + public void testSettingNullIOStatisticsContext() { + IOStatisticsContext ioStatisticsContextBefore = + getCurrentIOStatisticsContext(); + // Set the current IOStatisticsContext to null, which should remove the + // context and set a new one. + setThreadIOStatisticsContext(null); + // Get the context again after setting. + IOStatisticsContext ioStatisticsContextAfter = + getCurrentIOStatisticsContext(); + //Verify the context ID after setting to null is different than the previous + // one.
+ Assertions.assertThat(ioStatisticsContextBefore.getID()) + .describedAs("A new IOStaticsContext should be set after setting the " + + "current to null") + .isNotEqualTo(ioStatisticsContextAfter.getID()); + } + + /** + * Assert bytes written by the statistics context. + * + * @param context statistics context. + * @param bytes expected bytes. + */ + private void assertContextBytesWrite(IOStatisticsContext context, + int bytes) { + verifyStatisticCounterValue( + context.getIOStatistics(), + STREAM_WRITE_BYTES, + bytes); + } + + /** + * Assert bytes read by the statistics context. + * + * @param context statistics context. + * @param readBytes expected bytes. + */ + private void assertContextBytesRead(IOStatisticsContext context, + int readBytes) { + verifyStatisticCounterValue( + context.getIOStatistics(), + STREAM_READ_BYTES, + readBytes); + } + + /** + * Assert fixed bytes wrote and read for a particular thread ID. + * + * @param testThreadId thread ID. + * @param expectedBytesWrittenAndRead expected bytes. 
+ */ + private void assertThreadStatisticsForThread(long testThreadId, + int expectedBytesWrittenAndRead) { + LOG.info("Thread ID to be asserted: {}", testThreadId); + IOStatisticsContext ioStatisticsContext = + getThreadSpecificIOStatisticsContext(testThreadId); + Assertions.assertThat(ioStatisticsContext) + .describedAs("IOStatisticsContext for %d", testThreadId) + .isNotNull(); + + + IOStatistics ioStatistics = ioStatisticsContext.snapshot(); + + + assertThatStatisticCounter(ioStatistics, + STREAM_WRITE_BYTES) + .describedAs("Bytes written are not as expected for thread : %s", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + + assertThatStatisticCounter(ioStatistics, + STREAM_READ_BYTES) + .describedAs("Bytes read are not as expected for thread : %s", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + } + + @Test + public void testListingStatisticsContext() throws Throwable { + describe("verify the list operations update on close()"); + + S3AFileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(methodPath()); + + // after all setup, get the reset context + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + IOStatistics ioStatistics = context.getIOStatistics(); + + fs.listStatus(path); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listStatusIterator(path), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listLocatedStatus(path), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listFiles(path, true), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + } + + @Test + public void testListingThroughTaskPool() throws Throwable { + describe("verify the list operations are updated through taskpool"); + + 
S3AFileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(methodPath()); + + // after all setup, get the reset context + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + IOStatistics ioStatistics = context.getIOStatistics(); + + CloseableTaskPoolSubmitter submitter = + new CloseableTaskPoolSubmitter(executor); + TaskPool.foreach(fs.listStatusIterator(path)) + .executeWith(submitter) + .run(i -> {}); + + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + } + + /** + * Simulating doing some work in a separate thread. + * If constructed with an IOStatisticsContext then + * that context is switched to before performing the IO. + */ + private class TestWorkerThread extends Thread implements Runnable { + private final Path workerThreadPath; + + private final IOStatisticsContext ioStatisticsContext; + + /** + * create. + * @param workerThreadPath thread path. + * @param ioStatisticsContext optional statistics context * + */ + TestWorkerThread( + final Path workerThreadPath, + final IOStatisticsContext ioStatisticsContext) { + this.workerThreadPath = workerThreadPath; + this.ioStatisticsContext = ioStatisticsContext; + } + + @Override + public void run() { + S3AFileSystem fs = getFileSystem(); + byte[] data = new byte[BYTES_SMALL]; + + // maybe switch context + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + + // Write in the worker thread. + try (FSDataOutputStream out = fs.create(workerThreadPath)) { + out.write(data); + } catch (IOException e) { + throw new UncheckedIOException("Failure while writing", e); + } + + //Read in the worker thread. 
+ try (FSDataInputStream in = fs.open(workerThreadPath)) { + in.read(data); + } catch (IOException e) { + throw new UncheckedIOException("Failure while reading", e); + } + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java index 08f4f8bc9df..d38b5c93a67 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.s3a.audit.AuditTestSupport; import org.apache.hadoop.fs.s3a.commit.PutTracker; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.util.Progressable; import org.junit.Before; import org.junit.Test; @@ -67,7 +68,11 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest { .withProgress(progressable) .withPutTracker(putTracker) .withWriteOperations(oHelper) - .withPutOptions(PutObjectOptions.keepingDirs()); + .withPutOptions(PutObjectOptions.keepingDirs()) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext() + .getAggregator()); + return builder; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 9d9000cafb9..99872575271 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -26,6 +26,7 @@ import java.time.format.DateTimeFormatterBuilder; import java.util.List; import 
org.assertj.core.api.Assertions; +import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,9 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSupport; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -66,6 +70,12 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitITest.class); + /** + * Job statistics accrued across all test cases. + */ + private static final IOStatisticsSnapshot JOB_STATISTICS = + IOStatisticsSupport.snapshotIOStatistics(); + /** * Helper class for commit operations and assertions. 
*/ @@ -92,7 +102,8 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { FS_S3A_COMMITTER_NAME, FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, - FAST_UPLOAD_BUFFER); + FAST_UPLOAD_BUFFER, + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS); conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); @@ -100,9 +111,15 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY); // and bind the report dir conf.set(OPT_SUMMARY_REPORT_DIR, reportDir.toURI().toString()); + conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, true); return conf; } + @AfterClass + public static void printStatistics() { + LOG.info("Aggregate job statistics {}\n", + IOStatisticsLogging.ioStatisticsToPrettyString(JOB_STATISTICS)); + } /** * Get the log; can be overridden for test case log. * @return a log. @@ -397,6 +414,8 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { /** * Load a success file; fail if the file is empty/nonexistent. + * The statistics in {@link #JOB_STATISTICS} are updated with + * the statistics from the success file * @param fs filesystem * @param outputPath directory containing the success file. * @param origin origin of the file @@ -426,6 +445,8 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { String body = ContractTestUtils.readUTF8(fs, success, -1); LOG.info("Loading committer success file {}. 
Actual contents=\n{}", success, body); - return SuccessData.load(fs, success); + SuccessData successData = SuccessData.load(fs, success); + JOB_STATISTICS.aggregate(successData.getIOStatistics()); + return successData; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java index b92605ca253..439ef9aa44f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; @@ -92,7 +93,8 @@ public class TestStagingDirectoryOutputCommitter // this is done by calling the preCommit method directly, final CommitContext commitContext = new CommitOperations(getWrapperFS()). 
- createCommitContext(getJob(), getOutputPath(), 0); + createCommitContext(getJob(), getOutputPath(), 0, + IOStatisticsContext.getCurrentIOStatisticsContext()); committer.preCommitJob(commitContext, AbstractS3ACommitter.ActiveCommit.empty()); reset(mockFS); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java index 36857669553..b19662c0117 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS; /** ITest of the low level protocol methods. */ public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol { @@ -51,6 +52,14 @@ public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol { return CommitConstants.COMMITTER_NAME_DIRECTORY; } + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + // turn off stats collection to verify that it works + conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, false); + return conf; + } + @Override protected AbstractS3ACommitter createCommitter( Path outputPath,