HBASE-20193 Basic Replication Web UI - Regionserver
Signed-off-by: zhangduo <zhangduo@apache.org>
parent 380350d5bc
commit 66ad9fdef8
@@ -383,7 +383,7 @@ if (totalCompactingCells > 0) {
         <td><& serverNameLink; serverName=pair.getFirst(); &></td>
         <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
         <td><% pair.getSecond().getSizeOfLogQueue() %></td>
-        <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+        <td><% pair.getSecond().getReplicationLag() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
     </tr>
 </%for>
 </table>
@@ -393,6 +393,7 @@ if (totalCompactingCells > 0) {
 }
 </%java>
 </div>
+<p>If the replication delay is UNKNOWN, this walGroup hasn't started replicating yet and it may have been disabled.</p>
 </div>
 <%else>
 <p>No Peers Metrics</p>
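The table renders Long.MAX_VALUE as a sentinel meaning "replication has not started for this wal group". A minimal sketch of the same formatting decision in plain Java, assuming only that HBase's StringUtils.humanTimeDiff helper is on the classpath; the class name and sample values below are illustrative, not part of the patch:

import org.apache.hadoop.hbase.procedure2.util.StringUtils;

public class ReplicationLagFormat {
  // Same ternary the template uses: MAX_VALUE means the wal group has not
  // shipped anything yet, so there is no meaningful lag to format.
  static String format(long replicationLag) {
    return replicationLag == Long.MAX_VALUE
        ? "UNKNOWN"
        : StringUtils.humanTimeDiff(replicationLag);
  }

  public static void main(String[] args) {
    System.out.println(format(Long.MAX_VALUE)); // UNKNOWN
    System.out.println(format(1500));           // a human-readable "1.5sec"-style string
  }
}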
@@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
     <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
     </section>
 
+    <section>
+    <h2>Replication Status</h2>
+    <& ReplicationStatusTmpl; regionServer = regionServer; &>
+    </section>
 
     <section>
     <h2>Software Attributes</h2>
@@ -0,0 +1,105 @@
+<%doc>
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+</%doc>
+<%args>
+HRegionServer regionServer;
+</%args>
+<%import>
+java.util.*;
+java.util.Map.Entry;
+org.apache.hadoop.hbase.procedure2.util.StringUtils;
+org.apache.hadoop.hbase.regionserver.HRegionServer;
+org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+<div class="tabbable">
+    <ul class="nav nav-pills">
+        <li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a></li>
+        <li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
+    </ul>
+    <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
+        <div class="tab-pane active" id="tab_currentLog">
+            <& currentLog; metrics = walGroupsReplicationStatus; &>
+        </div>
+        <div class="tab-pane" id="tab_replicationDelay">
+            <& replicationDelay; metrics = walGroupsReplicationStatus; &>
+        </div>
+    </div>
+</div>
+<p>If the replication delay is UNKNOWN, this walGroup hasn't started replicating yet and it may have been disabled.
+If the log size is 0, we are replicating the current HLog and cannot report an accurate size because the file is not closed yet.</p>
+
+<%else>
+<p>No Replication Metrics for Peers</p>
+</%if>
+
+<%def currentLog>
+<%args>
+Map<String, ReplicationStatus> metrics;
+</%args>
+<table class="table table-striped">
+    <tr>
+        <th>PeerId</th>
+        <th>WalGroup</th>
+        <th>Current Log</th>
+        <th>Size</th>
+        <th>Queue Size</th>
+        <th>Offset</th>
+    </tr>
+    <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+    <tr>
+        <td><% entry.getValue().getPeerId() %></td>
+        <td><% entry.getValue().getWalGroup() %></td>
+        <td><% entry.getValue().getCurrentPath() %></td>
+        <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+        <td><% entry.getValue().getQueueSize() %></td>
+        <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+    </tr>
+    </%for>
+</table>
+</%def>
+
+<%def replicationDelay>
+<%args>
+Map<String, ReplicationStatus> metrics;
+</%args>
+<table class="table table-striped">
+    <tr>
+        <th>PeerId</th>
+        <th>WalGroup</th>
+        <th>Current Log</th>
+        <th>Last Shipped Age</th>
+        <th>Replication Delay</th>
+    </tr>
+    <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+    <tr>
+        <td><% entry.getValue().getPeerId() %></td>
+        <td><% entry.getValue().getWalGroup() %></td>
+        <td><% entry.getValue().getCurrentPath() %></td>
+        <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+        <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+    </tr>
+    </%for>
+</table>
+</%def>
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2984,6 +2986,20 @@ public class HRegionServer extends HasThread implements
     return service;
   }
 
+  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
+    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+    if (!this.isOnline()) {
+      return walGroupsReplicationStatus;
+    }
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+    for (ReplicationSourceInterface source : allSources) {
+      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+    }
+    return walGroupsReplicationStatus;
+  }
+
   /**
    * Utility for constructing an instance of the passed HRegionServer class.
    *
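The new accessor aggregates per-walGroup status from both the normal and the recovered (old) replication sources. A hedged sketch of how the map might be consumed server-side; it mirrors what the new Jamon template renders, and assumes the caller already holds a live HRegionServer reference (the helper class below is illustrative only, not part of the patch):

import java.util.Map;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;

final class ReplicationStatusDump {
  // Print one line per "peerId=>walGroup" key, the same data the new
  // "Replication Status" section shows on the regionserver web UI.
  static void dump(HRegionServer rs) {
    Map<String, ReplicationStatus> statuses = rs.getWalGroupsReplicationStatus();
    for (Map.Entry<String, ReplicationStatus> e : statuses.entrySet()) {
      ReplicationStatus s = e.getValue();
      System.out.println(e.getKey() + " queueSize=" + s.getQueueSize()
          + " ageOfLastShippedOp=" + s.getAgeOfLastShippedOp()
          + " replicationDelay=" + s.getReplicationDelay());
    }
  }
}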
@@ -54,5 +54,5 @@ public interface ReplicationService {
   /**
    * Refresh and Get ReplicationLoad
    */
-  public ReplicationLoad refreshAndGetReplicationLoad();
+  ReplicationLoad refreshAndGetReplicationLoad();
 }
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -43,4 +44,9 @@ public interface ReplicationSourceService extends ReplicationService {
    * Return the replication peers.
    */
   ReplicationPeers getReplicationPeers();
+
+  /**
+   * Returns the replication manager
+   */
+  ReplicationSourceManager getReplicationManager();
 }
@@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource {
 
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
 
@@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
+    this.ageOfLastShippedOp.put(walGroup, age);
     this.lastTimestamps.put(walGroup, timestamp);
   }
 
@@ -103,6 +105,24 @@ public class MetricsSource implements BaseSource {
         .setLastShippedAge(age);
   }
 
+  /**
+   * Get the last shipped timestamp of the given wal group. If the walGroup is unknown, return 0.
+   * @param walGroup which group we are getting
+   * @return timeStamp
+   */
+  public long getLastTimeStampOfWalGroup(String walGroup) {
+    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+  }
+
+  /**
+   * Get the age of the last shipped op of the given wal group. If the walGroup is unknown, return 0.
+   * @param walGroup which group we are getting
+   * @return age
+   */
+  public long getAgeofLastShippedOp(String walGroup) {
+    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+  }
+
   /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
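MetricsSource now keeps a per-walGroup age in addition to the existing per-walGroup timestamp, and both getters fall back to 0 for a group that has never shipped. A plain-Java mirror of that behaviour, with the clock injected so the example stays deterministic (MetricsSource itself uses EnvironmentEdgeManager; the class and values here are illustrative):

import java.util.HashMap;
import java.util.Map;

public class PerWalGroupAgeSketch {
  private final Map<String, Long> ageOfLastShippedOp = new HashMap<>();

  // setAgeOfLastShippedOp stores now - timestamp for the group
  void setAgeOfLastShippedOp(long timestamp, String walGroup, long now) {
    ageOfLastShippedOp.put(walGroup, now - timestamp);
  }

  // unknown groups report 0 instead of throwing
  long getAgeOfLastShippedOp(String walGroup) {
    Long age = ageOfLastShippedOp.get(walGroup);
    return age == null ? 0 : age;
  }

  public static void main(String[] args) {
    PerWalGroupAgeSketch m = new PerWalGroupAgeSketch();
    long now = 100_000;
    m.setAgeOfLastShippedOp(now - 1_500, "group-1", now);
    System.out.println(m.getAgeOfLastShippedOp("group-1")); // 1500
    System.out.println(m.getAgeOfLastShippedOp("group-2")); // 0, never shipped
  }
}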
@@ -79,19 +79,8 @@ public class ReplicationLoad {
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long replicationLag;
-      long timePassedAfterLastShippedOp =
-          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
-      if (sizeOfLogQueue != 0) {
-        // err on the large side
-        replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
-      } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
-        replicationLag = ageOfLastShippedOp; // last shipped happened recently
-      } else {
-        // the last ship may have happened long ago,
-        // so there is no real lag although ageOfLastShippedOp is non-zero
-        replicationLag = 0;
-      }
+      long replicationLag =
+          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
 
       ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
       if (rLoadSource != null) {
@@ -114,6 +103,29 @@ public class ReplicationLoad {
     this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
   }
 
+  static long calculateReplicationDelay(long ageOfLastShippedOp,
+      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+    long replicationLag;
+    long timePassedAfterLastShippedOp;
+    if (timeStampOfLastShippedOp == 0) { // replication has not started yet, return Long.MAX_VALUE
+      return Long.MAX_VALUE;
+    } else {
+      timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+    }
+    if (sizeOfLogQueue > 1) {
+      // err on the large side
+      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      replicationLag = ageOfLastShippedOp; // last shipped happened recently
+    } else {
+      // the last ship may have happened long ago,
+      // so there is no real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
+    }
+    return replicationLag;
+  }
+
   /**
    * sourceToString
    * @return a string contains sourceReplicationLoad information
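A standalone restatement of the delay calculation, with the current time passed in explicitly so the branches can be shown deterministically. This is an illustration of the logic above, not ReplicationLoad's actual signature (which reads the clock via EnvironmentEdgeManager internally); the class name and millisecond values are made up:

public class ReplicationDelaySketch {
  static long delay(long age, long lastShippedTs, int queueSize, long now) {
    if (lastShippedTs == 0) {
      return Long.MAX_VALUE;               // replication has not started yet -> UNKNOWN
    }
    long sinceLastShip = now - lastShippedTs;
    if (queueSize > 1) {
      return Math.max(age, sinceLastShip); // more than one log queued: err on the large side
    } else if (sinceLastShip < 2 * age) {
      return age;                          // last ship happened recently
    } else {
      return 0;                            // old ship, nothing queued: no real lag
    }
  }

  public static void main(String[] args) {
    long now = 100_000;
    System.out.println(delay(5_000, 0, 0, now));             // Long.MAX_VALUE
    System.out.println(delay(5_000, now - 60_000, 3, now));  // 60000
    System.out.println(delay(5_000, now - 8_000, 1, now));   // 5000
    System.out.println(delay(5_000, now - 60_000, 1, now));  // 0
  }
}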
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -312,6 +316,50 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
+      String walGroupId = walGroupShipper.getKey();
+      ReplicationSourceShipper shipper = walGroupShipper.getValue();
+      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      int queueSize = queues.get(walGroupId).size();
+      replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      Path currentPath = shipper.getCurrentPath();
+      try {
+        fileSize = getFileSize(currentPath);
+      } catch (IOException e) {
+        LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
+        fileSize = -1;
+      }
+      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+      statusBuilder.withPeerId(this.getPeerId())
+          .withQueueSize(queueSize)
+          .withWalGroup(walGroupId)
+          .withCurrentPath(currentPath)
+          .withCurrentPosition(shipper.getCurrentPosition())
+          .withFileSize(fileSize)
+          .withAgeOfLastShippedOp(ageOfLastShippedOp)
+          .withReplicationDelay(replicationDelay);
+      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
+    }
+    return sourceReplicationStatus;
+  }
+
+  private long getFileSize(Path currentPath) throws IOException {
+    long fileSize;
+    try {
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    } catch (FileNotFoundException e) {
+      currentPath = getArchivedLogPath(currentPath, conf);
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    }
+    return fileSize;
+  }
+
   protected ReplicationSourceShipper createNewShipper(String walGroupId,
       PriorityBlockingQueue<Path> queue) {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -182,6 +184,14 @@ public interface ReplicationSourceInterface {
    */
   ServerName getServerWALsBelongTo();
 
+  /**
+   * Get the statistics of replication for each wal group.
+   * @return stats of replication
+   */
+  default Map<String, ReplicationStatus> getWalGroupStatus() {
+    return new HashMap<>();
+  }
+
   /**
    * @return whether this is a replication source for recovery.
    */
@@ -72,6 +72,8 @@ public class ReplicationSourceShipper extends Thread {
   protected final long sleepForRetries;
   // Maximum number of retries before taking bold actions
   protected final int maxRetriesMultiplier;
+  private final int DEFAULT_TIMEOUT = 20000;
+  private final int getEntriesTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -83,6 +85,8 @@ public class ReplicationSourceShipper extends Thread {
         this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+    this.getEntriesTimeout =
+        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
   }
 
   @Override
@@ -98,7 +102,13 @@ public class ReplicationSourceShipper extends Thread {
         continue;
       }
       try {
-        WALEntryBatch entryBatch = entryReader.take();
+        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        if (entryBatch == null) {
+          // since there are no logs to replicate, refresh the ageOfLastShippedOp
+          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+            walGroupId);
+          continue;
+        }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
         if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
           noMoreData();
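The shipper loop switches from a blocking take() to a poll() with a timeout so that an idle wal group still refreshes its "age of last shipped op" metric. A minimal, self-contained illustration of that idiom on a plain BlockingQueue (the queue contents, timeout, and class name are made up for the example; the real code polls the WAL reader's entry batch queue):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVsTake {
  public static void main(String[] args) throws InterruptedException {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // take() would block here forever; poll(timeout) returns null instead,
    // giving the caller a chance to refresh metrics before looping again.
    String batch = queue.poll(200, TimeUnit.MILLISECONDS);
    if (batch == null) {
      System.out.println("no batch within timeout: refresh the age metric and loop");
    }
  }
}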
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -300,6 +301,10 @@ class ReplicationSourceWALReader extends Thread {
     return entryBatchQueue.take();
   }
 
+  public WALEntryBatch poll(long timeout) throws InterruptedException {
+    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+  }
+
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
     WALKey key = entry.getKey();
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+  private final String peerId;
+  private final String walGroup;
+  private final Path currentPath;
+  private final int queueSize;
+  private final long ageOfLastShippedOp;
+  private final long replicationDelay;
+  private final long currentPosition;
+  private final long fileSize;
+
+  private ReplicationStatus(ReplicationStatusBuilder builder) {
+    this.peerId = builder.peerId;
+    this.walGroup = builder.walGroup;
+    this.currentPath = builder.currentPath;
+    this.queueSize = builder.queueSize;
+    this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
+    this.replicationDelay = builder.replicationDelay;
+    this.currentPosition = builder.currentPosition;
+    this.fileSize = builder.fileSize;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public String getPeerId() {
+    return peerId;
+  }
+
+  public String getWalGroup() {
+    return walGroup;
+  }
+
+  public int getQueueSize() {
+    return queueSize;
+  }
+
+  public long getAgeOfLastShippedOp() {
+    return ageOfLastShippedOp;
+  }
+
+  public long getReplicationDelay() {
+    return replicationDelay;
+  }
+
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  public static ReplicationStatusBuilder newBuilder() {
+    return new ReplicationStatusBuilder();
+  }
+
+  public static class ReplicationStatusBuilder {
+    private String peerId = "UNKNOWN";
+    private String walGroup = "UNKNOWN";
+    private Path currentPath = new Path("UNKNOWN");
+    private int queueSize = -1;
+    private long ageOfLastShippedOp = -1;
+    private long replicationDelay = -1;
+    private long currentPosition = -1;
+    private long fileSize = -1;
+
+    public ReplicationStatusBuilder withPeerId(String peerId) {
+      this.peerId = peerId;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withFileSize(long fileSize) {
+      this.fileSize = fileSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withWalGroup(String walGroup) {
+      this.walGroup = walGroup;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+      this.currentPath = currentPath;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withQueueSize(int queueSize) {
+      this.queueSize = queueSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+      this.replicationDelay = replicationDelay;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+      this.currentPosition = currentPosition;
+      return this;
+    }
+
+    public ReplicationStatus build() {
+      return new ReplicationStatus(this);
+    }
+  }
+}
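ReplicationStatus is an immutable value object built through its nested builder, which is how ReplicationSource#getWalGroupStatus assembles each row of the UI tables. A hedged usage sketch; every value and the walGroup/path names below are made up for illustration:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;

public class ReplicationStatusExample {
  public static void main(String[] args) {
    // Build a status object by hand, the way the replication source does.
    ReplicationStatus status = ReplicationStatus.newBuilder()
        .withPeerId("2")
        .withWalGroup("example-walgroup")
        .withCurrentPath(new Path("/hbase/WALs/example-walgroup.1"))
        .withQueueSize(1)
        .withCurrentPosition(1024L)
        .withFileSize(4096L)
        .withAgeOfLastShippedOp(0L)
        .withReplicationDelay(0L)
        .build();
    // The map key used by ReplicationSource is "peerId=>walGroupId".
    System.out.println(status.getPeerId() + "=>" + status.getWalGroup()
        + " delay=" + status.getReplicationDelay());
  }
}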
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class);
+  private static final byte[] qualName = Bytes.toBytes("q");
+
+  @Test
+  public void testReplicationMetrics() throws Exception {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      Put p = new Put(Bytes.toBytes("starter"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication is done
+      while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+      Assert.assertEquals("metric size ", 1, metrics.size());
+      long lastPosition = 0;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position >= 0", metric.getValue().getCurrentPosition() >= 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes("" + Integer.toString(i)));
+        p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+        htable1.put(p);
+      }
+      while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
+          .size() == 0) {
+        Thread.sleep(500);
+      }
+      rs = utility1.getRSForFirstRegionInTable(tableName);
+      metrics = rs.getWalGroupsReplicationStatus();
+      Path lastPath = null;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        lastPath = metric.getValue().getCurrentPath();
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+          metric.getValue().getAgeOfLastShippedOp() > 0);
+        Assert.assertTrue("current position should > last position",
+          metric.getValue().getCurrentPosition() - lastPosition > 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+
+      hbaseAdmin.rollWALWriter(rs.getServerName());
+      p = new Put(Bytes.toBytes("trigger"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication rolled to a new log
+      while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      metrics = rs.getWalGroupsReplicationStatus();
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position should < last position",
+          metric.getValue().getCurrentPosition() < lastPosition);
+        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+      }
+    }
+  }
+}