diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 610e89914e5..5b76f72258f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -60,3 +60,5 @@ HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd) HDFS-3900. QJM: avoid validating log segments on log rolls (todd) HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync (todd) + +HDFS-3899. QJM: Add client-side metrics (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 840cfb41c53..f627b8a2e9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -85,6 +85,8 @@ public class IPCLoggerChannel implements AsyncLogger { private final NamespaceInfo nsInfo; private int httpPort = -1; + private final IPCLoggerChannelMetrics metrics; + /** * The number of bytes of edits data still in the queue. */ @@ -154,6 +156,8 @@ public class IPCLoggerChannel implements AsyncLogger { executor = MoreExecutors.listeningDecorator( createExecutor()); + + metrics = IPCLoggerChannelMetrics.create(this); } @Override @@ -261,6 +265,10 @@ public class IPCLoggerChannel implements AsyncLogger { public synchronized int getQueuedEditsSize() { return queuedEditsSizeBytes; } + + public InetSocketAddress getRemoteAddress() { + return addr; + } /** * @return true if the server has gotten out of sync from the client, @@ -330,6 +338,7 @@ public class IPCLoggerChannel implements AsyncLogger { public Void call() throws IOException { throwIfOutOfSync(); + long rpcSendTimeNanos = System.nanoTime(); try { getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data); @@ -343,6 +352,14 @@ public class IPCLoggerChannel implements AsyncLogger { outOfSync = true; } throw e; + } finally { + long now = System.nanoTime(); + long rpcTime = TimeUnit.MICROSECONDS.convert( + now - rpcSendTimeNanos, TimeUnit.NANOSECONDS); + long endToEndTime = TimeUnit.MICROSECONDS.convert( + now - submitNanos, TimeUnit.NANOSECONDS); + metrics.addWriteEndToEndLatency(endToEndTime); + metrics.addWriteRpcLatency(rpcTime); } synchronized (IPCLoggerChannel.this) { highestAckedTxId = firstTxnId + numTxns - 1; @@ -547,11 +564,11 @@ public class IPCLoggerChannel implements AsyncLogger { } } - private long getLagTxns() { + public synchronized long getLagTxns() { return Math.max(committedTxId - highestAckedTxId, 0); } - private long getLagTimeMillis() { + public synchronized long getLagTimeMillis() { return TimeUnit.MILLISECONDS.convert( Math.max(lastCommitNanos - lastAckNanos, 0), TimeUnit.NANOSECONDS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java new file mode 100644 index 00000000000..fde6b998176 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; + +import com.google.common.collect.Maps; + +/** + * The metrics for a journal from the writer's perspective. + */ +@Metrics(about="Journal client metrics", context="dfs") +class IPCLoggerChannelMetrics { + final MetricsRegistry registry = new MetricsRegistry("NameNode"); + + private volatile IPCLoggerChannel ch; + + private final MutableQuantiles[] writeEndToEndLatencyQuantiles; + private final MutableQuantiles[] writeRpcLatencyQuantiles; + + + /** + * In the case of the NN transitioning between states, edit logs are closed + * and reopened. Thus, the IPCLoggerChannel instance that writes to a + * given JournalNode may change over the lifetime of the process. + * However, metrics2 doesn't have a function to unregister a set of metrics + * and fails if a new metrics class is registered with the same name + * as the existing one. Hence, we have to maintain our own registry + * ("multiton") here, so that we have exactly one metrics instance + * per JournalNode, and switch out the pointer to the underlying + * IPCLoggerChannel instance. + */ + private static final Map REGISTRY = + Maps.newHashMap(); + + private IPCLoggerChannelMetrics(IPCLoggerChannel ch) { + this.ch = ch; + + Configuration conf = new HdfsConfiguration(); + int[] intervals = + conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY); + if (intervals != null) { + writeEndToEndLatencyQuantiles = new MutableQuantiles[intervals.length]; + writeRpcLatencyQuantiles = new MutableQuantiles[intervals.length]; + for (int i = 0; i < writeEndToEndLatencyQuantiles.length; i++) { + int interval = intervals[i]; + writeEndToEndLatencyQuantiles[i] = registry.newQuantiles( + "writesE2E" + interval + "s", + "End-to-end time for write operations", "ops", "LatencyMicros", interval); + writeRpcLatencyQuantiles[i] = registry.newQuantiles( + "writesRpc" + interval + "s", + "RPC RTT for write operations", "ops", "LatencyMicros", interval); + } + } else { + writeEndToEndLatencyQuantiles = null; + writeRpcLatencyQuantiles = null; + } + } + + private void setChannel(IPCLoggerChannel ch) { + assert ch.getRemoteAddress().equals(this.ch.getRemoteAddress()); + this.ch = ch; + } + + static IPCLoggerChannelMetrics create(IPCLoggerChannel ch) { + String name = getName(ch); + synchronized (REGISTRY) { + IPCLoggerChannelMetrics m = REGISTRY.get(name); + if (m != null) { + m.setChannel(ch); + } else { + m = new IPCLoggerChannelMetrics(ch); + DefaultMetricsSystem.instance().register(name, null, m); + REGISTRY.put(name, m); + } + return m; + } + } + + private static String getName(IPCLoggerChannel ch) { + InetSocketAddress addr = ch.getRemoteAddress(); + String addrStr = addr.getAddress().getHostAddress(); + + // IPv6 addresses have colons, which aren't allowed as part of + // MBean names. Replace with '.' + addrStr = addrStr.replace(':', '.'); + + return "IPCLoggerChannel-" + addrStr + + "-" + addr.getPort(); + } + + @Metric("Is the remote logger out of sync with the quorum") + public String isOutOfSync() { + return Boolean.toString(ch.isOutOfSync()); + } + + @Metric("The number of transactions the remote log is lagging behind the " + + "quorum") + public long getCurrentLagTxns() { + return ch.getLagTxns(); + } + + @Metric("The number of milliseconds the remote log is lagging behind the " + + "quorum") + public long getLagTimeMillis() { + return ch.getLagTimeMillis(); + } + + @Metric("The number of bytes of pending data to be sent to the remote node") + public int getQueuedEditsSize() { + return ch.getQueuedEditsSize(); + } + + public void addWriteEndToEndLatency(long micros) { + if (writeEndToEndLatencyQuantiles != null) { + for (MutableQuantiles q : writeEndToEndLatencyQuantiles) { + q.add(micros); + } + } + } + + public void addWriteRpcLatency(long micros) { + if (writeRpcLatencyQuantiles != null) { + for (MutableQuantiles q : writeRpcLatencyQuantiles) { + q.add(micros); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java index 746216852c3..f6d87c7e92b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java @@ -27,6 +27,10 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableQuantiles; +/** + * The server-side metrics for a journal from the JournalNode's + * perspective. + */ @Metrics(about="Journal metrics", context="dfs") class JournalMetrics { final MetricsRegistry registry = new MetricsRegistry("JournalNode");