HBASE-20194 Basic Replication WebUI - Master

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
jingyuntian 2018-06-25 10:23:00 +08:00 committed by zhangduo
parent b5163bc9f6
commit bd40cba8dd
4 changed files with 284 additions and 9 deletions

View File

@ -31,8 +31,11 @@ AssignmentManager assignmentManager = null;
<%import>
java.util.*;
java.io.IOException;
org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
org.apache.hadoop.hbase.client.RegionInfo;
org.apache.hadoop.hbase.client.TableDescriptor;
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.HConstants;
org.apache.hadoop.hbase.HTableDescriptor;
@ -259,6 +262,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</div>
</div>
</section>
<section>
<h2>Peers</h2>
<& peerConfigs &>
</section>
<%if master.getAssignmentManager() != null %>
<& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&>
</%if>
@ -569,3 +576,55 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</table>
</%if>
</%def>
<%def peerConfigs>
<%java>
List<ReplicationPeerDescription> peers = null;
if (master.getReplicationPeerManager() != null) {
peers = master.getReplicationPeerManager().listPeers(null);
}
</%java>
<table class="table table-striped">
<tr>
<th>Peer Id</th>
<th>Cluster Key</th>
<th>State</th>
<th>IsSerial</th>
<th>Bandwidth</th>
<th>ReplicateAll</th>
<th>Namespaces</th>
<th>Exclude Namespaces</th>
<th>Table Cfs</th>
<th>Exclude Table Cfs</th>
</tr>
<%if (peers != null && peers.size() > 0)%>
<%for ReplicationPeerDescription peer : peers %>
<%java>
String peerId = peer.getPeerId();
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
</%java>
<tr>
<td><% peerId %></td>
<td><% peerConfig.getClusterKey() %></td>
<td><% peer.isEnabled() ? "ENABLED" : "DISABLED" %></td>
<td><% peerConfig.isSerial() %></td>
<td><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %></td>
<td><% peerConfig.replicateAllUserTables() %></td>
<td>
<% peerConfig.getNamespaces() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getNamespaces()).replaceAll(";", "; ") %>
</td>
<td>
<% peerConfig.getExcludeNamespaces() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getExcludeNamespaces()).replaceAll(";", "; ") %>
</td>
<td>
<% peerConfig.getTableCFsMap() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %>
</td>
<td>
<% peerConfig.getExcludeTableCFsMap() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getExcludeTableCFsMap()).replaceAll(";", "; ") %>
</td>
</tr>
</%for>
</%if>
<tr><td>Total: <% (peers != null) ? peers.size() : 0 %></td></tr>
</table>
</%def>

View File

@ -26,11 +26,14 @@ HMaster master;
<%import>
java.util.*;
org.apache.hadoop.hbase.master.HMaster;
org.apache.hadoop.hbase.procedure2.util.StringUtils;
org.apache.hadoop.hbase.replication.ReplicationLoadSource;
org.apache.hadoop.hbase.RegionMetrics;
org.apache.hadoop.hbase.ServerMetrics;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.Size;
org.apache.hadoop.hbase.util.VersionInfo;
org.apache.hadoop.hbase.util.Pair;
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
</%import>
@ -48,7 +51,8 @@ Arrays.sort(serverNames);
<li class=""><a href="#tab_memoryStats" data-toggle="tab">Memory</a></li>
<li class=""><a href="#tab_requestStats" data-toggle="tab">Requests</a></li>
<li class=""><a href="#tab_storeStats" data-toggle="tab">Storefiles</a></li>
<li class=""><a href="#tab_compactStas" data-toggle="tab">Compactions</a></li>
<li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
<li class=""><a href="#tab_replicationStats" data-toggle="tab">Replications</a></li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_baseStats">
@ -63,9 +67,12 @@ Arrays.sort(serverNames);
<div class="tab-pane" id="tab_storeStats">
<& storeStats; serverNames = serverNames; &>
</div>
<div class="tab-pane" id="tab_compactStas">
<div class="tab-pane" id="tab_compactStats">
<& compactionStats; serverNames = serverNames; &>
</div>
<div class="tab-pane" id="tab_replicationStats">
<& replicationStats; serverNames = serverNames; &>
</div>
</div>
</div>
@ -111,7 +118,7 @@ Arrays.sort(serverNames);
long startcode = serverName.getStartcode();
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% new Date(startcode) %></td>
<td><% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %></td>
<td><% version %></td>
@ -158,7 +165,7 @@ for (ServerName serverName: serverNames) {
}
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% TraditionalBinaryPrefix.long2String((long) sl.getUsedHeapSize().get(Size.Unit.MEGABYTE)
* TraditionalBinaryPrefix.MEGA.value, "B", 1) %></td>
<td><% TraditionalBinaryPrefix.long2String((long) sl.getMaxHeapSize().get(Size.Unit.MEGABYTE)
@ -206,7 +213,7 @@ if (sl != null) {
}
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% sl.getRequestCountPerSecond() %></td>
<td><% readRequestCount %></td>
<td><% filteredReadRequestCount %></td>
@ -259,7 +266,7 @@ if (sl != null) {
}
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% storeCount %></td>
<td><% storeFileCount %></td>
<td><% TraditionalBinaryPrefix.long2String(
@ -312,7 +319,7 @@ if (totalCompactingCells > 0) {
}
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% totalCompactingCells %></td>
<td><% totalCompactedCells %></td>
<td><% totalCompactingCells - totalCompactedCells %></td>
@ -329,11 +336,74 @@ if (totalCompactingCells > 0) {
</table>
</%def>
<%def replicationStats>
<%args>
ServerName [] serverNames;
</%args>
<%java>
HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap
= master.getReplicationLoad(serverNames);
List<String> peers = null;
if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){
peers = new ArrayList<>(replicationLoadSourceMap.keySet());
Collections.sort(peers);
}
</%java>
<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %>
<div class="tabbable">
<ul class="nav nav-tabs">
<%java>
String active = "active";
for (String peer : peers){
</%java>
<li class=<% active %>><a href="#tab_<% peer %>" data-toggle="tab">Peer <% peer %></a> </li>
<%java>
active = "";
}
</%java>
</ul>
<div class="tab-content">
<%java>
active = "active";
for (String peer : peers){
</%java>
<div class="tab-pane <% active %>" id="tab_<% peer %>">
<table class="table table-striped">
<tr>
<th>Server</th>
<th>AgeOfLastShippedOp</th>
<th>SizeOfLogQueue</th>
<th>ReplicationLag</th>
</tr>
<%for Pair<ServerName, ReplicationLoadSource> pair: replicationLoadSourceMap.get(peer) %>
<tr>
<td><& serverNameLink; serverName=pair.getFirst(); &></td>
<td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
<td><% pair.getSecond().getSizeOfLogQueue() %></td>
<td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
</tr>
</%for>
</table>
</div>
<%java>
active = "";
}
</%java>
</div>
</div>
<%else>
<p>No Peers Metrics</p>
</%if>
</%def>
<%def serverNameLink>
<%args>
ServerName serverName;
ServerMetrics serverLoad;
</%args>
<%java>
int infoPort = master.getRegionServerInfoPort(serverName);
@ -352,7 +422,7 @@ if (totalCompactingCells > 0) {
ServerName serverName;
</%args>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = null; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td></td>
<td></td>
<td></td>

View File

@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -3688,6 +3689,32 @@ public class HMaster extends HRegionServer implements MasterServices {
return replicationPeerManager;
}
public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
getReplicationLoad(ServerName[] serverNames) {
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
if (peerList == null) {
return null;
}
HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
new HashMap<>(peerList.size());
peerList.stream()
.forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList()));
for (ServerName serverName : serverNames) {
List<ReplicationLoadSource> replicationLoadSources =
getServerManager().getLoad(serverName).getReplicationLoadSourceList();
for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
replicationLoadSourceMap.get(replicationLoadSource.getPeerID())
.add(new Pair<>(serverName, replicationLoadSource));
}
}
for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
if (loads.size() > 0) {
loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
}
}
return replicationLoadSourceMap;
}
/**
* This method modifies the master's configuration in order to inject replication-related features
*/

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
@Category({ MasterTests.class, MediumTests.class })
public class TestGetReplicationLoad {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestGetReplicationLoad.class);
private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
private static MiniHBaseCluster cluster;
private static HMaster master;
private static HBaseTestingUtility TEST_UTIL;
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
super(conf);
}
@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
// do nothing
}
}
@BeforeClass
public static void startCluster() throws Exception {
LOG.info("Starting cluster");
TEST_UTIL = new HBaseTestingUtility();
TEST_UTIL.startMiniCluster(1, 1, 1, null, TestMasterMetrics.MyMaster.class, null);
cluster = TEST_UTIL.getHBaseCluster();
LOG.info("Waiting for active/ready master");
cluster.waitForActiveAndReadyMaster();
master = cluster.getMaster();
}
@AfterClass
public static void after() throws Exception {
if (TEST_UTIL != null) {
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testGetReplicationMetrics() throws Exception {
String peer1 = "test1", peer2 = "test2";
long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
int sizeOfLogQueue = 5;
RegionServerStatusProtos.RegionServerReportRequest.Builder request =
RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
ServerName serverName = cluster.getMaster(0).getServerName();
request.setServer(ProtobufUtil.toServerName(serverName));
ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
.newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
.setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
.setSizeOfLogQueue(sizeOfLogQueue).build();
ClusterStatusProtos.ReplicationLoadSource rload2 =
ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
.setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
.setSizeOfLogQueue(sizeOfLogQueue + 1).build();
ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
.addReplLoadSource(rload1).addReplLoadSource(rload2).build();
request.setLoad(sl);
master.getReplicationPeerManager().addPeer(peer1,
ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
master.getReplicationPeerManager().addPeer(peer2,
ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
master.getMasterRpcServices().regionServerReport(null, request.build());
HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
master.getReplicationLoad(new ServerName[] { serverName });
assertEquals("peer size ", 2, replicationLoad.size());
assertEquals("load size ", 1, replicationLoad.get(peer1).size());
assertEquals("log queue size of peer1", sizeOfLogQueue,
replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
assertEquals("replication lag of peer2", replicationLag + 1,
replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
master.stopMaster();
}
}