HADOOP-15161. s3a: Stream and common statistics missing from metrics
Contributed by Sean Mackrory
This commit is contained in:
parent
f725b9e267
commit
b62a5ece95
|
@ -688,7 +688,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
*/
|
||||
public FSDataInputStream open(Path f, int bufferSize)
|
||||
throws IOException {
|
||||
checkNotClosed();
|
||||
entryPoint(INVOCATION_OPEN);
|
||||
LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
|
||||
final FileStatus fileStatus = getFileStatus(f);
|
||||
if (fileStatus.isDirectory()) {
|
||||
|
@ -732,7 +732,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
checkNotClosed();
|
||||
entryPoint(INVOCATION_CREATE);
|
||||
final Path path = qualify(f);
|
||||
String key = pathToKey(path);
|
||||
FileStatus status = null;
|
||||
|
@ -799,6 +799,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
short replication,
|
||||
long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
entryPoint(INVOCATION_CREATE_NON_RECURSIVE);
|
||||
Path parent = path.getParent();
|
||||
if (parent != null) {
|
||||
// expect this to raise an exception if there is no parent
|
||||
|
@ -1683,7 +1684,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
@Retries.RetryTranslated
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
try {
|
||||
checkNotClosed();
|
||||
entryPoint(INVOCATION_DELETE);
|
||||
return innerDelete(innerGetFileStatus(f, true), recursive);
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.debug("Couldn't delete {} - does not exist", f);
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.metrics2.MetricsSource;
|
|||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
|
@ -129,8 +128,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
private final MutableCounterLong numberOfFakeDirectoryDeletes;
|
||||
private final MutableCounterLong numberOfDirectoriesCreated;
|
||||
private final MutableCounterLong numberOfDirectoriesDeleted;
|
||||
private final Map<String, MutableCounterLong> streamMetrics =
|
||||
new HashMap<>(30);
|
||||
|
||||
/** Instantiate this without caring whether or not S3Guard is enabled. */
|
||||
private final S3GuardInstrumentation s3GuardInstrumentation
|
||||
|
@ -138,6 +135,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
|
||||
private static final Statistic[] COUNTERS_TO_CREATE = {
|
||||
INVOCATION_COPY_FROM_LOCAL_FILE,
|
||||
INVOCATION_CREATE,
|
||||
INVOCATION_CREATE_NON_RECURSIVE,
|
||||
INVOCATION_DELETE,
|
||||
INVOCATION_EXISTS,
|
||||
INVOCATION_GET_FILE_STATUS,
|
||||
INVOCATION_GLOB_STATUS,
|
||||
|
@ -147,6 +147,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
INVOCATION_LIST_LOCATED_STATUS,
|
||||
INVOCATION_LIST_STATUS,
|
||||
INVOCATION_MKDIRS,
|
||||
INVOCATION_OPEN,
|
||||
INVOCATION_RENAME,
|
||||
OBJECT_COPY_REQUESTS,
|
||||
OBJECT_DELETE_REQUESTS,
|
||||
|
@ -196,27 +197,27 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
"A unique identifier for the instance",
|
||||
fileSystemInstanceId.toString());
|
||||
registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", name.getHost());
|
||||
streamOpenOperations = streamCounter(STREAM_OPENED);
|
||||
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
|
||||
streamClosed = streamCounter(STREAM_CLOSED);
|
||||
streamAborted = streamCounter(STREAM_ABORTED);
|
||||
streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
|
||||
streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
|
||||
streamOpenOperations = counter(STREAM_OPENED);
|
||||
streamCloseOperations = counter(STREAM_CLOSE_OPERATIONS);
|
||||
streamClosed = counter(STREAM_CLOSED);
|
||||
streamAborted = counter(STREAM_ABORTED);
|
||||
streamSeekOperations = counter(STREAM_SEEK_OPERATIONS);
|
||||
streamReadExceptions = counter(STREAM_READ_EXCEPTIONS);
|
||||
streamForwardSeekOperations =
|
||||
streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
|
||||
counter(STREAM_FORWARD_SEEK_OPERATIONS);
|
||||
streamBackwardSeekOperations =
|
||||
streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
|
||||
streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
|
||||
counter(STREAM_BACKWARD_SEEK_OPERATIONS);
|
||||
streamBytesSkippedOnSeek = counter(STREAM_SEEK_BYTES_SKIPPED);
|
||||
streamBytesBackwardsOnSeek =
|
||||
streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
|
||||
streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
|
||||
streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
|
||||
counter(STREAM_SEEK_BYTES_BACKWARDS);
|
||||
streamBytesRead = counter(STREAM_SEEK_BYTES_READ);
|
||||
streamReadOperations = counter(STREAM_READ_OPERATIONS);
|
||||
streamReadFullyOperations =
|
||||
streamCounter(STREAM_READ_FULLY_OPERATIONS);
|
||||
counter(STREAM_READ_FULLY_OPERATIONS);
|
||||
streamReadsIncomplete =
|
||||
streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
|
||||
streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ);
|
||||
streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED);
|
||||
counter(STREAM_READ_OPERATIONS_INCOMPLETE);
|
||||
streamBytesReadInClose = counter(STREAM_CLOSE_BYTES_READ);
|
||||
streamBytesDiscardedInAbort = counter(STREAM_ABORT_BYTES_DISCARDED);
|
||||
numberOfFilesCreated = counter(FILES_CREATED);
|
||||
numberOfFilesCopied = counter(FILES_COPIED);
|
||||
bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
|
||||
|
@ -282,20 +283,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
return registry.newCounter(name, desc, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a counter in the stream map: these are unregistered in the public
|
||||
* metrics.
|
||||
* @param name counter name
|
||||
* @param desc counter description
|
||||
* @return a new counter
|
||||
*/
|
||||
protected final MutableCounterLong streamCounter(String name, String desc) {
|
||||
MutableCounterLong counter = new MutableCounterLong(
|
||||
Interns.info(name, desc), 0L);
|
||||
streamMetrics.put(name, counter);
|
||||
return counter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a counter in the registry.
|
||||
* @param op statistic to count
|
||||
|
@ -305,16 +292,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
return counter(op.getSymbol(), op.getDescription());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a counter in the stream map: these are unregistered in the public
|
||||
* metrics.
|
||||
* @param op statistic to count
|
||||
* @return a new counter
|
||||
*/
|
||||
protected final MutableCounterLong streamCounter(Statistic op) {
|
||||
return streamCounter(op.getSymbol(), op.getDescription());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a gauge in the registry.
|
||||
* @param name name gauge name
|
||||
|
@ -365,11 +342,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
prefix,
|
||||
separator, suffix);
|
||||
registry.snapshot(metricBuilder, all);
|
||||
for (Map.Entry<String, MutableCounterLong> entry:
|
||||
streamMetrics.entrySet()) {
|
||||
metricBuilder.tuple(entry.getKey(),
|
||||
Long.toString(entry.getValue().value()));
|
||||
}
|
||||
return metricBuilder.toString();
|
||||
}
|
||||
|
||||
|
@ -447,9 +419,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
*/
|
||||
public MutableMetric lookupMetric(String name) {
|
||||
MutableMetric metric = getRegistry().get(name);
|
||||
if (metric == null) {
|
||||
metric = streamMetrics.get(name);
|
||||
}
|
||||
return metric;
|
||||
}
|
||||
|
||||
|
@ -1141,10 +1110,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
public Map<String, Long> toMap() {
|
||||
MetricsToMap metricBuilder = new MetricsToMap(null);
|
||||
registry.snapshot(metricBuilder, true);
|
||||
for (Map.Entry<String, MutableCounterLong> entry :
|
||||
streamMetrics.entrySet()) {
|
||||
metricBuilder.tuple(entry.getKey(), entry.getValue().value());
|
||||
}
|
||||
return metricBuilder.getMap();
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,12 @@ public enum Statistic {
|
|||
IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"),
|
||||
INVOCATION_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE,
|
||||
"Calls of copyFromLocalFile()"),
|
||||
INVOCATION_CREATE(CommonStatisticNames.OP_CREATE,
|
||||
"Calls of create()"),
|
||||
INVOCATION_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE,
|
||||
"Calls of createNonRecursive()"),
|
||||
INVOCATION_DELETE(CommonStatisticNames.OP_DELETE,
|
||||
"Calls of delete()"),
|
||||
INVOCATION_EXISTS(CommonStatisticNames.OP_EXISTS,
|
||||
"Calls of exists()"),
|
||||
INVOCATION_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS,
|
||||
|
@ -67,6 +73,8 @@ public enum Statistic {
|
|||
"Calls of listStatus()"),
|
||||
INVOCATION_MKDIRS(CommonStatisticNames.OP_MKDIRS,
|
||||
"Calls of mkdirs()"),
|
||||
INVOCATION_OPEN(CommonStatisticNames.OP_OPEN,
|
||||
"Calls of open()"),
|
||||
INVOCATION_RENAME(CommonStatisticNames.OP_RENAME,
|
||||
"Calls of rename()"),
|
||||
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* Test s3a performance metrics register and output.
|
||||
|
@ -34,7 +35,7 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
|
|||
public void testMetricsRegister()
|
||||
throws IOException, InterruptedException {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Path dest = new Path("newfile1");
|
||||
Path dest = path("testMetricsRegister");
|
||||
ContractTestUtils.touch(fs, dest);
|
||||
|
||||
String targetMetricSource = "S3AMetrics1" + "-" + fs.getBucket();
|
||||
|
@ -48,4 +49,25 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
|
|||
assertEquals("Metrics system should report single file created event",
|
||||
1, fileCreated.value());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStreamStatistics() throws IOException {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Path file = path("testStreamStatistics");
|
||||
byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();
|
||||
ContractTestUtils.createFile(fs, file, false, data);
|
||||
|
||||
try (InputStream inputStream = fs.open(file)) {
|
||||
while (inputStream.read(data) != -1) {
|
||||
LOG.debug("Read batch of data from input stream...");
|
||||
}
|
||||
}
|
||||
|
||||
MutableCounterLong read = (MutableCounterLong)
|
||||
fs.getInstrumentation().getRegistry()
|
||||
.get(Statistic.STREAM_SEEK_BYTES_READ.getSymbol());
|
||||
assertEquals("Stream statistics were not merged", 26, read.value());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue