HDFS-10648. Expose Balancer metrics through Metrics2 (#3427)
This commit is contained in:
parent
8f4456d4a1
commit
b1431813d0
|
@ -40,6 +40,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -226,6 +228,7 @@ public class Balancer {
|
||||||
private final long maxSizeToMove;
|
private final long maxSizeToMove;
|
||||||
private final long defaultBlockSize;
|
private final long defaultBlockSize;
|
||||||
private final boolean sortTopNodes;
|
private final boolean sortTopNodes;
|
||||||
|
private final BalancerMetrics metrics;
|
||||||
|
|
||||||
// all data node lists
|
// all data node lists
|
||||||
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
||||||
|
@ -357,6 +360,7 @@ public class Balancer {
|
||||||
this.defaultBlockSize = getLongBytes(conf,
|
this.defaultBlockSize = getLongBytes(conf,
|
||||||
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
||||||
|
this.metrics = BalancerMetrics.create(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
||||||
|
@ -454,6 +458,8 @@ public class Balancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
logUtilizationCollections();
|
logUtilizationCollections();
|
||||||
|
metrics.setNumOfOverUtilizedNodes(overUtilized.size());
|
||||||
|
metrics.setNumOfUnderUtilizedNodes(underUtilized.size());
|
||||||
|
|
||||||
Preconditions.checkState(dispatcher.getStorageGroupMap().size()
|
Preconditions.checkState(dispatcher.getStorageGroupMap().size()
|
||||||
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
|
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
|
||||||
|
@ -636,7 +642,11 @@ public class Balancer {
|
||||||
this.belowAvgUtilized.clear();
|
this.belowAvgUtilized.clear();
|
||||||
this.underUtilized.clear();
|
this.underUtilized.clear();
|
||||||
this.policy.reset();
|
this.policy.reset();
|
||||||
dispatcher.reset(conf);;
|
dispatcher.reset(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
NameNodeConnector getNnc() {
|
||||||
|
return nnc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static class Result {
|
static class Result {
|
||||||
|
@ -710,8 +720,10 @@ public class Balancer {
|
||||||
/** Run an iteration for all datanodes. */
|
/** Run an iteration for all datanodes. */
|
||||||
Result runOneIteration() {
|
Result runOneIteration() {
|
||||||
try {
|
try {
|
||||||
|
metrics.setIterateRunning(true);
|
||||||
final List<DatanodeStorageReport> reports = dispatcher.init();
|
final List<DatanodeStorageReport> reports = dispatcher.init();
|
||||||
final long bytesLeftToMove = init(reports);
|
final long bytesLeftToMove = init(reports);
|
||||||
|
metrics.setBytesLeftToMove(bytesLeftToMove);
|
||||||
if (bytesLeftToMove == 0) {
|
if (bytesLeftToMove == 0) {
|
||||||
return newResult(ExitStatus.SUCCESS, bytesLeftToMove, 0);
|
return newResult(ExitStatus.SUCCESS, bytesLeftToMove, 0);
|
||||||
} else {
|
} else {
|
||||||
|
@ -766,6 +778,7 @@ public class Balancer {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return newResult(ExitStatus.INTERRUPTED);
|
return newResult(ExitStatus.INTERRUPTED);
|
||||||
} finally {
|
} finally {
|
||||||
|
metrics.setIterateRunning(false);
|
||||||
dispatcher.shutdownNow();
|
dispatcher.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -848,6 +861,10 @@ public class Balancer {
|
||||||
static int run(Collection<URI> namenodes, Collection<String> nsIds,
|
static int run(Collection<URI> namenodes, Collection<String> nsIds,
|
||||||
final BalancerParameters p, Configuration conf)
|
final BalancerParameters p, Configuration conf)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
DefaultMetricsSystem.initialize("Balancer");
|
||||||
|
JvmMetrics.create("Balancer",
|
||||||
|
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
|
||||||
|
DefaultMetricsSystem.instance());
|
||||||
if (!p.getRunAsService()) {
|
if (!p.getRunAsService()) {
|
||||||
return doBalance(namenodes, nsIds, p, conf);
|
return doBalance(namenodes, nsIds, p, conf);
|
||||||
}
|
}
|
||||||
|
@ -893,6 +910,8 @@ public class Balancer {
|
||||||
time2Str(scheduleInterval));
|
time2Str(scheduleInterval));
|
||||||
Thread.sleep(scheduleInterval);
|
Thread.sleep(scheduleInterval);
|
||||||
}
|
}
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
|
|
||||||
// normal stop
|
// normal stop
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics for individual Balancer.
|
||||||
|
*/
|
||||||
|
@Metrics(about="Balancer metrics", context="dfs")
|
||||||
|
final class BalancerMetrics {
|
||||||
|
|
||||||
|
private final Balancer balancer;
|
||||||
|
|
||||||
|
@Metric("If a balancer iterate is running")
|
||||||
|
private MutableGaugeInt iterateRunning;
|
||||||
|
|
||||||
|
@Metric("Bytes left to move to make cluster balanced")
|
||||||
|
private MutableGaugeLong bytesLeftToMove;
|
||||||
|
|
||||||
|
@Metric("Number of under utilized nodes")
|
||||||
|
private MutableGaugeInt numOfUnderUtilizedNodes;
|
||||||
|
|
||||||
|
@Metric("Number of over utilized nodes")
|
||||||
|
private MutableGaugeInt numOfOverUtilizedNodes;
|
||||||
|
|
||||||
|
private BalancerMetrics(Balancer b) {
|
||||||
|
this.balancer = b;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BalancerMetrics create(Balancer b) {
|
||||||
|
BalancerMetrics m = new BalancerMetrics(b);
|
||||||
|
return DefaultMetricsSystem.instance().register(
|
||||||
|
m.getName(), null, m);
|
||||||
|
}
|
||||||
|
|
||||||
|
String getName() {
|
||||||
|
return "Balancer-" + balancer.getNnc().getBlockpoolID();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric("Bytes that already moved in current doBalance run.")
|
||||||
|
public long getBytesMovedInCurrentRun() {
|
||||||
|
return balancer.getNnc().getBytesMoved().get();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setIterateRunning(boolean iterateRunning) {
|
||||||
|
this.iterateRunning.set(iterateRunning ? 1 : 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setBytesLeftToMove(long bytesLeftToMove) {
|
||||||
|
this.bytesLeftToMove.set(bytesLeftToMove);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setNumOfUnderUtilizedNodes(int numOfUnderUtilizedNodes) {
|
||||||
|
this.numOfUnderUtilizedNodes.set(numOfUnderUtilizedNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setNumOfOverUtilizedNodes(int numOfOverUtilizedNodes) {
|
||||||
|
this.numOfOverUtilizedNodes.set(numOfOverUtilizedNodes);
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,7 +28,10 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.MetricsAsserts;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -129,8 +132,24 @@ public class TestBalancerService {
|
||||||
newBalancerService(conf, new String[] {"-asService"});
|
newBalancerService(conf, new String[] {"-asService"});
|
||||||
balancerThread.start();
|
balancerThread.start();
|
||||||
|
|
||||||
|
// Check metrics
|
||||||
|
final String balancerMetricsName = "Balancer-"
|
||||||
|
+ cluster.getNameNode(0).getNamesystem().getBlockPoolId();
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
// Validate metrics after metrics system initiated.
|
||||||
|
if (DefaultMetricsSystem.instance().getSource(balancerMetricsName) == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
MetricsRecordBuilder rb = MetricsAsserts.getMetrics(balancerMetricsName);
|
||||||
|
return rb != null && MetricsAsserts.getLongGauge("BytesLeftToMove", rb) > 0;
|
||||||
|
}, 100, 2000);
|
||||||
|
|
||||||
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
||||||
cluster, BalancerParameters.DEFAULT);
|
cluster, BalancerParameters.DEFAULT);
|
||||||
|
|
||||||
|
MetricsRecordBuilder rb = MetricsAsserts.getMetrics(balancerMetricsName);
|
||||||
|
assertTrue(MetricsAsserts.getLongGauge("BytesMovedInCurrentRun", rb) > 0);
|
||||||
|
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
cluster.triggerBlockReports();
|
cluster.triggerBlockReports();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue