diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 30817a2a625..0c156e3548d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -46,9 +46,13 @@ import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
- ByteBufferPositionedReadable {
+ ByteBufferPositionedReadable, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -867,8 +871,16 @@ public class CryptoInputStream extends FilterInputStream implements
+ " does not expose its stream capabilities.");
}
return ((StreamCapabilities) in).hasCapability(capability);
+ case StreamCapabilities.IOSTATISTICS:
+ return (in instanceof StreamCapabilities)
+ && ((StreamCapabilities) in).hasCapability(capability);
default:
return false;
}
}
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(in);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
index aeb6e4d0ed2..553915d755f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -28,9 +28,13 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
/**
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -48,7 +52,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
- Syncable, CanSetDropBehind, StreamCapabilities {
+ Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public class CryptoOutputStream extends FilterOutputStream implements
}
return false;
}
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(out);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
index 973b136bb3a..0c5b4f0d374 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
@@ -24,6 +24,10 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
/**
@@ -33,7 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BufferedFSInputStream extends BufferedInputStream
-implements Seekable, PositionedReadable, HasFileDescriptor {
+ implements Seekable, PositionedReadable, HasFileDescriptor,
+ IOStatisticsSource, StreamCapabilities {
/**
* Creates a BufferedFSInputStream
* with the specified buffer size,
@@ -126,4 +131,26 @@ implements Seekable, PositionedReadable, HasFileDescriptor {
return null;
}
}
+
+ /**
+ * If the inner stream supports {@link StreamCapabilities},
+ * forward the probe to it.
+ * Otherwise: return false.
+ *
+ * @param capability string to query the stream support for.
+ * @return true if a capability is known to be supported.
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ if (in instanceof StreamCapabilities) {
+ return ((StreamCapabilities) in).hasCapability(capability);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(in);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index f081742ce59..b24136bf9ec 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
@@ -134,7 +137,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
- private static class ChecksumFSInputChecker extends FSInputChecker {
+ private static class ChecksumFSInputChecker extends FSInputChecker implements
+ IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
@@ -270,6 +274,17 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
}
return nread;
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(datas);
+ }
}
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -395,7 +410,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
- private static class ChecksumFSOutputSummer extends FSOutputSummer {
+ private static class ChecksumFSOutputSummer extends FSOutputSummer
+ implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -449,6 +465,28 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
throw new ClosedChannelException();
}
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(datas);
+ }
+
+ /**
+ * Probe the inner stream for a capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true if a capability is known to be supported.
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ return datas.hasCapability(capability);
+ }
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 31f82975899..b63e047358c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -29,6 +29,9 @@ import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;
@@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
- ByteBufferPositionedReadable {
+ ByteBufferPositionedReadable, IOStatisticsSource {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@@ -267,4 +270,15 @@ public class FSDataInputStream extends DataInputStream
"unsupported by " + in.getClass().getCanonicalName());
}
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(in);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index 5b604e58e23..27d164b7d87 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -24,13 +24,17 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
- implements Syncable, CanSetDropBehind, StreamCapabilities {
+ implements Syncable, CanSetDropBehind, StreamCapabilities,
+ IOStatisticsSource {
private final OutputStream wrappedStream;
private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public class FSDataOutputStream extends DataOutputStream
"not support setting the drop-behind caching setting.");
}
}
+
+ /**
+   * Get the IO Statistics of the nested stream, falling back to
+   * null if the stream does not implement the interface
+   * {@link IOStatisticsSource}.
+   * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
index b3b3fac0c09..ad2642f7db9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
@@ -24,6 +24,9 @@ import java.io.InputStream;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,4 +137,23 @@ public abstract class FSInputStream extends InputStream
throws IOException {
readFully(position, buffer, 0, buffer.length);
}
+
+ /**
+ * toString method returns the superclass toString, but if the subclass
+ * implements {@link IOStatisticsSource} then those statistics are
+ * extracted and included in the output.
+ * That is: statistics of subclasses are automatically reported.
+ * @return a string value.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(super.toString());
+ sb.append('{');
+ if (this instanceof IOStatisticsSource) {
+ sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
+ (IOStatisticsSource) this));
+ }
+ sb.append('}');
+ return sb.toString();
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index 89848dc29de..dcb76b50b34 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -26,14 +26,20 @@ import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes.
+ *
* The following names are considered general and preserved across different * StorageStatistics classes. When implementing a new StorageStatistics, it is * highly recommended to use the common statistic names. - * + *
* When adding new common statistic name constants, please make them unique. * By convention, they are implicitly unique: *
+ * This is for reporting and testing. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public final class DurationStatisticSummary implements Serializable { + + private static final long serialVersionUID = 6776381340896518486L; + + /** Statistic key. */ + private final String key; + + /** Are these success or failure statistics. */ + private final boolean success; + + /** Count of operation invocations. */ + private final long count; + + /** Max duration; -1 if unknown. */ + private final long max; + + /** Min duration; -1 if unknown. */ + private final long min; + + /** Mean duration -may be null. */ + private final MeanStatistic mean; + + /** + * Constructor. + * @param key Statistic key. + * @param success Are these success or failure statistics. + * @param count Count of operation invocations. + * @param max Max duration; -1 if unknown. + * @param min Min duration; -1 if unknown. + * @param mean Mean duration -may be null. (will be cloned) + */ + public DurationStatisticSummary(final String key, + final boolean success, + final long count, + final long max, + final long min, + @Nullable final MeanStatistic mean) { + this.key = key; + this.success = success; + this.count = count; + this.max = max; + this.min = min; + this.mean = mean == null ? null : mean.clone(); + } + + public String getKey() { + return key; + } + + public boolean isSuccess() { + return success; + } + + public long getCount() { + return count; + } + + public long getMax() { + return max; + } + + public long getMin() { + return min; + } + + public MeanStatistic getMean() { + return mean; + } + + @Override + public String toString() { + return "DurationStatisticSummary{" + + "key='" + key + '\'' + + ", success=" + success + + ", counter=" + count + + ", max=" + max + + ", mean=" + mean + + '}'; + } + + /** + * Fetch the duration timing summary of success or failure operations + * from an IO Statistics source. + * If the duration key is unknown, the summary will be incomplete. 
+ * @param source source of data + * @param key duration statistic key + * @param success fetch success statistics, or if false, failure stats. + * @return a summary of the statistics. + */ + public static DurationStatisticSummary fetchDurationSummary( + IOStatistics source, + String key, + boolean success) { + String fullkey = success ? key : key + SUFFIX_FAILURES; + return new DurationStatisticSummary(key, success, + source.counters().getOrDefault(fullkey, 0L), + source.maximums().getOrDefault(fullkey + SUFFIX_MAX, -1L), + source.minimums().getOrDefault(fullkey + SUFFIX_MIN, -1L), + source.meanStatistics() + .get(fullkey + SUFFIX_MEAN)); + } + + /** + * Fetch the duration timing summary from an IOStatistics source. + * If the duration key is unknown, the summary will be incomplete. + * @param source source of data + * @param key duration statistic key + * @return a summary of the statistics. + */ + public static DurationStatisticSummary fetchSuccessSummary( + IOStatistics source, + String key) { + return fetchDurationSummary(source, key, true); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java new file mode 100644 index 00000000000..5a15c7ad66c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics; + +import java.time.Duration; + +/** + * Interface to be implemented by objects which can track duration. + * It extends AutoCloseable to fit into a try-with-resources statement, + * but then strips out the {@code throws Exception} aspect of the signature + * so it doesn't force code to add extra handling for any failures. + * + * If a duration is declared as "failed()" then the failure counters + * will be updated. + */ +public interface DurationTracker extends AutoCloseable { + + /** + * The operation failed. Failure statistics will be updated. + */ + void failed(); + + /** + * Finish tracking: update the statistics with the timings. + */ + void close(); + + /** + * Get the duration of an operation as a java Duration + * instance. If the duration tracker hasn't completed, + * or its duration tracking doesn't actually measure duration, + * returns Duration.ZERO. + * @return a duration, value of ZERO until close(). 
+ */ + default Duration asDuration() { + return Duration.ZERO; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java new file mode 100644 index 00000000000..b1d87c9100f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics; + +/** + * Interface for a source of duration tracking. + * + * This is intended for uses where it can be passed into classes + * which update operation durations, without tying those + * classes to internal implementation details. + */ +public interface DurationTrackerFactory { + + /** + * Initiate a duration tracking operation by creating/returning + * an object whose {@code close()} call will + * update the statistics. + * + * The statistics counter with the key name will be incremented + * by the given count. + * + * The expected use is within a try-with-resources clause. 
+ * @param key statistic key prefix + * @param count #of times to increment the matching counter in this + * operation. + * @return an object to close after an operation completes. + */ + DurationTracker trackDuration(String key, long count); + + /** + * Initiate a duration tracking operation by creating/returning + * an object whose {@code close()} call will + * update the statistics. + * The expected use is within a try-with-resources clause. + * @param key statistic key + * @return an object to close after an operation completes. + */ + default DurationTracker trackDuration(String key) { + return trackDuration(key, 1); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java new file mode 100644 index 00000000000..75d99651281 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * IO Statistics. + *
+ * These are low-cost per-instance statistics provided by any Hadoop + * I/O class instance. + *
+ * Consult the filesystem specification document for the requirements
+ * of an implementation of this interface.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface IOStatistics {
+
+ /**
+ * Map of counters.
+ * @return the current map of counters.
+ */
+ Map
+ * Exceptions are caught and downgraded to debug logging.
+ * @param source source of statistics.
+ * @return a string for logging.
+ */
+ public static String ioStatisticsSourceToString(@Nullable Object source) {
+ try {
+ return ioStatisticsToString(retrieveIOStatistics(source));
+ } catch (RuntimeException e) {
+ LOG.debug("Ignoring", e);
+ return "";
+ }
+ }
+
+ /**
+ * Convert IOStatistics to a string form.
+ * @param statistics A statistics instance.
+ * @return string value or the empty string if null
+ */
+ public static String ioStatisticsToString(
+ @Nullable final IOStatistics statistics) {
+ if (statistics != null) {
+ StringBuilder sb = new StringBuilder();
+ mapToString(sb, "counters", statistics.counters(), " ");
+ mapToString(sb, "gauges", statistics.gauges(), " ");
+ mapToString(sb, "minimums", statistics.minimums(), " ");
+ mapToString(sb, "maximums", statistics.maximums(), " ");
+ mapToString(sb, "means", statistics.meanStatistics(), " ");
+
+ return sb.toString();
+ } else {
+ return "";
+ }
+ }
+
+ /**
+ * Convert IOStatistics to a string form, with all the metrics sorted
+ * and empty value stripped.
+ * This is more expensive than the simple conversion, so should only
+ * be used for logging/output where it's known/highly likely that the
+ * caller wants to see the values. Not for debug logging.
+ * @param statistics A statistics instance.
+ * @return string value or the empty string if null
+ */
+ public static String ioStatisticsToPrettyString(
+ @Nullable final IOStatistics statistics) {
+ if (statistics != null) {
+ StringBuilder sb = new StringBuilder();
+ mapToSortedString(sb, "counters", statistics.counters(),
+ p -> p == 0);
+ mapToSortedString(sb, "\ngauges", statistics.gauges(),
+ p -> p == 0);
+ mapToSortedString(sb, "\nminimums", statistics.minimums(),
+ p -> p < 0);
+ mapToSortedString(sb, "\nmaximums", statistics.maximums(),
+ p -> p < 0);
+ mapToSortedString(sb, "\nmeans", statistics.meanStatistics(),
+ MeanStatistic::isEmpty);
+
+ return sb.toString();
+ } else {
+ return "";
+ }
+ }
+
+ /**
+ * Given a map, add its entryset to the string.
+ * The entries are only sorted if the source entryset
+ * iterator is sorted, such as from a TreeMap.
+ * @param sb string buffer to append to
+ * @param type type (for output)
+ * @param map map to evaluate
+ * @param separator separator
+ * @param
+ * Whenever this object's toString() method is called, it evaluates the
+ * statistics.
+ *
+ * This is designed to be affordable to use in log statements.
+ * @param source source of statistics -may be null.
+ * @return an object whose toString() operation returns the current values.
+ */
+ public static Object demandStringifyIOStatisticsSource(
+ @Nullable IOStatisticsSource source) {
+ return new SourceToString(source);
+ }
+
+ /**
+ * On demand stringifier of an IOStatistics instance.
+ *
+ * Whenever this object's toString() method is called, it evaluates the
+ * statistics.
+ *
+ * This is for use in log statements where the cost of creation
+ * of this entry is low; it is affordable to use in log statements.
+ * @param statistics statistics to stringify -may be null.
+ * @return an object whose toString() operation returns the current values.
+ */
+ public static Object demandStringifyIOStatistics(
+ @Nullable IOStatistics statistics) {
+ return new StatisticsToString(statistics);
+ }
+
+ /**
+ * Extract any statistics from the source and log at debug, if
+ * the log is set to log at debug.
+ * No-op if logging is not at debug or the source is null/of
+ * the wrong type/doesn't provide statistics.
+ * @param log log to log to
+ * @param message message for log -this must contain "{}" for the
+ * statistics report to actually get logged.
+ * @param source source object
+ */
+ public static void logIOStatisticsAtDebug(
+ Logger log,
+ String message,
+ Object source) {
+ if (log.isDebugEnabled()) {
+ // robust extract and convert to string
+ String stats = ioStatisticsSourceToString(source);
+ if (!stats.isEmpty()) {
+ log.debug(message, stats);
+ }
+ }
+ }
+
+ /**
+ * Extract any statistics from the source and log to
+ * this class's log at debug, if
+ * the log is set to log at debug.
+ * No-op if logging is not at debug or the source is null/of
+ * the wrong type/doesn't provide statistics.
+ * @param message message for log -this must contain "{}" for the
+ * statistics report to actually get logged.
+ * @param source source object
+ */
+ public static void logIOStatisticsAtDebug(
+ String message,
+ Object source) {
+ logIOStatisticsAtDebug(LOG, message, source);
+ }
+
+ /**
+ * On demand stringifier.
+ *
+ * Whenever this object's toString() method is called, it
+ * retrieves the latest statistics instance and re-evaluates it.
+ */
+ private static final class SourceToString {
+
+ private final IOStatisticsSource source;
+
+ private SourceToString(@Nullable IOStatisticsSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return source != null
+ ? ioStatisticsSourceToString(source)
+ : IOStatisticsBinding.NULL_SOURCE;
+ }
+ }
+
+ /**
+ * Stringifier of statistics: low cost to instantiate and every
+ * toString/logging will re-evaluate the statistics.
+ */
+ private static final class StatisticsToString {
+
+ private final IOStatistics statistics;
+
+ /**
+ * Constructor.
+ * @param statistics statistics
+ */
+ private StatisticsToString(@Nullable IOStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ /**
+ * Evaluate and stringify the statistics.
+ * @return a string value.
+ */
+ @Override
+ public String toString() {
+ return statistics != null
+ ? ioStatisticsToString(statistics)
+ : IOStatisticsBinding.NULL_SOURCE;
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java
new file mode 100644
index 00000000000..5b8b2e284cc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.aggregateMaps;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap;
+
+/**
+ * Snapshot of statistics from a different source.
+ *
+ * It is serializable so that frameworks which can use java serialization
+ * to propagate data (Spark, Flink...) can send the statistics
+ * back. For this reason, TreeMaps are explicitly used as field types,
+ * even though IDEs can recommend use of Map instead.
+ * For security reasons, untrusted java object streams should never be
+ * deserialized. If for some reason this is required, use
+ * {@link #requiredSerializationClasses()} to get the list of classes
+ * used when deserializing instances of this object.
+ *
+ *
+ * It is annotated for correct serializations with jackson2.
+ *
+ * These statistics MUST be instance specific, not thread local.
+ *
+ * It is not a requirement that the same instance is returned every time.
+ * {@link IOStatisticsSource}.
+ *
+ * If the object implementing this is Closeable, this method
+ * may return null if invoked on a closed object, even if
+ * it returns a valid instance when called earlier.
+ * @return an IOStatistics instance or null
+ */
+ default IOStatistics getIOStatistics() {
+ return null;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
new file mode 100644
index 00000000000..75977047c0f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.impl.StubDurationTracker;
+import org.apache.hadoop.fs.statistics.impl.StubDurationTrackerFactory;
+
+/**
+ * Support for working with IOStatistics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class IOStatisticsSupport {
+
+ private IOStatisticsSupport() {
+ }
+
+ /**
+ * Take a snapshot of the current statistics state.
+ *
+ * This is not an atomic operation.
+ *
+ * The instance can be serialized, and its
+ * {@code toString()} method lists all the values.
+ * @param statistics statistics
+ * @return a snapshot of the current values.
+ */
+ public static IOStatisticsSnapshot
+ snapshotIOStatistics(IOStatistics statistics) {
+
+ return new IOStatisticsSnapshot(statistics);
+ }
+
+ /**
+ * Create a snapshot statistics instance ready to aggregate data.
+ *
+ * The instance can be serialized, and its
+ * {@code toString()} method lists all the values.
+ * @return an empty snapshot
+ */
+ public static IOStatisticsSnapshot
+ snapshotIOStatistics() {
+
+ return new IOStatisticsSnapshot();
+ }
+
+ /**
+ * Get the IOStatistics of the source, casting it
+ * if it is of the relevant type, otherwise,
+ * if it implements {@link IOStatisticsSource}
+ * extracting the value.
+ *
+ * Returns null if the source isn't of the right type
+ * or the return value of
+ * {@link IOStatisticsSource#getIOStatistics()} was null.
+ * @return an IOStatistics instance or null
+ */
+
+ public static IOStatistics retrieveIOStatistics(
+ final Object source) {
+ if (source instanceof IOStatistics) {
+ return (IOStatistics) source;
+ } else if (source instanceof IOStatisticsSource) {
+ return ((IOStatisticsSource) source).getIOStatistics();
+ } else {
+ // null source or interface not implemented
+ return null;
+ }
+ }
+
+ /**
+ * Return a stub duration tracker factory whose returned trackers
+ * are always no-ops.
+ *
+ * As singletons are returned, this is very low-cost to use.
+ * @return a duration tracker factory.
+ */
+ public static DurationTrackerFactory stubDurationTrackerFactory() {
+ return StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY;
+ }
+
+ /**
+ * Get a stub duration tracker.
+ * @return a stub tracker.
+ */
+ public static DurationTracker stubDurationTracker() {
+ return StubDurationTracker.STUB_DURATION_TRACKER;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java
new file mode 100644
index 00000000000..d9ff0c25c6a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A mean statistic represented as the sum and the sample count;
+ * the mean is calculated on demand.
+ *
+ * It can be used to accrue values so as to dynamically update
+ * the mean. If so, know that there is no synchronization
+ * on the methods.
+ *
+ * If a statistic has 0 samples then it is considered to be empty.
+ *
+ * All 'empty' statistics are equivalent, independent of the sum value.
+ *
+ * For non-empty statistics, sum and sample values must match
+ * for equality.
+ *
+ * It is serializable and annotated for correct serializations with jackson2.
+ *
+ * Thread safety. The operations to add/copy sample data, are thread safe.
+ *
+ * So is the {@link #mean()} method. This ensures that when
+ * used to aggregated statistics, the aggregate value and sample
+ * count are set and evaluated consistently.
+ *
+ * Other methods are marked as synchronized because Findbugs overreacts
+ * to the idea that some operations to update sum and sample count
+ * are synchronized, but that things like equals are not.
+ *
+ * When adding new common statistic name constants, please make them unique.
+ * By convention:
+ *
+ * When adding new common statistic name constants, please make them unique.
+ * By convention, they are implicitly unique:
+ *
+ * No-op if the gauge is unknown.
+ *
+ * No-op if the maximum is unknown.
+ *
+ * No-op if the minimum is unknown.
+ *
+ * No-op if the minimum is unknown.
+ *
+ * No-op if the minimum is unknown.
+ *
+ * No-op if the key is unknown.
+ *
+ * No-op if the key is unknown.
+ *
+ * No-op if the key is unknown.
+ *
+ * A ConcurrentHashMap of each set of statistics is created;
+ * the AtomicLong/MeanStatistic entries are fetched as required.
+ * When the statistics are updated, the referenced objects
+ * are updated rather than new values set in the map.
+ *
+ * The update is non-atomic, even though each individual statistic
+ * is updated thread-safely. If two threads update the values
+ * simultaneously, at the end of each operation the state will
+ * be correct. It is only during the sequence that the statistics
+ * may be observably inconsistent.
+ *
+ * This package defines two interfaces:
+ *
+ * {@link org.apache.hadoop.fs.statistics.IOStatisticsSource}:
+ * a source of statistic data, which can be retrieved
+ * through a call to
+ * {@link org.apache.hadoop.fs.statistics.IOStatisticsSource#getIOStatistics()} .
+ *
+ * {@link org.apache.hadoop.fs.statistics.IOStatistics} the statistics retrieved
+ * from a statistics source.
+ *
+ * The retrieved statistics may be an immutable snapshot - in which case to get
+ * updated statistics another call to
+ * {@link org.apache.hadoop.fs.statistics.IOStatisticsSource#getIOStatistics()}
+ * must be made. Or they may be dynamic - in which case every time a specific
+ * statistic is retrieved, the latest version is returned. Callers should assume
+ * that if a statistics instance is dynamic, there is no atomicity when querying
+ * multiple statistics. If the statistics source was a closeable object (e.g. a
+ * stream), the statistics MUST remain valid after the stream is closed.
+ *
+ * Use pattern:
+ *
+ * An application probes an object (filesystem, stream etc) to see if it
+ * implements {@code IOStatisticsSource}, and, if it is,
+ * calls {@code getIOStatistics()} to get its statistics.
+ * If this is non-null, the client has statistics on the current
+ * state of the statistics.
+ *
+ * The expectation is that a statistics source is dynamic: when a value is
+ * looked up the most recent values are returned.
+ * When iterating through the set, the values of the iterator SHOULD
+ * be frozen at the time the iterator was requested.
+ *
+ * These statistics can be used to: log operations, profile applications,
+ * and make assertions about the state of the output.
+ *
+ * The names of statistics are a matter of choice of the specific source.
+ * However, {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
+ * contains a
+ * set of names recommended for object store operations.
+ * {@link org.apache.hadoop.fs.statistics.StreamStatisticNames} declares
+ * recommended names for statistics provided for
+ * input and output streams.
+ *
+ * Utility classes:
+ *
+ * Implementors notes:
+ *
+ * Contains methods promoted from
+ * {@link org.apache.hadoop.fs.impl.FutureIOSupport} because they
+ * are a key part of integrating async IO in application code.
+ *
+ * One key feature is that the {@link #awaitFuture(Future)} and
+ * {@link #awaitFuture(Future, long, TimeUnit)} calls will
+ * extract and rethrow exceptions raised in the future's execution,
+ * including extracting the inner IOException of any
+ * {@code UncheckedIOException} raised in the future.
+ * This makes it somewhat easier to execute IOException-raising
+ * code inside futures.
+ *
+ * Any exception generated in the future is
+ * extracted and rethrown.
+ *
+ * Any exception generated in the future is
+ * extracted and rethrown.
+ *
+ *
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class StoreStatisticNames {
+
+ /** {@value}. */
+ public static final String OP_APPEND = "op_append";
+
+ /** {@value}. */
+ public static final String OP_COPY_FROM_LOCAL_FILE =
+ "op_copy_from_local_file";
+
+ /** {@value}. */
+ public static final String OP_CREATE = "op_create";
+
+ /** {@value}. */
+ public static final String OP_CREATE_NON_RECURSIVE =
+ "op_create_non_recursive";
+
+ /** {@value}. */
+ public static final String OP_DELETE = "op_delete";
+
+ /** {@value}. */
+ public static final String OP_EXISTS = "op_exists";
+
+ /** {@value}. */
+ public static final String OP_GET_CONTENT_SUMMARY =
+ "op_get_content_summary";
+
+ /** {@value}. */
+ public static final String OP_GET_DELEGATION_TOKEN =
+ "op_get_delegation_token";
+
+ /** {@value}. */
+ public static final String OP_GET_FILE_CHECKSUM =
+ "op_get_file_checksum";
+
+ /** {@value}. */
+ public static final String OP_GET_FILE_STATUS = "op_get_file_status";
+
+ /** {@value}. */
+ public static final String OP_GET_STATUS = "op_get_status";
+
+ /** {@value}. */
+ public static final String OP_GLOB_STATUS = "op_glob_status";
+
+ /** {@value}. */
+ public static final String OP_IS_FILE = "op_is_file";
+
+ /** {@value}. */
+ public static final String OP_IS_DIRECTORY = "op_is_directory";
+
+ /** {@value}. */
+ public static final String OP_LIST_FILES = "op_list_files";
+
+ /** {@value}. */
+ public static final String OP_LIST_LOCATED_STATUS =
+ "op_list_located_status";
+
+ /** {@value}. */
+ public static final String OP_LIST_STATUS = "op_list_status";
+
+ /** {@value}. */
+ public static final String OP_MKDIRS = "op_mkdirs";
+
+ /** {@value}. */
+ public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";
+
+ /** {@value}. */
+ public static final String OP_OPEN = "op_open";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_ACL = "op_remove_acl";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_ACL_ENTRIES = "op_remove_acl_entries";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_DEFAULT_ACL = "op_remove_default_acl";
+
+ /** {@value}. */
+ public static final String OP_RENAME = "op_rename";
+
+ /** {@value}. */
+ public static final String OP_SET_ACL = "op_set_acl";
+
+ /** {@value}. */
+ public static final String OP_SET_OWNER = "op_set_owner";
+
+ /** {@value}. */
+ public static final String OP_SET_PERMISSION = "op_set_permission";
+
+ /** {@value}. */
+ public static final String OP_SET_TIMES = "op_set_times";
+
+ /** {@value}. */
+ public static final String OP_TRUNCATE = "op_truncate";
+
+ /** {@value}. */
+ public static final String DELEGATION_TOKENS_ISSUED
+ = "delegation_tokens_issued";
+
+ /** Requests throttled and retried: {@value}. */
+ public static final String STORE_IO_THROTTLED
+ = "store_io_throttled";
+
+ /** Requests made of a store: {@value}. */
+ public static final String STORE_IO_REQUEST
+ = "store_io_request";
+
+ /**
+ * IO retried: {@value}.
+ */
+ public static final String STORE_IO_RETRY
+ = "store_io_retry";
+
+ /**
+ * A store's equivalent of a paged LIST request was initiated: {@value}.
+ */
+ public static final String OBJECT_LIST_REQUEST
+ = "object_list_request";
+
+ /**
+ * Number of continued object listings made.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_CONTINUE_LIST_REQUEST =
+ "object_continue_list_request";
+
+ /**
+ * A bulk DELETE request was made: {@value}.
+ * A separate statistic from {@link #OBJECT_DELETE_REQUEST}
+ * so that metrics on duration of the operations can
+ * be distinguished.
+ */
+ public static final String OBJECT_BULK_DELETE_REQUEST
+ = "object_bulk_delete_request";
+
+ /**
+ * A store's equivalent of a DELETE request was made: {@value}.
+ * This may be an HTTP DELETE verb, or it may be some custom
+ * operation which takes a list of objects to delete.
+ */
+ public static final String OBJECT_DELETE_REQUEST
+ = "object_delete_request";
+
+ /**
+ * The count of objects deleted in delete requests.
+ */
+ public static final String OBJECT_DELETE_OBJECTS
+ = "object_delete_objects";
+
+ /**
+ * Object multipart upload initiated.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_MULTIPART_UPLOAD_INITIATED =
+ "object_multipart_initiated";
+
+ /**
+ * Object multipart upload aborted.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
+ "object_multipart_aborted";
+
+ /**
+ * Object put/multipart upload count.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST =
+ "object_put_request";
+
+ /**
+ * Object put/multipart upload completed count.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST_COMPLETED =
+ "object_put_request_completed";
+
+ /**
+ * Current number of active put requests.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST_ACTIVE =
+ "object_put_request_active";
+
+ /**
+ * number of bytes uploaded.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_BYTES =
+ "object_put_bytes";
+
+ /**
+ * number of bytes queued for upload/being actively uploaded.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_BYTES_PENDING =
+ "object_put_bytes_pending";
+
+ /**
+ * Count of S3 Select (or similar) requests issued.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_SELECT_REQUESTS =
+ "object_select_requests";
+
+ /**
+ * Suffix to use for a minimum value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MIN = ".min";
+
+ /**
+ * Suffix to use for a maximum value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MAX = ".max";
+
+ /**
+ * Suffix to use for a mean statistic value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MEAN = ".mean";
+
+ /**
+ * String to add to counters and other stats to track failures.
+ * This comes before the .min/.mean/.max suffixes.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_FAILURES = ".failures";
+
+ /**
+ * The name of the statistic collected for executor acquisition if
+ * a duration tracker factory is passed in to the constructor.
+ * {@value}.
+ */
+ public static final String ACTION_EXECUTOR_ACQUIRED =
+ "action_executor_acquired";
+
+ /**
+ * An HTTP HEAD request was made: {@value}.
+ */
+ public static final String ACTION_HTTP_HEAD_REQUEST
+ = "action_http_head_request";
+
+ /**
+ * An HTTP GET request was made: {@value}.
+ */
+ public static final String ACTION_HTTP_GET_REQUEST
+ = "action_http_get_request";
+
+ /**
+ * An HTTP HEAD request was made: {@value}.
+ */
+ public static final String OBJECT_METADATA_REQUESTS
+ = "object_metadata_request";
+
+ public static final String OBJECT_COPY_REQUESTS
+ = "object_copy_requests";
+
+ public static final String STORE_IO_THROTTLE_RATE
+ = "store_io_throttle_rate";
+
+ public static final String DELEGATION_TOKEN_ISSUED
+ = "delegation_token_issued";
+
+ public static final String MULTIPART_UPLOAD_INSTANTIATED
+ = "multipart_instantiated";
+
+ public static final String MULTIPART_UPLOAD_PART_PUT
+ = "multipart_upload_part_put";
+
+ public static final String MULTIPART_UPLOAD_PART_PUT_BYTES
+ = "multipart_upload_part_put_bytes";
+
+ public static final String MULTIPART_UPLOAD_ABORTED
+ = "multipart_upload_aborted";
+
+ public static final String MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED
+ = "multipart_upload_abort_under_path_invoked";
+
+ public static final String MULTIPART_UPLOAD_COMPLETED
+ = "multipart_upload_completed";
+
+ public static final String MULTIPART_UPLOAD_STARTED
+ = "multipart_upload_started";
+
+ private StoreStatisticNames() {
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
new file mode 100644
index 00000000000..02072d464de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * These are common statistic names.
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class StreamStatisticNames {
+
+ /**
+ * Count of times the TCP stream was aborted.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_ABORTED = "stream_aborted";
+
+ /**
+ * Bytes read from an input stream in read() calls.
+ * Does not include bytes read and then discarded in seek/close etc.
+ * These are the bytes returned to the caller.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES
+ = "stream_read_bytes";
+
+ /**
+ * Count of bytes discarded by aborting an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES_DISCARDED_ABORT
+ = "stream_read_bytes_discarded_in_abort";
+
+ /**
+ * Count of bytes read and discarded when closing an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES_DISCARDED_CLOSE
+ = "stream_read_bytes_discarded_in_close";
+
+ /**
+ * Count of times the TCP stream was closed.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_CLOSED = "stream_read_closed";
+
+ /**
+ * Total count of times an attempt to close an input stream was made.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_CLOSE_OPERATIONS
+ = "stream_read_close_operations";
+
+ /**
+ * Total count of times an input stream was opened.
+ * For object stores, that means the count a GET request was initiated.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPENED = "stream_read_opened";
+
+ /**
+ * Count of exceptions raised during input stream reads.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_EXCEPTIONS =
+ "stream_read_exceptions";
+
+ /**
+ * Count of readFully() operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_FULLY_OPERATIONS
+ = "stream_read_fully_operations";
+
+ /**
+ * Count of read() operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPERATIONS =
+ "stream_read_operations";
+
+ /**
+ * Count of incomplete read() operations in an input stream,
+ * that is, when the bytes returned were less than that requested.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPERATIONS_INCOMPLETE
+ = "stream_read_operations_incomplete";
+
+ /**
+ * Count of version mismatches encountered while reading an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_VERSION_MISMATCHES
+ = "stream_read_version_mismatches";
+
+ /**
+ * Count of executed seek operations which went backwards in a stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BACKWARD_OPERATIONS =
+ "stream_read_seek_backward_operations";
+
+ /**
+ * Count of bytes moved backwards during seek operations
+ * in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_BACKWARDS
+ = "stream_read_bytes_backwards_on_seek";
+
+ /**
+ * Count of bytes read and discarded during seek() in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_DISCARDED =
+ "stream_read_seek_bytes_discarded";
+
+ /**
+ * Count of bytes skipped during forward seek operations.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_SKIPPED
+ = "stream_read_seek_bytes_skipped";
+
+ /**
+ * Count of executed seek operations which went forward in
+ * an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_FORWARD_OPERATIONS
+ = "stream_read_seek_forward_operations";
+
+ /**
+ * Count of times the seek policy was dynamically changed
+ * in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_POLICY_CHANGED =
+ "stream_read_seek_policy_changed";
+
+ /**
+ * Count of seek operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_OPERATIONS =
+ "stream_read_seek_operations";
+
+ /**
+ * Count of {@code InputStream.skip()} calls.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SKIP_OPERATIONS =
+ "stream_read_skip_operations";
+
+ /**
+ * Count bytes skipped in {@code InputStream.skip()} calls.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SKIP_BYTES =
+ "stream_read_skip_bytes";
+
+ /**
+ * Total count of bytes read from an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_TOTAL_BYTES
+ = "stream_read_total_bytes";
+
+ /**
+ * Count of calls of {@code CanUnbuffer.unbuffer()}.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_UNBUFFERED
+ = "stream_read_unbuffered";
+
+ /**
+ * Count of stream write failures reported.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_EXCEPTIONS =
+ "stream_write_exceptions";
+
+ /**
+ * Count of failures when finalizing a multipart upload:
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS =
+ "stream_write_exceptions_completing_upload";
+
+ /**
+ * Count of block/partition uploads complete.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS
+ = "stream_write_block_uploads";
+
+ /**
+ * Count of number of block uploads aborted.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_ABORTED
+ = "stream_write_block_uploads_aborted";
+
+ /**
+ * Count of block/partition uploads active.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_ACTIVE
+ = "stream_write_block_uploads_active";
+
+ /**
+ * Gauge of data queued to be written.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING =
+ "stream_write_block_uploads_data_pending";
+
+ /**
+ * Count of number of block uploads committed.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_COMMITTED
+ = "stream_write_block_uploads_committed";
+
+ /**
+ * Gauge of block/partitions uploads queued to be written.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_PENDING
+ = "stream_write_block_uploads_pending";
+
+
+ /**
+ * Count of bytes written to output stream including all not yet uploaded.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_BYTES
+ = "stream_write_bytes";
+
+ /**
+ * Count of total time taken for uploads to complete.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_TOTAL_TIME
+ = "stream_write_total_time";
+
+ /**
+ * Total queue duration of all block uploads.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_QUEUE_DURATION
+ = "stream_write_queue_duration";
+
+ public static final String STREAM_WRITE_TOTAL_DATA
+ = "stream_write_total_data";
+
+ private StreamStatisticNames() {
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java
new file mode 100644
index 00000000000..c701a509d89
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+/**
+ * Base implementation in case common methods/fields need to be added
+ * in future.
+ */
+public abstract class AbstractIOStatisticsImpl implements IOStatistics {
+
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java
new file mode 100644
index 00000000000..50c2625c351
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+/**
+ * These statistics are dynamically evaluated by the supplied
+ * String -> type functions.
+ *
+ * This allows statistic sources to supply a list of callbacks used to
+ * generate the statistics on demand; similar to some of the Coda Hale metrics.
+ *
+ * The evaluation actually takes place during the iteration's {@code next()}
+ * call.
+ */
+final class DynamicIOStatistics
+ extends AbstractIOStatisticsImpl {
+
+ /**
+ * Counter evaluators.
+ */
+ private final EvaluatingStatisticsMap
+ *
+ *
+ *
+ *
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
index 2dfa30bf76e..55bb132e9c8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
+
/**
* A compression input stream.
*
@@ -34,7 +38,8 @@ import org.apache.hadoop.fs.Seekable;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public abstract class CompressionInputStream extends InputStream implements Seekable {
+public abstract class CompressionInputStream extends InputStream
+ implements Seekable, IOStatisticsSource {
/**
* The input stream to be compressed.
*/
@@ -68,7 +73,16 @@ public abstract class CompressionInputStream extends InputStream implements Seek
}
}
}
-
+
+ /**
+ * Return any IOStatistics provided by the underlying stream.
+ * @return IO stats from the inner stream.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(in);
+ }
+
/**
* Read bytes from the stream.
* Made abstract to prevent leakage to underlying stream.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
index 71c7f32e665..2a11ace8170 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
@@ -23,13 +23,17 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
/**
* A compression output stream.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public abstract class CompressionOutputStream extends OutputStream {
+public abstract class CompressionOutputStream extends OutputStream
+ implements IOStatisticsSource {
/**
* The output stream to be compressed.
*/
@@ -94,4 +98,12 @@ public abstract class CompressionOutputStream extends OutputStream {
*/
public abstract void resetState() throws IOException;
+ /**
+ * Return any IOStatistics provided by the underlying stream.
+ * @return IO stats from the inner stream.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(out);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index e2cd3048d58..520ddf6bdf4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -25,6 +25,9 @@ import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.Text;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -42,7 +45,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
*/
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
-public class LineReader implements Closeable {
+public class LineReader implements Closeable, IOStatisticsSource {
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
@@ -148,7 +151,16 @@ public class LineReader implements Closeable {
public void close() throws IOException {
in.close();
}
-
+
+ /**
+ * Return any IOStatistics provided by the source.
+ * @return IO stats from the input stream.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(in);
+ }
+
/**
* Read one line from the InputStream into the given Text.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/OperationDuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/OperationDuration.java
index 3276d2138bb..fdd25286a23 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/OperationDuration.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/OperationDuration.java
@@ -18,48 +18,98 @@
package org.apache.hadoop.util;
+import java.time.Duration;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Little duration counter.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.Public
@InterfaceStability.Unstable
public class OperationDuration {
+ /**
+ * Time in millis when the operation started.
+ */
private final long started;
+
+ /**
+ * Time when the operation finished.
+ */
private long finished;
+ /**
+ * Instantiate.
+ * The start time and finished time are both set
+ * to the current clock time.
+ */
public OperationDuration() {
started = time();
finished = started;
}
+ /**
+ * Evaluate the system time.
+ * @return the current clock time.
+ */
protected long time() {
return System.currentTimeMillis();
}
+ /**
+ * Update the finished time with the current system time.
+ */
public void finished() {
finished = time();
}
+ /**
+ * Return the duration as {@link #humanTime(long)}.
+ * @return a printable duration.
+ */
public String getDurationString() {
return humanTime(value());
}
+ /**
+ * Convert to a human time of minutes:seconds.millis.
+ * @param time time to humanize.
+ * @return a printable value.
+ */
public static String humanTime(long time) {
long seconds = (time / 1000);
long minutes = (seconds / 60);
return String.format("%d:%02d.%03ds", minutes, seconds % 60, time % 1000);
}
+ /**
+ * Return the duration as {@link #humanTime(long)}.
+ * @return a printable duration.
+ */
@Override
public String toString() {
return getDurationString();
}
+ /**
+ * Get the duration in milliseconds.
+ *
+ * This will be 0 until a call
+ * to {@link #finished()} has been made.
+ * @return the currently recorded duration.
+ */
public long value() {
return finished -started;
}
+
+ /**
+ * Get the duration of an operation as a java Duration
+ * instance.
+ * @return a duration.
+ */
+ public Duration asDuration() {
+ return Duration.ofMillis(value());
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/BiFunctionRaisingIOE.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/BiFunctionRaisingIOE.java
new file mode 100644
index 00000000000..ea17c16d01e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/BiFunctionRaisingIOE.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+
+/**
+ * Function of arity 2 which may raise an IOException.
+ * @param
+ *
+ *
+ * Recursively handles wrapped Execution and Completion Exceptions in
+ * case something very complicated has happened.
+ * @param e exception.
+ * @return an IOException extracted or built from the cause.
+ * @throws RuntimeException if that is the inner cause.
+ * @throws Error if that is the inner cause.
+ */
+ @SuppressWarnings("ChainOfInstanceofChecks")
+ public static IOException unwrapInnerException(final Throwable e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ return (IOException) cause;
+ } else if (cause instanceof UncheckedIOException) {
+ // this is always an IOException
+ return ((UncheckedIOException) cause).getCause();
+ } else if (cause instanceof CompletionException) {
+ return unwrapInnerException(cause);
+ } else if (cause instanceof ExecutionException) {
+ return unwrapInnerException(cause);
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof Error) {
+ throw (Error) cause;
+ } else if (cause != null) {
+ // other type: wrap with a new IOE
+ return new IOException(cause);
+ } else {
+ // this only happens if there was no cause.
+ return new IOException(e);
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/InvocationRaisingIOE.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/InvocationRaisingIOE.java
new file mode 100644
index 00000000000..b59dabea89e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/InvocationRaisingIOE.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+
+/**
+ * This is a lambda-expression which may raise an IOException.
+ * This is a recurrent design pattern in the hadoop codebase, e.g
+ * {@code LambdaTestUtils.VoidCallable} and
+ * the S3A {@code Invoker.VoidOperation}}. Hopefully this should
+ * be the last.
+ * Note for implementors of methods which take this as an argument:
+ * don't use method overloading to determine which specific functional
+ * interface is to be used.
+ */
+@FunctionalInterface
+public interface InvocationRaisingIOE {
+
+ /**
+ * Apply the operation.
+ * @throws IOException Any IO failure
+ */
+ void apply() throws IOException;
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java
new file mode 100644
index 00000000000..3ac0fced149
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.io.IOUtils;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
+/**
+ * A set of remote iterators supporting transformation and filtering,
+ * with IOStatisticsSource passthrough, and of conversions of
+ * the iterators to lists/arrays and of performing actions
+ * on the values.
+ *
+ * This aims to make it straightforward to use lambda-expressions to
+ * transform the results of an iterator, without losing the statistics
+ * in the process, and to chain the operations together.
+ *
+ * The closeable operation will be passed through RemoteIterators which
+ * wrap other RemoteIterators. This is to support any iterator which
+ * can be closed to release held connections, file handles etc.
+ * Unless client code is written to assume that RemoteIterator instances
+ * may be closed, this is not likely to be broadly used. It is added
+ * to make it possible to adopt this feature in a managed way.
+ *
+ * One notable feature is that the
+ * {@link #foreach(RemoteIterator, ConsumerRaisingIOE)} method will
+ * LOG at debug any IOStatistics provided by the iterator, if such
+ * statistics are provided. There's no attempt at retrieval and logging
+ * if the LOG is not set to debug, so it is a zero cost feature unless
+ * the logger {@code org.apache.hadoop.fs.functional.RemoteIterators}
+ * is at DEBUG.
+ *
+ * Based on the S3A Listing code, and some work on moving other code
+ * to using iterative listings so as to pick up the statistics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class RemoteIterators {
+
+ /**
+ * Log used for logging any statistics in
+ * {@link #foreach(RemoteIterator, ConsumerRaisingIOE)}
+ * at DEBUG.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ RemoteIterators.class);
+
+ private RemoteIterators() {
+ }
+
+ /**
+ * Create an iterator from a singleton.
+ * @param singleton instance
+ * @param source type
+ * @param RemoteIterator iterator,
+ FunctionRaisingIOE super S, T> mapper) {
+ return new MappingRemoteIterator<>(iterator, mapper);
+ }
+
+ /**
+ * Create a RemoteIterator from a RemoteIterator, casting the
+ * type in the process. This is to help with filesystem API
+ * calls where overloading causes confusion (e.g. listStatusIterator())
+ * @param source type
+ * @param RemoteIterator iterator) {
+ return new TypeCastingRemoteIterator<>(iterator);
+ }
+
+ /**
+ * Create a RemoteIterator from a RemoteIterator and a filter
+ * function which returns true for every element to be passed
+ * through.
+ *
+ * Elements are filtered in the hasNext() method; if not used
+ * the filtering will be done on demand in the {@code next()}
+ * call.
+ * @param type
+ * @param iterator source
+ * @param filter filter
+ * @return a remote iterator
+ */
+ public static RemoteIterator filteringRemoteIterator(
+ RemoteIterator iterator,
+ FunctionRaisingIOE super S, Boolean> filter) {
+ return new FilteringRemoteIterator<>(iterator, filter);
+ }
+
+ /**
+ * This adds an extra close operation alongside the passthrough
+ * to any Closeable.close() method supported by the source iterator.
+ * @param iterator source
+ * @param toClose extra object to close.
+ * @param source type.
+ * @return a new iterator
+ */
+ public static RemoteIterator closingRemoteIterator(
+ RemoteIterator iterator,
+ Closeable toClose) {
+ return new CloseRemoteIterator<>(iterator, toClose);
+ }
+
+ /**
+ * Build a list from a RemoteIterator.
+ * @param source type
+ * @param
+ implements RemoteIterator source;
+
+ private final Closeable sourceToClose;
+
+ protected WrappingRemoteIterator(final RemoteIterator source) {
+ this.source = requireNonNull(source);
+ sourceToClose = new MaybeClose(source);
+ }
+
+ protected RemoteIterator getSource() {
+ return source;
+ }
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(source);
+ }
+
+ @Override
+ public void close() throws IOException {
+ sourceToClose.close();
+ }
+
+ /**
+ * Check for the source having a next element.
+ * If it does not, this object's close() method
+ * is called and false is returned.
+ * @return true if there is a new value
+ * @throws IOException failure to retrieve next value
+ */
+ protected boolean sourceHasNext() throws IOException {
+ boolean hasNext;
+ try {
+ hasNext = getSource().hasNext();
+ } catch (IOException e) {
+ IOUtils.cleanupWithLogger(LOG, this);
+ throw e;
+ }
+ if (!hasNext) {
+ // there is nothing left, so automatically close.
+ close();
+ }
+ return hasNext;
+ }
+
+ /**
+ * Get the next source value.
+ * This calls {@link #sourceHasNext()} first to verify
+ * that there is data.
+ * @return the next value
+ * @throws IOException failure
+ * @throws NoSuchElementException no more data
+ */
+ protected S sourceNext() throws IOException {
+ try {
+ if (!sourceHasNext()) {
+ throw new NoSuchElementException();
+ }
+ return getSource().next();
+ } catch (NoSuchElementException | IOException e) {
+ IOUtils.cleanupWithLogger(LOG, this);
+ throw e;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return source.toString();
+ }
+
+ }
+
+ /**
+ * Iterator taking a source and a transformational function.
+ * @param source type
+ * @param
+ extends WrappingRemoteIterator {
+
+ /**
+ * Mapper to invoke.
+ */
+ private final FunctionRaisingIOE super S, T> mapper;
+
+ private MappingRemoteIterator(
+ RemoteIterator