HBASE-20193 Basic Replication Web UI - Regionserver

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
jingyuntian 2018-07-03 11:46:14 +08:00 committed by zhangduo
parent 8f9c322cda
commit 6b8cd00ec0
14 changed files with 496 additions and 17 deletions

View File

@ -383,7 +383,7 @@ if (totalCompactingCells > 0) {
<td><& serverNameLink; serverName=pair.getFirst(); &></td>
<td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
<td><% pair.getSecond().getSizeOfLogQueue() %></td>
<td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
<td><% pair.getSecond().getReplicationLag() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
</tr>
</%for>
</table>
@ -393,6 +393,7 @@ if (totalCompactingCells > 0) {
}
</%java>
</div>
<p>If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.</p>
</div>
<%else>
<p>No Peers Metrics</p>

View File

@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
<& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
</section>
<section>
<h2>Replication Status</h1>
<& ReplicationStatusTmpl; regionServer = regionServer; &>
</section>
<section>
<h2>Software Attributes</h2>

View File

@ -0,0 +1,105 @@
<%doc>
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
</%doc>
<%args>
HRegionServer regionServer;
</%args>
<%import>
java.util.*;
java.util.Map.Entry;
org.apache.hadoop.hbase.procedure2.util.StringUtils;
org.apache.hadoop.hbase.regionserver.HRegionServer;
org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
</%import>
<%java>
Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
</%java>
<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
<div class="tabbable">
<ul class="nav nav-pills">
<li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
<li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_currentLog">
<& currentLog; metrics = walGroupsReplicationStatus; &>
</div>
<div class="tab-pane" id="tab_replicationDelay">
<& replicationDelay; metrics = walGroupsReplicationStatus; &>
</div>
</div>
</div>
<p> If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.
If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet.</p>
<%else>
<p>No Replication Metrics for Peers</p>
</%if>
<%def currentLog>
<%args>
Map<String, ReplicationStatus> metrics;
</%args>
<table class="table table-striped">
<tr>
<th>PeerId</th>
<th>WalGroup</th>
<th>Current Log</th>
<th>Size</th>
<th>Queue Size</th>
<th>Offset</th>
</tr>
<%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
<tr>
<td><% entry.getValue().getPeerId() %></td>
<td><% entry.getValue().getWalGroup() %></td>
<td><% entry.getValue().getCurrentPath() %> </td>
<td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
<td><% entry.getValue().getQueueSize() %></td>
<td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
</tr>
</%for>
</table>
</%def>
<%def replicationDelay>
<%args>
Map<String, ReplicationStatus> metrics;
</%args>
<table class="table table-striped">
<tr>
<th>PeerId</th>
<th>WalGroup</th>
<th>Current Log</th>
<th>Last Shipped Age</th>
<th>Replication Delay</th>
</tr>
<%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
<tr>
<td><% entry.getValue().getPeerId() %></td>
<td><% entry.getValue().getWalGroup() %></td>
<td><% entry.getValue().getCurrentPath() %> </td>
<td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
<td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
</tr>
</%for>
</table>
</%def>

View File

@ -132,6 +132,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@ -2960,6 +2962,20 @@ public class HRegionServer extends HasThread implements
return service;
}
public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
if(!this.isOnline()){
return walGroupsReplicationStatus;
}
List<ReplicationSourceInterface> allSources = new ArrayList<>();
allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
for(ReplicationSourceInterface source: allSources){
walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
}
return walGroupsReplicationStatus;
}
/**
* Utility for constructing an instance of the passed HRegionServer class.
*

View File

@ -54,5 +54,5 @@ public interface ReplicationService {
/**
* Refresh and Get ReplicationLoad
*/
public ReplicationLoad refreshAndGetReplicationLoad();
ReplicationLoad refreshAndGetReplicationLoad();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -32,4 +33,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
/**
* Returns the replication manager
*/
ReplicationSourceManager getReplicationManager();
}

View File

@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimestamps = new HashMap<>();
private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
private long lastHFileRefsQueueSize = 0;
private String id;
@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
this.ageOfLastShippedOp.put(walGroup, age);
this.lastTimestamps.put(walGroup, timestamp);
}
@ -103,6 +105,24 @@ public class MetricsSource implements BaseSource {
.setLastShippedAge(age);
}
/**
* get the last timestamp of given wal group. If the walGroup is null, return 0.
* @param walGroup which group we are getting
* @return timeStamp
*/
public long getLastTimeStampOfWalGroup(String walGroup) {
return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
}
/**
* get age of last shipped op of given wal group. If the walGroup is null, return 0
* @param walGroup which group we are getting
* @return age
*/
public long getAgeofLastShippedOp(String walGroup) {
return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
}
/**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.

View File

@ -79,19 +79,8 @@ public class ReplicationLoad {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
long replicationLag;
long timePassedAfterLastShippedOp =
EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
if (sizeOfLogQueue != 0) {
// err on the large side
replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
} else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
replicationLag = ageOfLastShippedOp; // last shipped happen recently
} else {
// last shipped may happen last night,
// so NO real lag although ageOfLastShippedOp is non-zero
replicationLag = 0;
}
long replicationLag =
calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
if (rLoadSource != null) {
@ -114,6 +103,29 @@ public class ReplicationLoad {
this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
}
static long calculateReplicationDelay(long ageOfLastShippedOp,
long timeStampOfLastShippedOp, int sizeOfLogQueue) {
long replicationLag;
long timePassedAfterLastShippedOp;
if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to Long.MAX_VALUE
return Long.MAX_VALUE;
} else {
timePassedAfterLastShippedOp =
EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
}
if (sizeOfLogQueue > 1) {
// err on the large side
replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
} else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
replicationLag = ageOfLastShippedOp; // last shipped happen recently
} else {
// last shipped may happen last night,
// so NO real lag although ageOfLastShippedOp is non-zero
replicationLag = 0;
}
return replicationLag;
}
/**
* sourceToString
* @return a string contains sourceReplicationLoad information

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@ -25,6 +28,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@ -308,6 +312,50 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
}
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
Path currentPath = shipper.getCurrentPath();
try {
fileSize = getFileSize(currentPath);
} catch (IOException e) {
LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
fileSize = -1;
}
ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
statusBuilder.withPeerId(this.getPeerId())
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentPath(currentPath)
.withCurrentPosition(shipper.getCurrentPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
}
return sourceReplicationStatus;
}
private long getFileSize(Path currentPath) throws IOException {
long fileSize;
try {
fileSize = fs.getContentSummary(currentPath).getLength();
} catch (FileNotFoundException e) {
currentPath = getArchivedLogPath(currentPath, conf);
fileSize = fs.getContentSummary(currentPath).getLength();
}
return fileSize;
}
protected ReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@ -167,6 +169,14 @@ public interface ReplicationSourceInterface {
*/
ServerName getServerWALsBelongTo();
/**
* get the stat of replication for each wal group.
* @return stat of replication
*/
default Map<String, ReplicationStatus> getWalGroupStatus() {
return new HashMap<>();
}
/**
* @return whether this is a replication source for recovery.
*/

View File

@ -70,6 +70,8 @@ public class ReplicationSourceShipper extends Thread {
protected final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
@ -81,6 +83,8 @@ public class ReplicationSourceShipper extends Thread {
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.getEntriesTimeout =
this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
}
@Override
@ -96,8 +100,14 @@ public class ReplicationSourceShipper extends Thread {
continue;
}
try {
WALEntryBatch entryBatch = entryReader.take();
// the NO_MORE_DATA instance has no path so do not all shipEdits
WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
if (entryBatch == null) {
// since there is no logs need to replicate, we refresh the ageOfLastShippedOp
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
walGroupId);
continue;
}
// the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
} else {

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -299,6 +300,10 @@ class ReplicationSourceWALReader extends Thread {
return entryBatchQueue.take();
}
public WALEntryBatch poll(long timeout) throws InterruptedException {
return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationStatus {
private final String peerId;
private final String walGroup;
private final Path currentPath;
private final int queueSize;
private final long ageOfLastShippedOp;
private final long replicationDelay;
private final long currentPosition;
private final long fileSize;
private ReplicationStatus(ReplicationStatusBuilder builder) {
this.peerId = builder.peerId;
this.walGroup = builder.walGroup;
this.currentPath = builder.currentPath;
this.queueSize = builder.queueSize;
this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
this.replicationDelay = builder.replicationDelay;
this.currentPosition = builder.currentPosition;
this.fileSize = builder.fileSize;
}
public long getCurrentPosition() {
return currentPosition;
}
public long getFileSize() {
return fileSize;
}
public String getPeerId() {
return peerId;
}
public String getWalGroup() {
return walGroup;
}
public int getQueueSize() {
return queueSize;
}
public long getAgeOfLastShippedOp() {
return ageOfLastShippedOp;
}
public long getReplicationDelay() {
return replicationDelay;
}
public Path getCurrentPath() {
return currentPath;
}
public static ReplicationStatusBuilder newBuilder() {
return new ReplicationStatusBuilder();
}
public static class ReplicationStatusBuilder {
private String peerId = "UNKNOWN";
private String walGroup = "UNKNOWN";
private Path currentPath = new Path("UNKNOWN");
private int queueSize = -1;
private long ageOfLastShippedOp = -1;
private long replicationDelay = -1;
private long currentPosition = -1;
private long fileSize = -1;
public ReplicationStatusBuilder withPeerId(String peerId) {
this.peerId = peerId;
return this;
}
public ReplicationStatusBuilder withFileSize(long fileSize) {
this.fileSize = fileSize;
return this;
}
public ReplicationStatusBuilder withWalGroup(String walGroup) {
this.walGroup = walGroup;
return this;
}
public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
this.currentPath = currentPath;
return this;
}
public ReplicationStatusBuilder withQueueSize(int queueSize) {
this.queueSize = queueSize;
return this;
}
public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
this.ageOfLastShippedOp = ageOfLastShippedOp;
return this;
}
public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
this.replicationDelay = replicationDelay;
return this;
}
public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
this.currentPosition = currentPosition;
return this;
}
public ReplicationStatus build() {
return new ReplicationStatus(this);
}
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationMetricsforUI extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class);
private static final byte[] qualName = Bytes.toBytes("q");
@Test
public void testReplicationMetrics() throws Exception {
try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
Put p = new Put(Bytes.toBytes("starter"));
p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
htable1.put(p);
// make sure replication done
while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
Thread.sleep(500);
}
// sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
Thread.sleep(5000);
HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
Assert.assertEquals("metric size ", 1, metrics.size());
long lastPosition = 0;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
Assert.assertTrue("current position >= 0", metric.getValue().getCurrentPosition() >= 0);
lastPosition = metric.getValue().getCurrentPosition();
}
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("" + Integer.toString(i)));
p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
htable1.put(p);
}
while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
.size() == 0) {
Thread.sleep(500);
}
rs = utility1.getRSForFirstRegionInTable(tableName);
metrics = rs.getWalGroupsReplicationStatus();
Path lastPath = null;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
lastPath = metric.getValue().getCurrentPath();
Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
Assert.assertTrue("age of Last Shipped Op should be > 0 ",
metric.getValue().getAgeOfLastShippedOp() > 0);
Assert.assertTrue("current position should > last position",
metric.getValue().getCurrentPosition() - lastPosition > 0);
lastPosition = metric.getValue().getCurrentPosition();
}
hbaseAdmin.rollWALWriter(rs.getServerName());
p = new Put(Bytes.toBytes("trigger"));
p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
htable1.put(p);
// make sure replication rolled to a new log
while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
Thread.sleep(500);
}
// sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
Thread.sleep(5000);
metrics = rs.getWalGroupsReplicationStatus();
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
Assert.assertTrue("current position should < last position",
metric.getValue().getCurrentPosition() < lastPosition);
Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
}
}
}
}