From b62a5ece95a6b5bbb17f273debd55bcbf0c5f28c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 9 Jan 2018 18:46:52 +0000 Subject: [PATCH] HADOOP-15161. s3a: Stream and common statistics missing from metrics Contributed by Sean Mackrory --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 7 +- .../hadoop/fs/s3a/S3AInstrumentation.java | 75 +++++-------------- .../org/apache/hadoop/fs/s3a/Statistic.java | 8 ++ .../apache/hadoop/fs/s3a/ITestS3AMetrics.java | 24 +++++- 4 files changed, 55 insertions(+), 59 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a8147ed1c9c..62b97d69b46 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -688,7 +688,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { */ public FSDataInputStream open(Path f, int bufferSize) throws IOException { - checkNotClosed(); + entryPoint(INVOCATION_OPEN); LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy); final FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { @@ -732,7 +732,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - checkNotClosed(); + entryPoint(INVOCATION_CREATE); final Path path = qualify(f); String key = pathToKey(path); FileStatus status = null; @@ -799,6 +799,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { short replication, long blockSize, Progressable progress) throws IOException { + entryPoint(INVOCATION_CREATE_NON_RECURSIVE); Path parent = path.getParent(); if (parent != null) { // expect this to raise an exception if there is no parent @@ -1683,7 +1684,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Retries.RetryTranslated public boolean delete(Path f, boolean recursive) throws IOException { try { - checkNotClosed(); + entryPoint(INVOCATION_DELETE); return innerDelete(innerGetFileStatus(f, true), recursive); } catch (FileNotFoundException e) { LOG.debug("Couldn't delete {} - does not exist", f); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index d8433474755..b883455ec88 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -34,7 +34,6 @@ import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; -import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @@ -129,8 +128,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { private final MutableCounterLong numberOfFakeDirectoryDeletes; private final MutableCounterLong numberOfDirectoriesCreated; private final MutableCounterLong numberOfDirectoriesDeleted; - private final Map streamMetrics = - new HashMap<>(30); /** Instantiate this without caring whether or not S3Guard is enabled. */ private final S3GuardInstrumentation s3GuardInstrumentation @@ -138,6 +135,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource { private static final Statistic[] COUNTERS_TO_CREATE = { INVOCATION_COPY_FROM_LOCAL_FILE, + INVOCATION_CREATE, + INVOCATION_CREATE_NON_RECURSIVE, + INVOCATION_DELETE, INVOCATION_EXISTS, INVOCATION_GET_FILE_STATUS, INVOCATION_GLOB_STATUS, @@ -147,6 +147,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource { INVOCATION_LIST_LOCATED_STATUS, INVOCATION_LIST_STATUS, INVOCATION_MKDIRS, + INVOCATION_OPEN, INVOCATION_RENAME, OBJECT_COPY_REQUESTS, OBJECT_DELETE_REQUESTS, @@ -196,27 +197,27 @@ public class S3AInstrumentation implements Closeable, MetricsSource { "A unique identifier for the instance", fileSystemInstanceId.toString()); registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", name.getHost()); - streamOpenOperations = streamCounter(STREAM_OPENED); - streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS); - streamClosed = streamCounter(STREAM_CLOSED); - streamAborted = streamCounter(STREAM_ABORTED); - streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS); - streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS); + streamOpenOperations = counter(STREAM_OPENED); + streamCloseOperations = counter(STREAM_CLOSE_OPERATIONS); + streamClosed = counter(STREAM_CLOSED); + streamAborted = counter(STREAM_ABORTED); + streamSeekOperations = counter(STREAM_SEEK_OPERATIONS); + streamReadExceptions = counter(STREAM_READ_EXCEPTIONS); streamForwardSeekOperations = - streamCounter(STREAM_FORWARD_SEEK_OPERATIONS); + counter(STREAM_FORWARD_SEEK_OPERATIONS); streamBackwardSeekOperations = - streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS); - streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED); + counter(STREAM_BACKWARD_SEEK_OPERATIONS); + streamBytesSkippedOnSeek = counter(STREAM_SEEK_BYTES_SKIPPED); streamBytesBackwardsOnSeek = - streamCounter(STREAM_SEEK_BYTES_BACKWARDS); - streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ); - streamReadOperations = streamCounter(STREAM_READ_OPERATIONS); + counter(STREAM_SEEK_BYTES_BACKWARDS); + streamBytesRead = counter(STREAM_SEEK_BYTES_READ); + streamReadOperations = counter(STREAM_READ_OPERATIONS); streamReadFullyOperations = - streamCounter(STREAM_READ_FULLY_OPERATIONS); + counter(STREAM_READ_FULLY_OPERATIONS); streamReadsIncomplete = - streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE); - streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ); - streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED); + counter(STREAM_READ_OPERATIONS_INCOMPLETE); + streamBytesReadInClose = counter(STREAM_CLOSE_BYTES_READ); + streamBytesDiscardedInAbort = counter(STREAM_ABORT_BYTES_DISCARDED); numberOfFilesCreated = counter(FILES_CREATED); numberOfFilesCopied = counter(FILES_COPIED); bytesOfFilesCopied = counter(FILES_COPIED_BYTES); @@ -282,20 +283,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { return registry.newCounter(name, desc, 0L); } - /** - * Create a counter in the stream map: these are unregistered in the public - * metrics. - * @param name counter name - * @param desc counter description - * @return a new counter - */ - protected final MutableCounterLong streamCounter(String name, String desc) { - MutableCounterLong counter = new MutableCounterLong( - Interns.info(name, desc), 0L); - streamMetrics.put(name, counter); - return counter; - } - /** * Create a counter in the registry. * @param op statistic to count @@ -305,16 +292,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { return counter(op.getSymbol(), op.getDescription()); } - /** - * Create a counter in the stream map: these are unregistered in the public - * metrics. - * @param op statistic to count - * @return a new counter - */ - protected final MutableCounterLong streamCounter(Statistic op) { - return streamCounter(op.getSymbol(), op.getDescription()); - } - /** * Create a gauge in the registry. * @param name name gauge name @@ -365,11 +342,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { prefix, separator, suffix); registry.snapshot(metricBuilder, all); - for (Map.Entry entry: - streamMetrics.entrySet()) { - metricBuilder.tuple(entry.getKey(), - Long.toString(entry.getValue().value())); - } return metricBuilder.toString(); } @@ -447,9 +419,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { */ public MutableMetric lookupMetric(String name) { MutableMetric metric = getRegistry().get(name); - if (metric == null) { - metric = streamMetrics.get(name); - } return metric; } @@ -1141,10 +1110,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource { public Map toMap() { MetricsToMap metricBuilder = new MetricsToMap(null); registry.snapshot(metricBuilder, true); - for (Map.Entry entry : - streamMetrics.entrySet()) { - metricBuilder.tuple(entry.getKey(), entry.getValue().value()); - } return metricBuilder.getMap(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 871d7c5e404..bb30f1f0c26 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -49,6 +49,12 @@ public enum Statistic { IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"), INVOCATION_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE, "Calls of copyFromLocalFile()"), + INVOCATION_CREATE(CommonStatisticNames.OP_CREATE, + "Calls of create()"), + INVOCATION_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE, + "Calls of createNonRecursive()"), + INVOCATION_DELETE(CommonStatisticNames.OP_DELETE, + "Calls of delete()"), INVOCATION_EXISTS(CommonStatisticNames.OP_EXISTS, "Calls of exists()"), INVOCATION_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS, @@ -67,6 +73,8 @@ public enum Statistic { "Calls of listStatus()"), INVOCATION_MKDIRS(CommonStatisticNames.OP_MKDIRS, "Calls of mkdirs()"), + INVOCATION_OPEN(CommonStatisticNames.OP_OPEN, + "Calls of open()"), INVOCATION_RENAME(CommonStatisticNames.OP_RENAME, "Calls of rename()"), OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 182990cf304..e92ce78b101 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.junit.Test; import java.io.IOException; +import java.io.InputStream; /** * Test s3a performance metrics register and output. @@ -34,7 +35,7 @@ public class ITestS3AMetrics extends AbstractS3ATestBase { public void testMetricsRegister() throws IOException, InterruptedException { S3AFileSystem fs = getFileSystem(); - Path dest = new Path("newfile1"); + Path dest = path("testMetricsRegister"); ContractTestUtils.touch(fs, dest); String targetMetricSource = "S3AMetrics1" + "-" + fs.getBucket(); @@ -48,4 +49,25 @@ public class ITestS3AMetrics extends AbstractS3ATestBase { assertEquals("Metrics system should report single file created event", 1, fileCreated.value()); } + + @Test + public void testStreamStatistics() throws IOException { + S3AFileSystem fs = getFileSystem(); + Path file = path("testStreamStatistics"); + byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes(); + ContractTestUtils.createFile(fs, file, false, data); + + try (InputStream inputStream = fs.open(file)) { + while (inputStream.read(data) != -1) { + LOG.debug("Read batch of data from input stream..."); + } + } + + MutableCounterLong read = (MutableCounterLong) + fs.getInstrumentation().getRegistry() + .get(Statistic.STREAM_SEEK_BYTES_READ.getSymbol()); + assertEquals("Stream statistics were not merged", 26, read.value()); + } + + }