HBASE-21680 Port HBASE-20194 (Basic Replication WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver) to branch-1

HBASE-20193 Basic Replication Web UI - Regionserver

HBASE-20194 Basic Replication WebUI - Master
This commit is contained in:
Andrew Purtell 2019-01-18 14:58:34 -08:00
parent 3a11028cdf
commit 1fd76bd35a
No known key found for this signature in database
GPG Key ID: 8597754DD5365CCD
18 changed files with 750 additions and 38 deletions

View File

@ -43,6 +43,7 @@ org.apache.hadoop.hbase.ServerLoad;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.client.Admin;
org.apache.hadoop.hbase.client.HConnectionManager;
org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
org.apache.hadoop.hbase.HRegionInfo;
org.apache.hadoop.hbase.master.RegionState;
org.apache.hadoop.hbase.HTableDescriptor;
@ -52,6 +53,9 @@ org.apache.hadoop.hbase.tool.Canary;
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
org.apache.hadoop.hbase.master.DeadServer;
org.apache.hadoop.hbase.protobuf.ProtobufUtil;
org.apache.hadoop.hbase.replication.ReplicationPeer;
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
org.apache.hadoop.hbase.security.access.AccessControlLists;
org.apache.hadoop.hbase.quotas.QuotaUtil;
@ -232,6 +236,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</div>
</div>
</section>
<section>
<h2>Peers</h2>
<& peerConfigs &>
</section>
<%if master.getAssignmentManager() != null %>
<& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&>
</%if>
@ -554,3 +562,37 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</table>
</%if>
</%def>
<%def peerConfigs>
<%doc>
  Renders the "Peers" table on the master status page: one row per configured
  replication peer showing its id, cluster key, bandwidth limit and table-cfs
  mapping, plus a trailing total-count row. Peer configs are re-read on every
  page render through a short-lived ReplicationAdmin (closed via
  try-with-resources).
</%doc>
<%java>
    Map<String, ReplicationPeerConfig> peers = null;
    try (ReplicationAdmin admin = new ReplicationAdmin(master.getConfiguration())) {
      peers = admin.listPeerConfigs();
    }
</%java>
<table class="table table-striped">
  <tr>
    <th>Peer Id</th>
    <th>Cluster Key</th>
    <th>Bandwidth</th>
    <th>Table Cfs</th>
  </tr>
  <%if (peers != null && peers.size() > 0)%>
    <%for Map.Entry<String, ReplicationPeerConfig> peer : peers.entrySet() %>
    <%java>
      String peerId = peer.getKey();
      ReplicationPeerConfig peerConfig = peer.getValue();
    </%java>
    <tr>
      <td><% peerId %></td>
      <td><% peerConfig.getClusterKey() %></td>
      <%doc> Bandwidth of 0 means no throttling is configured for the peer. </%doc>
      <td><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %></td>
      <td>
        <%doc> Null table-cfs map means "replicate all tables"; rendered empty here. </%doc>
        <% peerConfig.getTableCFsMap() == null ? "" : ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %>
      </td>
    </tr>
    </%for>
  </%if>
  <tr><td>Total: <% (peers != null) ? peers.size() : 0 %></td></tr>
</table>
</%def>

View File

@ -26,6 +26,8 @@ HMaster master;
<%import>
java.util.*;
org.apache.hadoop.hbase.master.HMaster;
org.apache.hadoop.hbase.procedure2.util.StringUtils;
org.apache.hadoop.hbase.replication.ReplicationLoadSource;
org.apache.hadoop.hbase.ServerLoad;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.client.HBaseAdmin;
@ -33,6 +35,7 @@ HMaster master;
org.apache.hadoop.hbase.HTableDescriptor;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.util.VersionInfo;
org.apache.hadoop.hbase.util.Pair;
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
</%import>
@ -50,7 +53,8 @@ Arrays.sort(serverNames);
<li class=""><a href="#tab_memoryStats" data-toggle="tab">Memory</a></li>
<li class=""><a href="#tab_requestStats" data-toggle="tab">Requests</a></li>
<li class=""><a href="#tab_storeStats" data-toggle="tab">Storefiles</a></li>
<li class=""><a href="#tab_compactStas" data-toggle="tab">Compactions</a></li>
<li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
<li class=""><a href="#tab_replicationStats" data-toggle="tab">Replications</a></li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_baseStats">
@ -65,9 +69,12 @@ Arrays.sort(serverNames);
<div class="tab-pane" id="tab_storeStats">
<& storeStats; serverNames = serverNames; &>
</div>
<div class="tab-pane" id="tab_compactStas">
<div class="tab-pane" id="tab_compactStats">
<& compactionStats; serverNames = serverNames; &>
</div>
<div class="tab-pane" id="tab_replicationStats">
<& replicationStats; serverNames = serverNames; &>
</div>
</div>
</div>
@ -117,7 +124,7 @@ Arrays.sort(serverNames);
long startcode = serverName.getStartcode();
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% new Date(startcode) %></td>
<td><% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %></td>
<td><% version %></td>
@ -164,7 +171,7 @@ for (ServerName serverName: serverNames) {
if (sl != null) {
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% TraditionalBinaryPrefix.long2String(sl.getUsedHeapMB()
* TraditionalBinaryPrefix.MEGA.value, "B", 1) %></td>
<td><% TraditionalBinaryPrefix.long2String(sl.getMaxHeapMB()
@ -207,7 +214,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
if (sl != null) {
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% String.format("%.0f", sl.getRequestsPerSecond()) %></td>
<td><% sl.getReadRequestsCount() %></td>
<td><% sl.getWriteRequestsCount() %></td>
@ -249,7 +256,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
if (sl != null) {
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% sl.getStores() %></td>
<td><% sl.getStorefiles() %></td>
<td><% TraditionalBinaryPrefix.long2String(
@ -300,7 +307,7 @@ if (sl.getTotalCompactingKVs() > 0) {
}
</%java>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td><% sl.getTotalCompactingKVs() %></td>
<td><% sl.getCurrentCompactedKVs() %></td>
<td><% sl.getTotalCompactingKVs() - sl.getCurrentCompactedKVs() %></td>
@ -318,11 +325,72 @@ if (sl.getTotalCompactingKVs() > 0) {
</table>
</%def>
<%def replicationStats>
<%doc>
  Renders per-peer replication source metrics (age of last shipped op, size of
  log queue, replication lag) for every region server, one tab per peer id,
  sorted by peer id. Falls back to "No Peers Metrics" when no server reports
  any replication load.
</%doc>
<%args>
    ServerName [] serverNames;
</%args>
<%java>
    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap
        = master.getReplicationLoad(serverNames);
    List<String> peers = null;
    if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){
      peers = new ArrayList<>(replicationLoadSourceMap.keySet());
      Collections.sort(peers);
    }
</%java>
<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %>
<div class="tabbable">
    <ul class="nav nav-tabs">
        <%doc> Only the first peer's tab is marked active. </%doc>
        <%java>
        String active = "active";
        for (String peer : peers){
        </%java>
            <li class=<% active %>><a href="#tab_<% peer %>" data-toggle="tab">Peer <% peer %></a> </li>
        <%java>
        active = "";
        }
        </%java>
    </ul>
    <div class="tab-content">
        <%java>
        active = "active";
        for (String peer : peers){
        </%java>
            <div class="tab-pane <% active %>" id="tab_<% peer %>">
            <table class="table table-striped">
                <tr>
                    <th>Server</th>
                    <th>AgeOfLastShippedOp</th>
                    <th>SizeOfLogQueue</th>
                    <th>ReplicationLag</th>
                </tr>
                <%for Pair<ServerName, ReplicationLoadSource> pair: replicationLoadSourceMap.get(peer) %>
                <tr>
                    <td><& serverNameLink; serverName=pair.getFirst(); &></td>
                    <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
                    <td><% pair.getSecond().getSizeOfLogQueue() %></td>
                    <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
                </tr>
                </%for>
            </table>
            </div>
        <%java>
        active = "";
        }
        </%java>
    </div>
</div>
<%else>
    <p>No Peers Metrics</p>
</%if>
</%def>
<%def serverNameLink>
<%args>
ServerName serverName;
ServerLoad serverLoad;
</%args>
<%java>
int infoPort = master.getRegionServerInfoPort(serverName);
@ -341,7 +409,7 @@ if (sl.getTotalCompactingKVs() > 0) {
ServerName serverName;
</%args>
<tr>
<td><& serverNameLink; serverName=serverName; serverLoad = null; &></td>
<td><& serverNameLink; serverName=serverName; &></td>
<td></td>
<td></td>
<td></td>

View File

@ -122,6 +122,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
<& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
</section>
<section>
<h2>Replication Status</h2>
<& ReplicationStatusTmpl; regionServer = regionServer; &>
</section>
<section>
<h2>Software Attributes</h2>

View File

@ -0,0 +1,105 @@
<%doc>
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
</%doc>
<%args>
HRegionServer regionServer;
</%args>
<%import>
java.util.*;
java.util.Map.Entry;
org.apache.hadoop.hbase.procedure2.util.StringUtils;
org.apache.hadoop.hbase.regionserver.HRegionServer;
org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
</%import>
<%java>
Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
</%java>
<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
<div class="tabbable">
<ul class="nav nav-pills">
<li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
<li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_currentLog">
<& currentLog; metrics = walGroupsReplicationStatus; &>
</div>
<div class="tab-pane" id="tab_replicationDelay">
<& replicationDelay; metrics = walGroupsReplicationStatus; &>
</div>
</div>
</div>
<p>If the replication delay is UNKNOWN, this walGroup has not started replicating yet and it may be disabled.
If the size of the log is 0, the current HLog is being replicated, so an accurate size cannot be reported until it is closed.</p>
<%else>
<p>No Replication Metrics for Peers</p>
</%if>
<%def currentLog>
<%doc>
  Renders one row per wal group showing the log currently being replicated:
  its path, size, queue size and current replication offset.
</%doc>
<%args>
    Map<String, ReplicationStatus> metrics;
</%args>
<table class="table table-striped">
  <tr>
    <th>PeerId</th>
    <th>WalGroup</th>
    <th>Current Log</th>
    <th>Size</th>
    <th>Queue Size</th>
    <th>Offset</th>
  </tr>
  <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
  <tr>
    <td><% entry.getValue().getPeerId() %></td>
    <td><% entry.getValue().getWalGroup() %></td>
    <td><% entry.getValue().getCurrentPath() %> </td>
    <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
    <td><% entry.getValue().getQueueSize() %></td>
    <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
  </tr>
  </%for>
</table>
</%def>
<%def replicationDelay>
<%doc>
  Renders one row per wal group showing replication timeliness: the age of the
  last shipped operation and the estimated replication delay. A delay of
  Long.MAX_VALUE is displayed as UNKNOWN (group not replicating yet).
</%doc>
<%args>
    Map<String, ReplicationStatus> metrics;
</%args>
<table class="table table-striped">
  <tr>
    <th>PeerId</th>
    <th>WalGroup</th>
    <th>Current Log</th>
    <th>Last Shipped Age</th>
    <th>Replication Delay</th>
  </tr>
  <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
  <tr>
    <td><% entry.getValue().getPeerId() %></td>
    <td><% entry.getValue().getWalGroup() %></td>
    <td><% entry.getValue().getCurrentPath() %> </td>
    <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
    <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
  </tr>
  </%for>
</table>
</%def>

View File

@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
@ -3292,4 +3293,24 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
public LoadBalancer getLoadBalancer() {
return balancer;
}
/**
 * Collects the replication load reported by each of the given region servers,
 * grouped by replication peer id.
 * <p>
 * Servers for which the master holds no load report yet (e.g. dead or freshly
 * joined servers) are skipped instead of causing an NPE.
 * @param serverNames the servers to gather replication load for
 * @return map of peer id to the (server, replication load source) pairs reported for it
 */
public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
    getReplicationLoad(ServerName[] serverNames) {
  HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
      new HashMap<>();
  for (ServerName serverName : serverNames) {
    // Skip servers with no load report; getLoad returns null for such servers.
    if (getServerManager().getLoad(serverName) == null) {
      continue;
    }
    List<ReplicationLoadSource> replicationLoadSources =
        getServerManager().getLoad(serverName).getReplicationLoadSourceList();
    for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
      List<Pair<ServerName, ReplicationLoadSource>> list =
          replicationLoadSourceMap.get(replicationLoadSource.getPeerID());
      if (list == null) {
        list = new ArrayList<Pair<ServerName, ReplicationLoadSource>>();
        replicationLoadSourceMap.put(replicationLoadSource.getPeerID(), list);
      }
      list.add(new Pair<>(serverName, replicationLoadSource));
    }
  }
  return replicationLoadSourceMap;
}
}

View File

@ -164,6 +164,8 @@ import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@ -2832,6 +2834,20 @@ public class HRegionServer extends HasThread implements
return service;
}
/**
 * Collects the current replication status of every wal group handled by this
 * region server's replication sources, covering both normal and recovered
 * (old) source queues.
 * @return per-wal-group status, keyed and sorted by group key; empty when the
 *   server is not yet online
 */
public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
  Map<String, ReplicationStatus> statusByWalGroup = new TreeMap<>();
  // Nothing to report before the server has fully come online.
  if (!this.isOnline()) {
    return statusByWalGroup;
  }
  for (ReplicationSourceInterface source
      : replicationSourceHandler.getReplicationManager().getSources()) {
    statusByWalGroup.putAll(source.getWalGroupStatus());
  }
  for (ReplicationSourceInterface source
      : replicationSourceHandler.getReplicationManager().getOldSources()) {
    statusByWalGroup.putAll(source.getWalGroupStatus());
  }
  return statusByWalGroup;
}
/**
* Utility for constructing an instance of the passed HRegionServer class.
*

View File

@ -57,5 +57,5 @@ public interface ReplicationService {
/**
* Refresh and Get ReplicationLoad
*/
public ReplicationLoad refreshAndGetReplicationLoad();
ReplicationLoad refreshAndGetReplicationLoad();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
/**
* A source for a replication stream has to expose this service.
@ -33,4 +34,9 @@ public interface ReplicationSourceService extends ReplicationService {
* observe log rolls and log archival events.
*/
WALActionsListener getWALActionsListener();
/**
* Returns the replication manager
*/
ReplicationSourceManager getReplicationManager();
}

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -37,10 +35,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource implements BaseSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private Map<String, Long> lastTimestamps = new HashMap<>();
private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
private long lastHFileRefsQueueSize = 0;
private String id;
@ -87,7 +84,8 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
this.lastTimeStamps.put(walGroup, timestamp);
this.ageOfLastShippedOp.put(walGroup, age);
this.lastTimestamps.put(walGroup, timestamp);
}
/**
@ -104,15 +102,33 @@ public class MetricsSource implements BaseSource {
}
this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
}
/**
 * Gets the timestamp of the last shipped operation recorded for a wal group.
 * @param walGroup which group we are getting
 * @return the recorded timestamp, or 0 if nothing has been recorded for the group
 */
public long getLastTimeStampOfWalGroup(String walGroup) {
  Long lastShipped = this.lastTimestamps.get(walGroup);
  return lastShipped == null ? 0 : lastShipped.longValue();
}
/**
 * Gets the age of the last shipped operation recorded for a wal group.
 * @param walGroup which group we are getting
 * @return the recorded age, or 0 if nothing has been recorded for the group
 */
public long getAgeofLastShippedOp(String walGroup) {
  Long age = this.ageOfLastShippedOp.get(walGroup);
  return age == null ? 0 : age.longValue();
}
/**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.
* @param walGroupId id of the group to update
*/
public void refreshAgeOfLastShippedOp(String walGroupId) {
Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
Long lastTimestamp = this.lastTimestamps.get(walGroupId);
if (lastTimestamp == null) {
this.lastTimeStamps.put(walGroupId, 0L);
this.lastTimestamps.put(walGroupId, 0L);
lastTimestamp = 0L;
}
if (lastTimestamp > 0) {
@ -204,7 +220,8 @@ public class MetricsSource implements BaseSource {
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear();
lastTimestamps.clear();
ageOfLastShippedOp.clear();
lastHFileRefsQueueSize = 0;
}
@ -230,7 +247,7 @@ public class MetricsSource implements BaseSource {
*/
public long getTimeStampOfLastShippedOp() {
long lastTimestamp = 0L;
for (long ts : lastTimeStamps.values()) {
for (long ts : lastTimestamps.values()) {
if (ts > lastTimestamp) {
lastTimestamp = ts;
}

View File

@ -79,19 +79,8 @@ public class ReplicationLoad {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
long replicationLag;
long timePassedAfterLastShippedOp =
EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
if (sizeOfLogQueue != 0) {
// err on the large side
replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
} else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
replicationLag = ageOfLastShippedOp; // last shipped happen recently
} else {
// last shipped may happen last night,
// so NO real lag although ageOfLastShippedOp is non-zero
replicationLag = 0;
}
long replicationLag =
calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
if (rLoadSource != null) {
@ -116,6 +105,24 @@ public class ReplicationLoad {
replicationLoadSourceMap.values());
}
/**
 * Estimates the replication lag of a source from its last-shipped metrics.
 * <p>
 * With more than one wal queued the source is clearly behind, so the larger of
 * the recorded age and the wall-clock time since the last shipment is used
 * (erring on the large side). With at most one wal queued, a recent shipment
 * means the recorded age is still representative; otherwise the source has
 * simply been idle, so no real lag is reported even though the recorded age is
 * non-zero.
 * @param ageOfLastShippedOp age of the last operation at the time it was shipped
 * @param timeStampOfLastShippedOp wall-clock time of the last shipment
 * @param sizeOfLogQueue number of wals currently queued for the source
 * @return the estimated replication lag in milliseconds
 */
static long calculateReplicationDelay(long ageOfLastShippedOp,
    long timeStampOfLastShippedOp, int sizeOfLogQueue) {
  long sinceLastShipped =
      EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
  if (sizeOfLogQueue > 1) {
    // Queue is backed up: err on the large side.
    return Math.max(ageOfLastShippedOp, sinceLastShipped);
  }
  if (sinceLastShipped < 2 * ageOfLastShippedOp) {
    // Last shipment happened recently; the recorded age is still accurate.
    return ageOfLastShippedOp;
  }
  // Last shipment may have been long ago (idle source): no real lag.
  return 0;
}
/**
* sourceToString
* @return a string contains sourceReplicationLoad information

View File

@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@ -501,6 +502,38 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
/**
 * Builds a replication status snapshot for every wal group this source is
 * shipping — one entry per shipper thread — keyed by
 * "peerClusterId=&gt;walGroupId" and sorted by that key.
 * @return per-wal-group replication status for this source
 */
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
  Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
  long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
  for (ReplicationSourceShipperThread worker : workerThreads.values()) {
    String walGroupId = worker.getWalGroupId();
    lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
    ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
    // NOTE(review): assumes every worker's wal group has an entry in queues — confirm.
    int queueSize = queues.get(walGroupId).size();
    replicationDelay =
        ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
    Path currentPath = worker.getCurrentPath();
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (IOException e) {
      // Size may not be retrievable (e.g. wal still being written); -1 means unknown.
      fileSize = -1;
    }
    ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
    statusBuilder.withPeerId(this.getPeerClusterId())
        .withQueueSize(queueSize)
        .withWalGroup(walGroupId)
        .withCurrentPath(currentPath)
        .withCurrentPosition(worker.getCurrentPosition())
        .withFileSize(fileSize)
        .withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
    sourceReplicationStatus.put(this.getPeerClusterId() + "=>" + walGroupId,
        statusBuilder.build());
  }
  return sourceReplicationStatus;
}
// This thread reads entries from a queue and ships them.
// Entries are placed onto the queue by ReplicationSourceWALReaderThread
public class ReplicationSourceShipperThread extends Thread {
@ -525,6 +558,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.source = source;
}
/** @return the id of the wal group this shipper thread is draining */
public String getWalGroupId() {
  return walGroupId;
}
@Override
public void run() {
setWorkerState(WorkerState.RUNNING);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -127,4 +128,9 @@ public interface ReplicationSourceInterface {
*/
MetricsSource getSourceMetrics();
/**
* get the stat of replication for each wal group.
* @return stat of replication
*/
Map<String, ReplicationStatus> getWalGroupStatus();
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -251,7 +252,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
return entryBatchQueue.take();
}
public long getEntrySizeIncludeBulkLoad(Entry entry) {
/**
 * Retrieves and removes the head batch from the queue, waiting up to the
 * given timeout for one to become available.
 * @param timeout maximum time to wait, in milliseconds
 * @return the next batch, or null if the timeout elapsed with nothing queued
 * @throws InterruptedException if interrupted while waiting
 */
public WALEntryBatch poll(long timeout) throws InterruptedException {
  return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
@ -470,9 +475,5 @@ public class ReplicationSourceWALReaderThread extends Thread {
private void incrementHeapSize(long increment) {
heapSize += increment;
}
private void setLastPosition(String region, Long sequenceId) {
getLastSeqIds().put(region, sequenceId);
}
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * Immutable snapshot of the replication progress of a single wal group on a
 * region server: which wal is currently being replicated, how far along the
 * shipping is, and how far behind the source is. Rendered by the region
 * server's "Replication Status" web UI section.
 * <p>
 * Build instances with {@link #newBuilder()}; fields left unset default to -1
 * for numeric values and "UNKNOWN" for the peer id, wal group and path.
 */
@InterfaceAudience.Private
public final class ReplicationStatus {
  private final String peerId;
  private final String walGroup;
  // wal file currently being replicated by this group
  private final Path currentPath;
  // number of wals queued for this group
  private final int queueSize;
  private final long ageOfLastShippedOp;
  private final long replicationDelay;
  // byte offset reached within the current wal
  private final long currentPosition;
  // size of the current wal; -1 when it could not be determined
  private final long fileSize;

  private ReplicationStatus(ReplicationStatusBuilder builder) {
    this.peerId = builder.peerId;
    this.walGroup = builder.walGroup;
    this.currentPath = builder.currentPath;
    this.queueSize = builder.queueSize;
    this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
    this.replicationDelay = builder.replicationDelay;
    this.currentPosition = builder.currentPosition;
    this.fileSize = builder.fileSize;
  }

  /** @return byte offset reached within the current wal */
  public long getCurrentPosition() {
    return currentPosition;
  }

  /** @return size of the current wal, or -1 if unknown */
  public long getFileSize() {
    return fileSize;
  }

  /** @return id of the replication peer this group ships to */
  public String getPeerId() {
    return peerId;
  }

  /** @return the wal group id */
  public String getWalGroup() {
    return walGroup;
  }

  /** @return number of wals queued for this group */
  public int getQueueSize() {
    return queueSize;
  }

  /** @return age of the last shipped operation */
  public long getAgeOfLastShippedOp() {
    return ageOfLastShippedOp;
  }

  /** @return estimated replication delay for this group */
  public long getReplicationDelay() {
    return replicationDelay;
  }

  /** @return path of the wal currently being replicated */
  public Path getCurrentPath() {
    return currentPath;
  }

  /** @return a fresh builder with all fields set to their defaults */
  public static ReplicationStatusBuilder newBuilder() {
    return new ReplicationStatusBuilder();
  }

  /** Fluent builder for {@link ReplicationStatus}. */
  public static class ReplicationStatusBuilder {
    private String peerId = "UNKNOWN";
    private String walGroup = "UNKNOWN";
    private Path currentPath = new Path("UNKNOWN");
    private int queueSize = -1;
    private long ageOfLastShippedOp = -1;
    private long replicationDelay = -1;
    private long currentPosition = -1;
    private long fileSize = -1;

    public ReplicationStatusBuilder withPeerId(String peerId) {
      this.peerId = peerId;
      return this;
    }

    public ReplicationStatusBuilder withFileSize(long fileSize) {
      this.fileSize = fileSize;
      return this;
    }

    public ReplicationStatusBuilder withWalGroup(String walGroup) {
      this.walGroup = walGroup;
      return this;
    }

    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
      this.currentPath = currentPath;
      return this;
    }

    public ReplicationStatusBuilder withQueueSize(int queueSize) {
      this.queueSize = queueSize;
      return this;
    }

    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
      this.ageOfLastShippedOp = ageOfLastShippedOp;
      return this;
    }

    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
      this.replicationDelay = replicationDelay;
      return this;
    }

    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
      this.currentPosition = currentPosition;
      return this;
    }

    /** @return an immutable status built from this builder's current values */
    public ReplicationStatus build() {
      return new ReplicationStatus(this);
    }
  }
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Verifies that {@code HMaster#getReplicationLoad} correctly groups the
 * replication load sources reported by a region server by peer id.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestGetReplicationLoad {
  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);

  private static HBaseTestingUtility TEST_UTIL;
  private static MiniHBaseCluster cluster;
  private static HMaster master;
  private static ReplicationAdmin admin;
  private static final String ID_1 = "1";
  private static final String ID_2 = "2";
  private static final String KEY_1 = "127.0.0.1:2181:/hbase";
  private static final String KEY_2 = "127.0.0.1:2181:/hbase2";

  /** Master that suppresses periodic region server reports so the test fully controls them. */
  public static class MyMaster extends HMaster {
    public MyMaster(Configuration conf, CoordinatedStateManager csm)
        throws IOException, KeeperException, InterruptedException {
      super(conf, csm);
    }

    @Override
    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
      // do nothing
    }
  }

  @BeforeClass
  public static void startCluster() throws Exception {
    LOG.info("Starting cluster");
    TEST_UTIL = new HBaseTestingUtility();
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
    // Use this test's own MyMaster (not TestMasterMetrics') so the report
    // suppression above is actually in effect and the tests stay decoupled.
    TEST_UTIL.startMiniCluster(1, 1, 1, null, MyMaster.class, null);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    admin = new ReplicationAdmin(conf);
  }

  @AfterClass
  public static void after() throws Exception {
    if (admin != null) {
      admin.close();
    }
    if (TEST_UTIL != null) {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Reports two replication load sources (peers test1/test2) from one server,
   * then asserts getReplicationLoad groups them by peer with the right values.
   */
  @Test
  public void testGetReplicationMetrics() throws Exception {
    String peer1 = "test1", peer2 = "test2";
    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
    int sizeOfLogQueue = 5;
    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
        RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
    ServerName serverName = cluster.getMaster(0).getServerName();
    request.setServer(ProtobufUtil.toServerName(serverName));
    ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
        .setSizeOfLogQueue(sizeOfLogQueue).build();
    ClusterStatusProtos.ReplicationLoadSource rload2 =
        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
            .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
    request.setLoad(sl);
    ReplicationPeerConfig peerConfig_1 = new ReplicationPeerConfig();
    peerConfig_1.setClusterKey(KEY_1);
    ReplicationPeerConfig peerConfig_2 = new ReplicationPeerConfig();
    peerConfig_2.setClusterKey(KEY_2);
    admin.addPeer(ID_1, peerConfig_1);
    admin.addPeer(ID_2, peerConfig_2);
    master.getMasterRpcServices().regionServerReport(null, request.build());
    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
        master.getReplicationLoad(new ServerName[] { serverName });
    assertEquals("peer size ", 2, replicationLoad.size());
    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
    assertEquals("log queue size of peer1", sizeOfLogQueue,
        replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
    assertEquals("replication lag of peer2", replicationLag + 1,
        replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
    master.stopMaster();
  }
}

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.util.Pair;
/**
@ -106,4 +109,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
// Expose the MetricsSource recorder used by this dummy replication source.
public MetricsSource getSourceMetrics() {
return metrics;
}
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
  // Dummy source: it tails no WALs, so report an empty (mutable) status map.
  Map<String, ReplicationStatus> walGroupStatuses = new HashMap<>();
  return walGroupStatuses;
}
}

View File

@ -77,6 +77,7 @@ public class TestReplicationBase {
protected static final byte[] famName = Bytes.toBytes("f");
protected static final byte[] row = Bytes.toBytes("row");
protected static final byte[] noRepfamName = Bytes.toBytes("norep");
protected static final String PEER_ID2 = "2";
/**
* @throws java.lang.Exception

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationMetricsforUI extends TestReplicationBase {
  private static final byte[] qualName = Bytes.toBytes("q");

  /**
   * Exercises the per-WAL-group replication status backing the region server
   * web UI: peer id, queue size, replication delay, age of last shipped op,
   * and current WAL path/position — before and after a WAL roll.
   */
  @Test
  public void testReplicationMetrics() throws Exception {
    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
      putAndWaitForReplication(Bytes.toBytes("starter"),
        Bytes.toBytes("value help to test replication delay"));
      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
      Thread.sleep(5000);
      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
      Assert.assertEquals("metric size ", 1, metrics.size());
      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
        long pos = metric.getValue().getCurrentPosition();
        // Semantics are a bit different in branch-1: If not started, pos will be -1
        if (pos == -1) {
          pos = 0;
        }
        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
      }
      // Ship a batch of edits, then wait for the last row to appear on the peer.
      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
        Put p = new Put(Bytes.toBytes(Integer.toString(i)));
        p.addColumn(famName, qualName,
          Bytes.toBytes("value help to test replication delay " + i));
        htable1.put(p);
      }
      waitForReplication(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
      rs = utility1.getRSForFirstRegionInTable(tableName);
      metrics = rs.getWalGroupsReplicationStatus();
      Path lastPath = null;
      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
        lastPath = metric.getValue().getCurrentPath();
        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
          metric.getValue().getAgeOfLastShippedOp() > 0);
        long pos = metric.getValue().getCurrentPosition();
        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
      }
      // Roll the WAL and verify the reported current path moves to the new log.
      hbaseAdmin.rollWALWriter(rs.getServerName());
      putAndWaitForReplication(Bytes.toBytes("trigger"),
        Bytes.toBytes("value help to test replication delay"));
      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
      Thread.sleep(5000);
      metrics = rs.getWalGroupsReplicationStatus();
      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
        long pos = metric.getValue().getCurrentPosition();
        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
      }
    }
  }

  /** Writes one cell to the source table and blocks until it replicates to the peer. */
  private void putAndWaitForReplication(byte[] row, byte[] value) throws Exception {
    Put p = new Put(row);
    p.addColumn(famName, qualName, value);
    htable1.put(p);
    waitForReplication(row);
  }

  /** Polls the peer table every 500 ms until {@code row} is visible there. */
  private void waitForReplication(byte[] row) throws Exception {
    while (htable2.get(new Get(row)).size() == 0) {
      Thread.sleep(500);
    }
  }
}