diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 16144206eee..7c54b32dc3b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -475,4 +475,18 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { * default hadoop temp dir on local system: {@value}. */ public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; + + /** + * Thread-level IOStats Support. + * {@value} + */ + public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED = + "fs.thread.level.iostatistics.enabled"; + + /** + * Default value for Thread-level IOStats Support is true. + */ + public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT = + true; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index f525c3cba78..d9ceab9a054 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -57,6 +57,8 @@ import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; @@ -156,11 +158,19 @@ class LocalFSFileInputStream extends FSInputStream implements /** Reference to the bytes read counter for slightly faster counting. */ private final AtomicLong bytesRead; + /** + * Thread level IOStatistics aggregator to update in close(). + */ + private final IOStatisticsAggregator + ioStatisticsAggregator; + public LocalFSFileInputStream(Path f) throws IOException { name = pathToFile(f); fis = new FileInputStream(name); bytesRead = ioStatistics.getCounterReference( STREAM_READ_BYTES); + ioStatisticsAggregator = + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(); } @Override @@ -193,9 +203,13 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public void close() throws IOException { - fis.close(); - if (asyncChannel != null) { - asyncChannel.close(); + try { + fis.close(); + if (asyncChannel != null) { + asyncChannel.close(); + } + } finally { + ioStatisticsAggregator.aggregate(ioStatistics); } } @@ -278,6 +292,7 @@ public boolean hasCapability(String capability) { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: case StreamCapabilities.VECTOREDIO: return true; default: @@ -407,9 +422,19 @@ final class LocalFSFileOutputStream extends OutputStream implements STREAM_WRITE_EXCEPTIONS) .build(); + /** + * Thread level IOStatistics aggregator to update in close(). + */ + private final IOStatisticsAggregator + ioStatisticsAggregator; + private LocalFSFileOutputStream(Path f, boolean append, FsPermission permission) throws IOException { File file = pathToFile(f); + // store the aggregator before attempting any IO. + ioStatisticsAggregator = + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(); + if (!append && permission == null) { permission = FsPermission.getFileDefault(); } @@ -436,10 +461,17 @@ private LocalFSFileOutputStream(Path f, boolean append, } /* - * Just forward to the fos + * Close the fos; update the IOStatisticsContext. */ @Override - public void close() throws IOException { fos.close(); } + public void close() throws IOException { + try { + fos.close(); + } finally { + ioStatisticsAggregator.aggregate(ioStatistics); + } + } + @Override public void flush() throws IOException { fos.flush(); } @Override @@ -485,6 +517,7 @@ public boolean hasCapability(String capability) { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: return true; default: return StoreImplementationUtils.isProbeForSyncable(capability); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index d68ef505dc3..c925e50889d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -93,6 +93,12 @@ public interface StreamCapabilities { */ String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM; + /** + * Streams that support IOStatistics context and capture thread-level + * IOStatistics. + */ + String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported"; + /** * Capabilities that a stream can support and be queried for. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java index b24bef2a816..16fe0da7c5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.impl; +import java.lang.ref.WeakReference; import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.Nullable; @@ -48,7 +49,17 @@ public long currentThreadId() { } public V setForCurrentThread(V newVal) { - return put(currentThreadId(), newVal); + long id = currentThreadId(); + + // if the same object is already in the map, just return it. + WeakReference ref = lookup(id); + // Reference value could be set to null. Thus, ref.get() could return + // null. Should be handled accordingly while using the returned value. + if (ref != null && ref.get() == newVal) { + return ref.get(); + } + + return put(id, newVal); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java new file mode 100644 index 00000000000..fb10b93848f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics; + +import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration; + +/** + * An interface defined to capture thread-level IOStatistics by using per + * thread context. + *

+ * The aggregator should be collected in their constructor by statistics-generating + * classes to obtain the aggregator to update across all threads. + *

+ * The {@link #snapshot()} call creates a snapshot of the statistics; + *

+ * The {@link #reset()} call resets the statistics in the context so + * that later snapshots will get the incremental data. + */ +public interface IOStatisticsContext extends IOStatisticsSource { + + /** + * Get the IOStatisticsAggregator for the context. + * + * @return return the aggregator for the context. + */ + IOStatisticsAggregator getAggregator(); + + /** + * Capture the snapshot of the context's IOStatistics. + * + * @return IOStatisticsSnapshot for the context. + */ + IOStatisticsSnapshot snapshot(); + + /** + * Get a unique ID for this context, for logging + * purposes. + * + * @return an ID unique for all contexts in this process. + */ + long getID(); + + /** + * Reset the context's IOStatistics. + */ + void reset(); + + /** + * Get the context's IOStatisticsContext. + * + * @return instance of IOStatisticsContext for the context. + */ + static IOStatisticsContext getCurrentIOStatisticsContext() { + return IOStatisticsContextIntegration.getCurrentIOStatisticsContext(); + } + + /** + * Set the IOStatisticsContext for the current thread. + * @param statisticsContext IOStatistics context instance for the + * current thread. If null, the context is reset. + */ + static void setThreadIOStatisticsContext( + IOStatisticsContext statisticsContext) { + IOStatisticsContextIntegration.setThreadIOStatisticsContext( + statisticsContext); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java new file mode 100644 index 00000000000..b672f6639cb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +/** + * Empty IOStatistics context which serves no-op for all the operations and + * returns an empty Snapshot if asked. + * + */ +final class EmptyIOStatisticsContextImpl implements IOStatisticsContext { + + private static final IOStatisticsContext EMPTY_CONTEXT = new EmptyIOStatisticsContextImpl(); + + private EmptyIOStatisticsContextImpl() { + } + + /** + * Create a new empty snapshot. + * A new one is always created for isolation. + * + * @return a statistics snapshot + */ + @Override + public IOStatisticsSnapshot snapshot() { + return new IOStatisticsSnapshot(); + } + + @Override + public IOStatisticsAggregator getAggregator() { + return EmptyIOStatisticsStore.getInstance(); + } + + @Override + public IOStatistics getIOStatistics() { + return EmptyIOStatistics.getInstance(); + } + + @Override + public void reset() {} + + /** + * The ID is always 0. + * As the real context implementation counter starts at 1, + * we are guaranteed to have unique IDs even between them and + * the empty context. + * @return 0 + */ + @Override + public long getID() { + return 0; + } + + /** + * Get the single instance. + * @return an instance. + */ + static IOStatisticsContext getInstance() { + return EMPTY_CONTEXT; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java new file mode 100644 index 00000000000..97a85281c4f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +/** + * Implementing the IOStatisticsContext. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public final class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + + /** + * Thread ID. + */ + private final long threadId; + + /** + * Unique ID. + */ + private final long id; + + /** + * IOStatistics to aggregate. + */ + private final IOStatisticsSnapshot ioStatistics = new IOStatisticsSnapshot(); + + /** + * Constructor. + * @param threadId thread ID + * @param id instance ID. + */ + public IOStatisticsContextImpl(final long threadId, final long id) { + this.threadId = threadId; + this.id = id; + } + + @Override + public String toString() { + return "IOStatisticsContextImpl{" + + "id=" + id + + ", threadId=" + threadId + + ", ioStatistics=" + ioStatistics + + '}'; + } + + /** + * Get the IOStatisticsAggregator of the context. + * @return the instance of IOStatisticsAggregator for this context. + */ + @Override + public IOStatisticsAggregator getAggregator() { + return ioStatistics; + } + + /** + * Returns a snapshot of the current thread's IOStatistics. + * + * @return IOStatisticsSnapshot of the context. + */ + @Override + public IOStatisticsSnapshot snapshot() { + LOG.debug("Taking snapshot of IOStatisticsContext id {}", id); + return new IOStatisticsSnapshot(ioStatistics); + } + + /** + * Reset the thread +. + */ + @Override + public void reset() { + LOG.debug("clearing IOStatisticsContext id {}", id); + ioStatistics.clear(); + } + + @Override + public IOStatistics getIOStatistics() { + return ioStatistics; + } + + /** + * ID of this context. + * @return ID. + */ + @Override + public long getID() { + return id; + } + + /** + * Get the thread ID. + * @return thread ID. + */ + public long getThreadID() { + return threadId; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java new file mode 100644 index 00000000000..483d1e4570f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * A Utility class for IOStatisticsContext, which helps in creating and + * getting the current active context. Static methods in this class allows to + * get the current context to start aggregating the IOStatistics. + * + * Static initializer is used to work out if the feature to collect + * thread-level IOStatistics is enabled or not and the corresponding + * implementation class is called for it. + * + * Weak Reference thread map to be used to keep track of different context's + * to avoid long-lived memory leakages as these references would be cleaned + * up at GC. + */ +public final class IOStatisticsContextIntegration { + + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextIntegration.class); + + /** + * Is thread-level IO Statistics enabled? + */ + private static boolean isThreadIOStatsEnabled; + + /** + * ID for next instance to create. + */ + public static final AtomicLong INSTANCE_ID = new AtomicLong(1); + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. + */ + private static final WeakReferenceThreadMap + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>( + IOStatisticsContextIntegration::createNewInstance, + IOStatisticsContextIntegration::referenceLostContext + ); + + static { + // Work out if the current context has thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + isThreadIOStatsEnabled = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Private constructor for a utility class to be used in IOStatisticsContext. + */ + private IOStatisticsContextIntegration() {} + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement()); + } + + /** + * In case of reference loss for IOStatisticsContext. + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * Get the current thread's IOStatisticsContext instance. If no instance is + * present for this thread ID, create one using the factory. + * @return instance of IOStatisticsContext. + */ + public static IOStatisticsContext getCurrentIOStatisticsContext() { + return isThreadIOStatsEnabled + ? ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() + : EmptyIOStatisticsContextImpl.getInstance(); + } + + /** + * Set the IOStatisticsContext for the current thread. + * @param statisticsContext IOStatistics context instance for the + * current thread. If null, the context is reset. + */ + public static void setThreadIOStatisticsContext( + IOStatisticsContext statisticsContext) { + if (isThreadIOStatsEnabled) { + if (statisticsContext == null) { + ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread(); + } + if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) { + ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext); + } + } + } + + /** + * Get thread ID specific IOStatistics values if + * statistics are enabled and the thread ID is in the map. + * @param testThreadId thread ID. + * @return IOStatisticsContext if found in the map. + */ + @VisibleForTesting + public static IOStatisticsContext getThreadSpecificIOStatisticsContext(long testThreadId) { + LOG.debug("IOStatsContext thread ID required: {}", testThreadId); + + if (!isThreadIOStatsEnabled) { + return null; + } + // lookup the weakRef IOStatisticsContext for the thread ID in the + // ThreadMap. + WeakReference ioStatisticsSnapshotWeakReference = + ACTIVE_IOSTATS_CONTEXT.lookup(testThreadId); + if (ioStatisticsSnapshotWeakReference != null) { + return ioStatisticsSnapshotWeakReference.get(); + } + return null; + } + + /** + * A method to enable IOStatisticsContext to override if set otherwise in + * the configurations for tests. + */ + @VisibleForTesting + public static void enableIOStatisticsContext() { + if (!isThreadIOStatsEnabled) { + LOG.info("Enabling Thread IOStatistics.."); + isThreadIOStatsEnabled = true; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java index 0abaab211de..c9e6d0b78ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java @@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable; @@ -136,6 +137,15 @@ public static class Builder { private boolean stopAbortsOnFailure = false; private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION; + /** + * IOStatisticsContext to switch to in all threads + * taking part in the commit operation. + * This ensures that the IOStatistics collected in the + * worker threads will be aggregated into the total statistics + * of the thread calling the committer commit/abort methods. + */ + private IOStatisticsContext ioStatisticsContext = null; + /** * Create the builder. * @param items items to process @@ -242,7 +252,7 @@ public Builder stopAbortsOnFailure() { * @param value new value * @return the builder */ - public Builder sleepInterval(final int value) { + public Builder sleepInterval(final int value) { sleepInterval = value; return this; } @@ -364,6 +374,8 @@ private boolean runSingleThreaded(Task task) /** * Parallel execution. + * All tasks run within the same IOStatisticsContext as the + * thread calling this method. * @param task task to execute * @param exception which may be raised in execution. * @return true if the operation executed successfully @@ -379,64 +391,70 @@ private boolean runParallel(final Task task) final AtomicBoolean revertFailed = new AtomicBoolean(false); List> futures = new ArrayList<>(); + ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext(); IOException iteratorIOE = null; final RemoteIterator iterator = this.items; try { - while(iterator.hasNext()) { + while (iterator.hasNext()) { final I item = iterator.next(); // submit a task for each item that will either run or abort the task futures.add(service.submit(() -> { - if (!(stopOnFailure && taskFailed.get())) { - // run the task - boolean threw = true; - try { - LOG.debug("Executing task"); - task.run(item); - succeeded.add(item); - LOG.debug("Task succeeded"); + setStatisticsContext(); + try { + if (!(stopOnFailure && taskFailed.get())) { + // prepare and run the task + boolean threw = true; + try { + LOG.debug("Executing task"); + task.run(item); + succeeded.add(item); + LOG.debug("Task succeeded"); - threw = false; + threw = false; - } catch (Exception e) { - taskFailed.set(true); - exceptions.add(e); - LOG.info("Task failed {}", e.toString()); - LOG.debug("Task failed", e); + } catch (Exception e) { + taskFailed.set(true); + exceptions.add(e); + LOG.info("Task failed {}", e.toString()); + LOG.debug("Task failed", e); - if (onFailure != null) { - try { - onFailure.run(item, e); - } catch (Exception failException) { - LOG.warn("Failed to clean up on failure", e); - // swallow the exception + if (onFailure != null) { + try { + onFailure.run(item, e); + } catch (Exception failException) { + LOG.warn("Failed to clean up on failure", e); + // swallow the exception + } + } + } finally { + if (threw) { + taskFailed.set(true); } } - } finally { - if (threw) { - taskFailed.set(true); - } - } - - } else if (abortTask != null) { - // abort the task instead of running it - if (stopAbortsOnFailure && abortFailed.get()) { - return; - } - - boolean failed = true; - try { - LOG.info("Aborting task"); - abortTask.run(item); - failed = false; - } catch (Exception e) { - LOG.error("Failed to abort task", e); - // swallow the exception - } finally { - if (failed) { - abortFailed.set(true); + + } else if (abortTask != null) { + // abort the task instead of running it + if (stopAbortsOnFailure && abortFailed.get()) { + return; + } + + boolean failed = true; + try { + LOG.info("Aborting task"); + abortTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to abort task", e); + // swallow the exception + } finally { + if (failed) { + abortFailed.set(true); + } } } + } finally { + resetStatisticsContext(); } })); } @@ -447,7 +465,6 @@ private boolean runParallel(final Task task) // mark as a task failure so all submitted tasks will halt/abort taskFailed.set(true); } - // let the above tasks complete (or abort) waitFor(futures, sleepInterval); int futureCount = futures.size(); @@ -464,6 +481,7 @@ private boolean runParallel(final Task task) } boolean failed = true; + setStatisticsContext(); try { revertTask.run(item); failed = false; @@ -474,6 +492,7 @@ private boolean runParallel(final Task task) if (failed) { revertFailed.set(true); } + resetStatisticsContext(); } })); } @@ -498,6 +517,26 @@ private boolean runParallel(final Task task) // return true if all tasks succeeded. return !taskFailed.get(); } + + /** + * Set the statistics context for this thread. + */ + private void setStatisticsContext() { + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + } + + /** + * Reset the statistics context if it was set earlier. + * This unbinds the current thread from any statistics + * context. + */ + private void resetStatisticsContext() { + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(null); + } + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index a1dd4d8df02..6c39cc4b642 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -31,7 +31,9 @@ import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.util.functional.RemoteIterators; @@ -333,7 +335,7 @@ interface FileStatusAcceptor { * Thread safety: None. */ class FileStatusListingIterator - implements RemoteIterator, IOStatisticsSource { + implements RemoteIterator, IOStatisticsSource, Closeable { /** Source of objects. */ private final ObjectListingIterator source; @@ -403,6 +405,14 @@ public S3AFileStatus next() throws IOException { return status; } + /** + * Close, if called, will update + * the thread statistics context with the value. + */ + @Override + public void close() { + source.close(); + } /** * Try to retrieve another batch. * Note that for the initial batch, @@ -545,6 +555,11 @@ class ObjectListingIterator implements RemoteIterator, private final AuditSpan span; + /** + * Context statistics aggregator. + */ + private final IOStatisticsAggregator aggregator; + /** The most recent listing results. */ private S3ListResult objects; @@ -601,6 +616,8 @@ class ObjectListingIterator implements RemoteIterator, this.span = span; this.s3ListResultFuture = listingOperationCallbacks .listObjectsAsync(request, iostats, span); + this.aggregator = IOStatisticsContext.getCurrentIOStatisticsContext() + .getAggregator(); } /** @@ -693,11 +710,12 @@ public int getListingCount() { } /** - * Close, if actually called, will close the span - * this listing was created with. + * Close, if called, will update + * the thread statistics context with the value. */ @Override public void close() { + aggregator.aggregate(getIOStatistics()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 8b1865c77c9..19943ff2f70 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -41,6 +41,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; @@ -165,6 +166,9 @@ class S3ABlockOutputStream extends OutputStream implements /** is client side encryption enabled? */ private final boolean isCSEEnabled; + /** Thread level IOStatistics Aggregator. */ + private final IOStatisticsAggregator threadIOStatisticsAggregator; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -201,6 +205,7 @@ class S3ABlockOutputStream extends OutputStream implements initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; + this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; } /** @@ -454,11 +459,23 @@ public void close() throws IOException { */ private synchronized void cleanupOnClose() { cleanupWithLogger(LOG, getActiveBlock(), blockFactory); + mergeThreadIOStatistics(statistics.getIOStatistics()); LOG.debug("Statistics: {}", statistics); cleanupWithLogger(LOG, statistics); clearActiveBlock(); } + /** + * Merging the current thread's IOStatistics with the current IOStatistics + * context. + * + * @param streamStatistics Stream statistics to be merged into thread + * statistics aggregator. + */ + private void mergeThreadIOStatistics(IOStatistics streamStatistics) { + getThreadIOStatistics().aggregate(streamStatistics); + } + /** * Best effort abort of the multipart upload; sets * the field to null afterwards. @@ -662,6 +679,10 @@ public boolean hasCapability(String capability) { case StreamCapabilities.ABORTABLE_STREAM: return true; + // IOStatistics context support for thread-level IOStatistics. + case StreamCapabilities.IOSTATISTICS_CONTEXT: + return true; + default: return false; } @@ -701,6 +722,14 @@ public IOStatistics getIOStatistics() { return iostatistics; } + /** + * Get the IOStatistics aggregator passed in the builder. + * @return an aggregator + */ + protected IOStatisticsAggregator getThreadIOStatistics() { + return threadIOStatisticsAggregator; + } + /** * Multiple partition upload. */ @@ -1092,6 +1121,11 @@ public static final class BlockOutputStreamBuilder { */ private PutObjectOptions putOptions; + /** + * thread-level IOStatistics Aggregator. + */ + private IOStatisticsAggregator ioStatisticsAggregator; + private BlockOutputStreamBuilder() { } @@ -1108,6 +1142,7 @@ public void validate() { requireNonNull(putOptions, "null putOptions"); Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, "Block size is too small: %s", blockSize); + requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator"); } /** @@ -1229,5 +1264,17 @@ public BlockOutputStreamBuilder withPutOptions( putOptions = value; return this; } + + /** + * Set builder value. + * + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withIOStatisticsAggregator( + final IOStatisticsAggregator value) { + ioStatisticsAggregator = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 406f40bff95..c49c368bbbe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -131,6 +131,7 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.store.audit.AuditEntryPoint; import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource; @@ -1576,7 +1577,8 @@ protected S3AReadOpContext createReadContext( statistics, statisticsContext, fileStatus, - vectoredIOContext) + vectoredIOContext, + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) .withAuditSpan(auditSpan); openFileHelper.applyDefaultOptions(roc); return roc.build(); @@ -1743,7 +1745,9 @@ private FSDataOutputStream innerCreateFile( DOWNGRADE_SYNCABLE_EXCEPTIONS, DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)) .withCSEEnabled(isCSEEnabled) - .withPutOptions(putOptions); + .withPutOptions(putOptions) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3069f172891..178a807733a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; @@ -53,9 +54,9 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.functional.CallableRaisingIOE; import static java.util.Objects.requireNonNull; @@ -187,6 +188,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private long asyncDrainThreshold; + /** Aggregator used to aggregate per thread IOStatistics. */ + private final IOStatisticsAggregator threadIOStatistics; + /** * Create the stream. * This does not attempt to open it; that is only done on the first @@ -225,6 +229,7 @@ public S3AInputStream(S3AReadOpContext ctx, this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); this.unboundedThreadPool = unboundedThreadPool; this.vectoredIOContext = context.getVectoredIOContext(); + this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator()); } /** @@ -600,7 +605,6 @@ public synchronized void close() throws IOException { stopVectoredIOOperations.set(true); // close or abort the stream; blocking awaitFuture(closeStream("close() operation", false, true)); - LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); // end the client+audit span. client.close(); // this is actually a no-op @@ -608,10 +612,23 @@ public synchronized void close() throws IOException { } finally { // merge the statistics back into the FS statistics. streamStatistics.close(); + // Collect ThreadLevel IOStats + mergeThreadIOStatistics(streamStatistics.getIOStatistics()); } } } + /** + * Merging the current thread's IOStatistics with the current IOStatistics + * context. + * + * @param streamIOStats Stream statistics to be merged into thread + * statistics aggregator. + */ + private void mergeThreadIOStatistics(IOStatistics streamIOStats) { + threadIOStatistics.aggregate(streamIOStats); + } + /** * Close a stream: decide whether to abort or close, based on * the length of the stream and the current position. @@ -1331,6 +1348,7 @@ public synchronized void unbuffer() { public boolean hasCapability(String capability) { switch (toLowerCase(capability)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: case StreamCapabilities.READAHEAD: case StreamCapabilities.UNBUFFER: case StreamCapabilities.VECTOREDIO: diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index 803b7757d25..bbd86ef5ac2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.store.audit.AuditSpan; import javax.annotation.Nullable; @@ -70,6 +71,9 @@ public class S3AReadOpContext extends S3AOpContext { */ private final VectoredIOContext vectoredIOContext; + /** Thread-level IOStatistics aggregator. **/ + private final IOStatisticsAggregator ioStatisticsAggregator; + /** * Instantiate. * @param path path of read @@ -78,6 +82,7 @@ public class S3AReadOpContext extends S3AOpContext { * @param instrumentation statistics context * @param dstFileStatus target file status * @param vectoredIOContext context for vectored read operation. + * @param ioStatisticsAggregator IOStatistics aggregator for each thread. */ public S3AReadOpContext( final Path path, @@ -85,11 +90,13 @@ public S3AReadOpContext( @Nullable FileSystem.Statistics stats, S3AStatisticsContext instrumentation, FileStatus dstFileStatus, - VectoredIOContext vectoredIOContext) { + VectoredIOContext vectoredIOContext, + IOStatisticsAggregator ioStatisticsAggregator) { super(invoker, stats, instrumentation, dstFileStatus); this.path = requireNonNull(path); this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext"); + this.ioStatisticsAggregator = ioStatisticsAggregator; } /** @@ -105,6 +112,7 @@ public S3AReadOpContext build() { "invalid readahead %d", readahead); Preconditions.checkArgument(asyncDrainThreshold >= 0, "invalid drainThreshold %d", asyncDrainThreshold); + requireNonNull(ioStatisticsAggregator, "ioStatisticsAggregator"); return this; } @@ -215,6 +223,15 @@ public VectoredIOContext getVectoredIOContext() { return vectoredIOContext; } + /** + * Return the IOStatistics aggregator. + * + * @return instance of IOStatisticsAggregator. + */ + public IOStatisticsAggregator getIOStatisticsAggregator() { + return ioStatisticsAggregator; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 78b687cc6f1..d6044edde29 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.audit.AuditConstants; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -467,8 +468,7 @@ public void recoverTask(TaskAttemptContext taskContext) throws IOException { * * While the classic committers create a 0-byte file, the S3A committers * PUT up a the contents of a {@link SuccessData} file. - * - * @param context job context + * @param commitContext commit context * @param pending the pending commits * * @return the success data, even if the marker wasn't created @@ -476,7 +476,7 @@ public void recoverTask(TaskAttemptContext taskContext) throws IOException { * @throws IOException IO failure */ protected SuccessData maybeCreateSuccessMarkerFromCommits( - JobContext context, + final CommitContext commitContext, ActiveCommit pending) throws IOException { List filenames = new ArrayList<>(pending.size()); // The list of committed objects in pending is size limited in @@ -488,7 +488,13 @@ protected SuccessData maybeCreateSuccessMarkerFromCommits( // and the current statistics snapshot.aggregate(getIOStatistics()); - return maybeCreateSuccessMarker(context, filenames, snapshot); + // and include the context statistics if enabled + if (commitContext.isCollectIOStatistics()) { + snapshot.aggregate(commitContext.getIOStatisticsContext() + .getIOStatistics()); + } + + return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot); } /** @@ -729,6 +735,7 @@ private void loadAndCommit( final FileStatus status) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try (DurationInfo ignored = new DurationInfo(LOG, "Loading and committing files in pendingset %s", path)) { @@ -775,6 +782,7 @@ private void loadAndRevert( final FileStatus status) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Committing %s", path)) { PendingSet pendingSet = PersistentCommitData.load( @@ -806,6 +814,7 @@ private void loadAndAbort( final boolean deleteRemoteFiles) throws IOException { final Path path = status.getPath(); + commitContext.switchToIOStatisticsContext(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Aborting %s", path)) { PendingSet pendingSet = PersistentCommitData.load( @@ -832,6 +841,8 @@ private void loadAndAbort( /** * Start the final job commit/abort commit operations. + * If configured to collect statistics, + * The IO StatisticsContext is reset. * @param context job context * @return a commit context through which the operations can be invoked. * @throws IOException failure. @@ -840,14 +851,22 @@ protected CommitContext initiateJobOperation( final JobContext context) throws IOException { - return getCommitOperations().createCommitContext( + IOStatisticsContext ioStatisticsContext = + IOStatisticsContext.getCurrentIOStatisticsContext(); + CommitContext commitContext = getCommitOperations().createCommitContext( context, getOutputPath(), - getJobCommitThreadCount(context)); + getJobCommitThreadCount(context), + ioStatisticsContext); + commitContext.maybeResetIOStatisticsContext(); + return commitContext; } + /** * Start a ask commit/abort commit operations. * This may have a different thread count. + * If configured to collect statistics, + * The IO StatisticsContext is reset. * @param context job or task context * @return a commit context through which the operations can be invoked. * @throws IOException failure. @@ -856,10 +875,13 @@ protected CommitContext initiateTaskOperation( final JobContext context) throws IOException { - return getCommitOperations().createCommitContext( + CommitContext commitContext = getCommitOperations().createCommitContext( context, getOutputPath(), - getTaskCommitThreadCount(context)); + getTaskCommitThreadCount(context), + IOStatisticsContext.getCurrentIOStatisticsContext()); + commitContext.maybeResetIOStatisticsContext(); + return commitContext; } /** @@ -1014,7 +1036,7 @@ public void commitJob(JobContext context) throws IOException { stage = "completed"; jobCompleted(true); stage = "marker"; - successData = maybeCreateSuccessMarkerFromCommits(context, pending); + successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending); stage = "cleanup"; cleanup(commitContext, false); } catch (IOException e) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index b85ce276504..6e2a5d8c9fb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -354,4 +354,24 @@ private CommitConstants() { public static final String OPT_SUMMARY_REPORT_DIR = OPT_PREFIX + "summary.report.directory"; + /** + * Experimental feature to collect thread level IO statistics. + * When set the committers will reset the statistics in + * task setup and propagate to the job committer. + * The job comitter will include those and its own statistics. + * Do not use if the execution engine is collecting statistics, + * as the multiple reset() operations will result in incomplete + * statistics. + * Value: {@value}. + */ + public static final String S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS = + OPT_PREFIX + "experimental.collect.iostatistics"; + + /** + * Default value for {@link #S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS}. + * Value: {@value}. + */ + public static final boolean S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT = + false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java index 8bff165f2e8..8ac3dcb231d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -47,6 +49,8 @@ import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS; import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.THREAD_PREFIX; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.THREAD_KEEP_ALIVE_TIME; /** @@ -123,24 +127,43 @@ public final class CommitContext implements Closeable { */ private final int committerThreads; + /** + * Should IOStatistics be collected by the committer? + */ + private final boolean collectIOStatistics; + + /** + * IOStatisticsContext to switch to in all threads + * taking part in the commit operation. + * This ensures that the IOStatistics collected in the + * worker threads will be aggregated into the total statistics + * of the thread calling the committer commit/abort methods. + */ + private final IOStatisticsContext ioStatisticsContext; + /** * Create. * @param commitOperations commit callbacks * @param jobContext job context * @param committerThreads number of commit threads + * @param ioStatisticsContext IOStatistics context of current thread */ public CommitContext( final CommitOperations commitOperations, final JobContext jobContext, - final int committerThreads) { + final int committerThreads, + final IOStatisticsContext ioStatisticsContext) { this.commitOperations = commitOperations; this.jobContext = jobContext; this.conf = jobContext.getConfiguration(); this.jobId = jobContext.getJobID().toString(); + this.collectIOStatistics = conf.getBoolean( + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT); + this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext); this.auditContextUpdater = new AuditContextUpdater(jobContext); this.auditContextUpdater.updateCurrentAuditContext(); this.committerThreads = committerThreads; - buildSubmitters(); } @@ -152,15 +175,19 @@ public CommitContext( * @param conf job conf * @param jobId ID * @param committerThreads number of commit threads + * @param ioStatisticsContext IOStatistics context of current thread */ public CommitContext(final CommitOperations commitOperations, final Configuration conf, final String jobId, - final int committerThreads) { + final int committerThreads, + final IOStatisticsContext ioStatisticsContext) { this.commitOperations = commitOperations; this.jobContext = null; this.conf = conf; this.jobId = jobId; + this.collectIOStatistics = false; + this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext); this.auditContextUpdater = new AuditContextUpdater(jobId); this.auditContextUpdater.updateCurrentAuditContext(); this.committerThreads = committerThreads; @@ -358,6 +385,44 @@ public String getJobId() { return jobId; } + /** + * Collecting thread level IO statistics? + * @return true if thread level IO stats should be collected. + */ + public boolean isCollectIOStatistics() { + return collectIOStatistics; + } + + /** + * IOStatistics context of the created thread. + * @return the IOStatistics. + */ + public IOStatisticsContext getIOStatisticsContext() { + return ioStatisticsContext; + } + + /** + * Switch to the context IOStatistics context, + * if needed. + */ + public void switchToIOStatisticsContext() { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + + /** + * Reset the IOStatistics context if statistics are being + * collected. + * Logs at info. + */ + public void maybeResetIOStatisticsContext() { + if (collectIOStatistics) { + + LOG.info("Resetting IO statistics context {}", + ioStatisticsContext.getID()); + ioStatisticsContext.reset(); + } + } + /** * Submitter for a given thread pool. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java index 0772e143f69..eb23f299a02 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Preconditions; @@ -639,15 +640,18 @@ public void jobCompleted(boolean success) { * @param context job context * @param path path for all work. * @param committerThreads thread pool size + * @param ioStatisticsContext IOStatistics context of current thread * @return the commit context to pass in. * @throws IOException failure. */ public CommitContext createCommitContext( JobContext context, Path path, - int committerThreads) throws IOException { + int committerThreads, + IOStatisticsContext ioStatisticsContext) throws IOException { return new CommitContext(this, context, - committerThreads); + committerThreads, + ioStatisticsContext); } /** @@ -668,7 +672,8 @@ public CommitContext createCommitContextForTesting( return new CommitContext(this, getStoreContext().getConfiguration(), id, - committerThreads); + committerThreads, + IOStatisticsContext.getCurrentIOStatisticsContext()); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 007e9b37096..9ded64eedc0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -219,6 +219,13 @@ private PendingSet innerCommitTask( } pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId); pendingSet.setJobId(jobId); + // add in the IOStatistics of all the file loading + if (commitContext.isCollectIOStatistics()) { + pendingSet.getIOStatistics() + .aggregate( + commitContext.getIOStatisticsContext().getIOStatistics()); + } + Path jobAttemptPath = getJobAttemptPath(context); TaskAttemptID taskAttemptID = context.getTaskAttemptID(); Path taskOutcomePath = new Path(jobAttemptPath, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 36eae012dea..d764055c45b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -696,6 +696,13 @@ protected int commitTaskInternal(final TaskAttemptContext context, pendingCommits.add(commit); } + // maybe add in the IOStatistics the thread + if (commitContext.isCollectIOStatistics()) { + pendingCommits.getIOStatistics().aggregate( + commitContext.getIOStatisticsContext() + .getIOStatistics()); + } + // save the data // overwrite any existing file, so whichever task attempt // committed last wins. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java index 213f94432c4..e90ad8b73ef 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.s3a.tools.MarkerTool; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.io.IOUtils; @@ -38,6 +39,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; @@ -66,6 +68,15 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase */ private AuditSpanSource spanSource; + /** + * Atomic references to be used to re-throw an Exception or an ASE + * caught inside a lambda function. + */ + private static final AtomicReference FUTURE_EXCEPTION = + new AtomicReference<>(); + private static final AtomicReference FUTURE_ASE = + new AtomicReference<>(); + /** * Get the source. * @return span source @@ -99,6 +110,9 @@ public void setup() throws Exception { S3AFileSystem.initializeClass(); super.setup(); setSpanSource(getFileSystem()); + // Reset the current context's thread IOStatistics.` + // this ensures that the context stats will always be from the test case + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); } @Override @@ -263,4 +277,53 @@ public void skipIfClientSideEncryption() { Assume.assumeTrue("Skipping test if CSE is enabled", !getFileSystem().isCSEEnabled()); } + + /** + * If an exception is caught while doing some work in a Lambda function, + * store it in an atomic reference to be thrown later on. + * @param exception Exception caught. + */ + public static void setFutureException(Exception exception) { + FUTURE_EXCEPTION.set(exception); + } + + /** + * If an Assertion is caught while doing some work in a Lambda function, + * store it in an atomic reference to be thrown later on. + * + * @param ase Assertion Error caught. + */ + public static void setFutureAse(AssertionError ase) { + FUTURE_ASE.set(ase); + } + + /** + * throw the caught exception from the atomic reference and also clear the + * atomic reference so that we don't rethrow in another test. + * + * @throws Exception the exception caught. + */ + public static void maybeReThrowFutureException() throws Exception { + if (FUTURE_EXCEPTION.get() != null) { + Exception exceptionToThrow = FUTURE_EXCEPTION.get(); + // reset the atomic ref before throwing. + setFutureAse(null); + throw exceptionToThrow; + } + } + + /** + * throw the Assertion error from the atomic reference and also clear the + * atomic reference so that we don't rethrow in another test. + * + * @throws Exception Assertion error caught. + */ + public static void maybeReThrowFutureASE() throws Exception { + if (FUTURE_ASE.get() != null) { + AssertionError aseToThrow = FUTURE_ASE.get(); + // reset the atomic ref before throwing. + setFutureAse(null); + throw aseToThrow; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java new file mode 100644 index 00000000000..19c40c64661 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextImpl; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter; +import org.apache.hadoop.util.functional.TaskPool; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.enableIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getCurrentIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getThreadSpecificIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.setThreadIOStatisticsContext; +import static org.apache.hadoop.util.functional.RemoteIterators.foreach; + +/** + * Tests to verify the Thread-level IOStatistics. + */ +public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { + + private static final int SMALL_THREADS = 2; + private static final int BYTES_BIG = 100; + private static final int BYTES_SMALL = 50; + private static final String[] IOSTATISTICS_CONTEXT_CAPABILITY = + new String[] {StreamCapabilities.IOSTATISTICS_CONTEXT}; + private ExecutorService executor; + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + enableIOStatisticsContext(); + return configuration; + } + + @Override + public void setup() throws Exception { + super.setup(); + executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + } + + @Override + public void teardown() throws Exception { + if (executor != null) { + executor.shutdown(); + } + super.teardown(); + } + + /** + * Verify that S3AInputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3AInputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = dataset(256, 'a', 'z'); + byte[] readDataFirst = new byte[BYTES_BIG]; + byte[] readDataSecond = new byte[BYTES_SMALL]; + writeDataset(fs, path, data, data.length, 1024, true); + + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + // get the thread context and reset + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + try (FSDataInputStream in = fs.open(path)) { + // Assert the InputStream's stream capability to support + // IOStatisticsContext. + assertCapabilities(in, IOSTATISTICS_CONTEXT_CAPABILITY, null); + in.seek(50); + in.read(readDataFirst); + } + assertContextBytesRead(context, BYTES_BIG); + // Stream is closed for a thread. Re-open and do more operations. + try (FSDataInputStream in = fs.open(path)) { + in.seek(100); + in.read(readDataSecond); + } + assertContextBytesRead(context, BYTES_BIG + BYTES_SMALL); + + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Exception or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * get the thread context and reset. + * @return thread context + */ + private static IOStatisticsContext getAndResetThreadStatisticsContext() { + IOStatisticsContext context = + IOStatisticsContext.getCurrentIOStatisticsContext(); + context.reset(); + return context; + } + + /** + * Verify that S3ABlockOutputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3ABlockOutputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] writeDataFirst = new byte[BYTES_BIG]; + byte[] writeDataSecond = new byte[BYTES_SMALL]; + + final ExecutorService executorService = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executorService.submit(() -> { + try { + // get the thread context and reset + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + try (FSDataOutputStream out = fs.create(path)) { + // Assert the OutputStream's stream capability to support + // IOStatisticsContext. + assertCapabilities(out, IOSTATISTICS_CONTEXT_CAPABILITY, null); + out.write(writeDataFirst); + } + assertContextBytesWrite(context, BYTES_BIG); + + // Stream is closed for a thread. Re-open and do more operations. + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataSecond); + } + assertContextBytesWrite(context, BYTES_BIG + BYTES_SMALL); + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executorService.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + } + + /** + * Verify stats collection and aggregation for constructor thread, Junit + * thread and a worker thread. + */ + @Test + public void testThreadIOStatisticsForDifferentThreads() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + Assertions.assertThat(((IOStatisticsContextImpl)context).getThreadID()) + .describedAs("Thread ID of %s", context) + .isEqualTo(threadIdForTest); + Assertions.assertThat(((IOStatisticsContextImpl)context).getID()) + .describedAs("ID of %s", context) + .isGreaterThan(0); + + // Write in the Junit thread. + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread work and wait for it to finish. + TestWorkerThread workerThread = new TestWorkerThread(path, null); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG); + assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL); + } + + /** + * Verify stats collection and aggregation for constructor thread, Junit + * thread and a worker thread. + */ + @Test + public void testThreadSharingIOStatistics() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + + + // Write in the Junit thread. + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread will share the same context. + TestWorkerThread workerThread = new TestWorkerThread(path, context); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG + BYTES_SMALL); + + } + + /** + * Test to verify if setting the current IOStatisticsContext removes the + * current context and creates a new instance of it. + */ + @Test + public void testSettingNullIOStatisticsContext() { + IOStatisticsContext ioStatisticsContextBefore = + getCurrentIOStatisticsContext(); + // Set the current IOStatisticsContext to null, which should remove the + // context and set a new one. + setThreadIOStatisticsContext(null); + // Get the context again after setting. + IOStatisticsContext ioStatisticsContextAfter = + getCurrentIOStatisticsContext(); + //Verify the context ID after setting to null is different than the previous + // one. + Assertions.assertThat(ioStatisticsContextBefore.getID()) + .describedAs("A new IOStaticsContext should be set after setting the " + + "current to null") + .isNotEqualTo(ioStatisticsContextAfter.getID()); + } + + /** + * Assert bytes written by the statistics context. + * + * @param context statistics context. + * @param bytes expected bytes. + */ + private void assertContextBytesWrite(IOStatisticsContext context, + int bytes) { + verifyStatisticCounterValue( + context.getIOStatistics(), + STREAM_WRITE_BYTES, + bytes); + } + + /** + * Assert bytes read by the statistics context. + * + * @param context statistics context. + * @param readBytes expected bytes. + */ + private void assertContextBytesRead(IOStatisticsContext context, + int readBytes) { + verifyStatisticCounterValue( + context.getIOStatistics(), + STREAM_READ_BYTES, + readBytes); + } + + /** + * Assert fixed bytes wrote and read for a particular thread ID. + * + * @param testThreadId thread ID. + * @param expectedBytesWrittenAndRead expected bytes. + */ + private void assertThreadStatisticsForThread(long testThreadId, + int expectedBytesWrittenAndRead) { + LOG.info("Thread ID to be asserted: {}", testThreadId); + IOStatisticsContext ioStatisticsContext = + getThreadSpecificIOStatisticsContext(testThreadId); + Assertions.assertThat(ioStatisticsContext) + .describedAs("IOStatisticsContext for %d", testThreadId) + .isNotNull(); + + + IOStatistics ioStatistics = ioStatisticsContext.snapshot(); + + + assertThatStatisticCounter(ioStatistics, + STREAM_WRITE_BYTES) + .describedAs("Bytes written are not as expected for thread : %s", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + + assertThatStatisticCounter(ioStatistics, + STREAM_READ_BYTES) + .describedAs("Bytes read are not as expected for thread : %s", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + } + + @Test + public void testListingStatisticsContext() throws Throwable { + describe("verify the list operations update on close()"); + + S3AFileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(methodPath()); + + // after all setup, get the reset context + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + IOStatistics ioStatistics = context.getIOStatistics(); + + fs.listStatus(path); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listStatusIterator(path), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listLocatedStatus(path), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + context.reset(); + foreach(fs.listFiles(path, true), i -> {}); + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + } + + @Test + public void testListingThroughTaskPool() throws Throwable { + describe("verify the list operations are updated through taskpool"); + + S3AFileSystem fs = getFileSystem(); + Path path = methodPath(); + fs.mkdirs(methodPath()); + + // after all setup, get the reset context + IOStatisticsContext context = + getAndResetThreadStatisticsContext(); + IOStatistics ioStatistics = context.getIOStatistics(); + + CloseableTaskPoolSubmitter submitter = + new CloseableTaskPoolSubmitter(executor); + TaskPool.foreach(fs.listStatusIterator(path)) + .executeWith(submitter) + .run(i -> {}); + + verifyStatisticCounterValue(ioStatistics, + StoreStatisticNames.OBJECT_LIST_REQUEST, + 1); + + } + + /** + * Simulating doing some work in a separate thread. + * If constructed with an IOStatisticsContext then + * that context is switched to before performing the IO. + */ + private class TestWorkerThread extends Thread implements Runnable { + private final Path workerThreadPath; + + private final IOStatisticsContext ioStatisticsContext; + + /** + * create. + * @param workerThreadPath thread path. + * @param ioStatisticsContext optional statistics context * + */ + TestWorkerThread( + final Path workerThreadPath, + final IOStatisticsContext ioStatisticsContext) { + this.workerThreadPath = workerThreadPath; + this.ioStatisticsContext = ioStatisticsContext; + } + + @Override + public void run() { + S3AFileSystem fs = getFileSystem(); + byte[] data = new byte[BYTES_SMALL]; + + // maybe switch context + if (ioStatisticsContext != null) { + IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); + } + + // Write in the worker thread. + try (FSDataOutputStream out = fs.create(workerThreadPath)) { + out.write(data); + } catch (IOException e) { + throw new UncheckedIOException("Failure while writing", e); + } + + //Read in the worker thread. + try (FSDataInputStream in = fs.open(workerThreadPath)) { + in.read(data); + } catch (IOException e) { + throw new UncheckedIOException("Failure while reading", e); + } + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java index 08f4f8bc9df..d38b5c93a67 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.s3a.commit.PutTracker; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.util.Progressable; import org.junit.Before; import org.junit.Test; @@ -67,7 +68,11 @@ private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() { .withProgress(progressable) .withPutTracker(putTracker) .withWriteOperations(oHelper) - .withPutOptions(PutObjectOptions.keepingDirs()); + .withPutOptions(PutObjectOptions.keepingDirs()) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext() + .getAggregator()); + return builder; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 9d9000cafb9..99872575271 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -26,6 +26,7 @@ import java.util.List; import org.assertj.core.api.Assertions; +import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,9 @@ import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSupport; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -66,6 +70,12 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitITest.class); + /** + * Job statistics accrued across all test cases. + */ + private static final IOStatisticsSnapshot JOB_STATISTICS = + IOStatisticsSupport.snapshotIOStatistics(); + /** * Helper class for commit operations and assertions. */ @@ -92,7 +102,8 @@ protected Configuration createConfiguration() { FS_S3A_COMMITTER_NAME, FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, - FAST_UPLOAD_BUFFER); + FAST_UPLOAD_BUFFER, + S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS); conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); @@ -100,9 +111,15 @@ protected Configuration createConfiguration() { conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY); // and bind the report dir conf.set(OPT_SUMMARY_REPORT_DIR, reportDir.toURI().toString()); + conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, true); return conf; } + @AfterClass + public static void printStatistics() { + LOG.info("Aggregate job statistics {}\n", + IOStatisticsLogging.ioStatisticsToPrettyString(JOB_STATISTICS)); + } /** * Get the log; can be overridden for test case log. * @return a log. @@ -397,6 +414,8 @@ public static SuccessData validateSuccessFile(final Path outputPath, /** * Load a success file; fail if the file is empty/nonexistent. + * The statistics in {@link #JOB_STATISTICS} are updated with + * the statistics from the success file * @param fs filesystem * @param outputPath directory containing the success file. * @param origin origin of the file @@ -426,6 +445,8 @@ public static SuccessData loadSuccessFile(final FileSystem fs, String body = ContractTestUtils.readUTF8(fs, success, -1); LOG.info("Loading committer success file {}. Actual contents=\n{}", success, body); - return SuccessData.load(fs, success); + SuccessData successData = SuccessData.load(fs, success); + JOB_STATISTICS.aggregate(successData.getIOStatistics()); + return successData; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java index b92605ca253..439ef9aa44f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; @@ -92,7 +93,8 @@ protected void verifyFailureConflictOutcome() throws Exception { // this is done by calling the preCommit method directly, final CommitContext commitContext = new CommitOperations(getWrapperFS()). - createCommitContext(getJob(), getOutputPath(), 0); + createCommitContext(getJob(), getOutputPath(), 0, + IOStatisticsContext.getCurrentIOStatisticsContext()); committer.preCommitJob(commitContext, AbstractS3ACommitter.ActiveCommit.empty()); reset(mockFS); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java index 36857669553..b19662c0117 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java @@ -37,6 +37,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS; /** ITest of the low level protocol methods. */ public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol { @@ -51,6 +52,14 @@ protected String getCommitterName() { return CommitConstants.COMMITTER_NAME_DIRECTORY; } + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + // turn off stats collection to verify that it works + conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, false); + return conf; + } + @Override protected AbstractS3ACommitter createCommitter( Path outputPath,