diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index c6c7fc3b1c7..67d330584bc 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -383,7 +383,7 @@ if (totalCompactingCells > 0) {
         <& serverNameLink; serverName=pair.getFirst(); &>
         <% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %>
         <% pair.getSecond().getSizeOfLogQueue() %>
-        <% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %>
+        <% pair.getSecond().getReplicationLag() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %>
@@ -393,6 +393,7 @@ if (totalCompactingCells > 0) {
 }
+        If the replication delay is UNKNOWN, this walGroup has not started replicating yet; the peer may be disabled.

<%else>

No Peers Metrics

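Note: Long.MAX_VALUE is the sentinel that calculateReplicationDelay() (added below in ReplicationLoad.java) returns when a source has not shipped anything yet, so the template renders it as "UNKNOWN" instead of a nonsensical duration. As a minimal sketch, the same display rule in plain Java would look like this (the helper class and method name are illustrative, not part of this patch):

import org.apache.hadoop.hbase.procedure2.util.StringUtils;

public final class ReplicationLagFormatter {
  private ReplicationLagFormatter() {
  }

  // Long.MAX_VALUE means "replication has not started yet", so render it as
  // UNKNOWN; any other value is a real lag in milliseconds.
  public static String format(long replicationLag) {
    return replicationLag == Long.MAX_VALUE ? "UNKNOWN"
        : StringUtils.humanTimeDiff(replicationLag);
  }
}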
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index c9bfcc95eba..646d8359275 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
     <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
+
+

Replication Status

+ <& ReplicationStatusTmpl; regionServer = regionServer; &> +

Software Attributes

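The ReplicationStatusTmpl call above renders the map returned by HRegionServer#getWalGroupsReplicationStatus(), which is added further down in this patch. A minimal sketch of how a caller could consume that map (the printer class is illustrative only, not part of the patch):

import java.util.Map;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;

public final class WalGroupStatusPrinter {
  private WalGroupStatusPrinter() {
  }

  // Keys are "<peerId>=><walGroupId>", as built by ReplicationSource#getWalGroupStatus().
  public static void print(HRegionServer regionServer) {
    Map<String, ReplicationStatus> statuses = regionServer.getWalGroupsReplicationStatus();
    for (Map.Entry<String, ReplicationStatus> entry : statuses.entrySet()) {
      ReplicationStatus status = entry.getValue();
      System.out.println(entry.getKey()
          + " currentLog=" + status.getCurrentPath()
          + " queueSize=" + status.getQueueSize()
          + " ageOfLastShippedOp=" + status.getAgeOfLastShippedOp() + "ms"
          + " replicationDelay=" + status.getReplicationDelay() + "ms");
    }
  }
}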
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
new file mode 100644
index 00000000000..7dc1c7f6638
--- /dev/null
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -0,0 +1,105 @@
+<%doc>
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+</%doc>
+<%args>
+  HRegionServer regionServer;
+</%args>
+<%import>
+  java.util.*;
+  java.util.Map.Entry;
+  org.apache.hadoop.hbase.procedure2.util.StringUtils;
+  org.apache.hadoop.hbase.regionserver.HRegionServer;
+  org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+  Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+ +
+
+ <& currentLog; metrics = walGroupsReplicationStatus; &> +
+
+ <& replicationDelay; metrics = walGroupsReplicationStatus; &> +
+
+
+

+        If the replication delay is UNKNOWN, this walGroup has not started replicating yet; the peer may be disabled.
+        If the size of the log is 0, we are replicating the current HLog and cannot report an accurate size, since the file is not closed yet.

+ +<%else> +

No Replication Metrics for Peers

+ + +<%def currentLog> +<%args> + Map metrics; + + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + + +
+        <th>PeerId</th>
+        <th>WalGroup</th>
+        <th>Current Log</th>
+        <th>Size</th>
+        <th>Queue Size</th>
+        <th>Offset</th>
+        <td><% entry.getValue().getPeerId() %></td>
+        <td><% entry.getValue().getWalGroup() %></td>
+        <td><% entry.getValue().getCurrentPath() %></td>
+        <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+        <td><% entry.getValue().getQueueSize() %></td>
+        <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+ + +<%def replicationDelay> +<%args> + Map metrics; + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + +
+        <th>PeerId</th>
+        <th>WalGroup</th>
+        <th>Current Log</th>
+        <th>Last Shipped Age</th>
+        <th>Replication Delay</th>
+        <td><% entry.getValue().getPeerId() %></td>
+        <td><% entry.getValue().getWalGroup() %></td>
+        <td><% entry.getValue().getCurrentPath() %></td>
+        <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+        <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+    </%for>
+  </table>
+</%def>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9509ea79323..1614cf5a52f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2984,6 +2986,20 @@ public class HRegionServer extends HasThread implements
     return service;
   }
 
+  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
+    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+    if (!this.isOnline()) {
+      return walGroupsReplicationStatus;
+    }
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+    for (ReplicationSourceInterface source : allSources) {
+      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+    }
+    return walGroupsReplicationStatus;
+  }
+
   /**
    * Utility for constructing an instance of the passed HRegionServer class.
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index c34231d9ed1..e9bbaea8ae4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -54,5 +54,5 @@ public interface ReplicationService {
   /**
    * Refresh and Get ReplicationLoad
    */
-  public ReplicationLoad refreshAndGetReplicationLoad();
+  ReplicationLoad refreshAndGetReplicationLoad();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 09ec47776fe..ffc96357536 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -43,4 +44,9 @@ public interface ReplicationSourceService extends ReplicationService {
    * Return the replication peers.
   */
  ReplicationPeers getReplicationPeers();
+
+  /**
+   * Returns the replication manager.
+   */
+  ReplicationSourceManager getReplicationManager();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 7bc7084a71d..906f0c6d330 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource {
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
@@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
+    this.ageOfLastShippedOp.put(walGroup, age);
     this.lastTimestamps.put(walGroup, timestamp);
   }
@@ -103,6 +105,24 @@
         .setLastShippedAge(age);
   }
 
+  /**
+   * Get the last shipped timestamp of the given wal group. If no timestamp has been recorded
+   * for the walGroup, return 0.
+   * @param walGroup the wal group to look up
+   * @return timestamp of the last shipped op
+   */
+  public long getLastTimeStampOfWalGroup(String walGroup) {
+    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+  }
+
+  /**
+   * Get the age of the last shipped op for the given wal group. If no age has been recorded
+   * for the walGroup, return 0.
+   * @param walGroup the wal group to look up
+   * @return age of the last shipped op
+   */
+  public long getAgeofLastShippedOp(String walGroup) {
+    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+  }
+
   /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 219c03d4d4f..53e560b0d53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -79,19 +79,8 @@ public class ReplicationLoad {
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long replicationLag;
-      long timePassedAfterLastShippedOp =
-          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
-      if (sizeOfLogQueue != 0) {
-        // err on the large side
-        replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
-      } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
-        replicationLag = ageOfLastShippedOp; // last shipped happen recently
-      } else {
-        // last shipped may happen last night,
-        // so NO real lag although ageOfLastShippedOp is non-zero
-        replicationLag = 0;
-      }
+      long replicationLag =
+          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
       ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
       if (rLoadSource != null) {
@@ -114,6 +103,29 @@
     this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
   }
 
+  static long calculateReplicationDelay(long ageOfLastShippedOp,
+      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+    long replicationLag;
+    long timePassedAfterLastShippedOp;
+    if (timeStampOfLastShippedOp == 0) {
+      // replication has not started yet, so the delay is unknown: report Long.MAX_VALUE
+      return Long.MAX_VALUE;
+    } else {
+      timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+    }
+    if (sizeOfLogQueue > 1) {
+      // err on the large side
+      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      replicationLag = ageOfLastShippedOp; // last shipped happened recently
+    } else {
+      // the last shipment may have happened long ago (e.g. last night),
+      // so there is NO real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
+    }
+    return replicationLag;
+  }
+
   /**
    * sourceToString
    * @return a string contains sourceReplicationLoad information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a0d8321ac60..10fa50f553b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -312,6 +316,50 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
+      String walGroupId = walGroupShipper.getKey();
+      ReplicationSourceShipper shipper = walGroupShipper.getValue();
+      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      int queueSize = queues.get(walGroupId).size();
+      replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      Path currentPath = shipper.getCurrentPath();
+      try {
+        fileSize = getFileSize(currentPath);
+      } catch (IOException e) {
+        LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
+        fileSize = -1;
+      }
+      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+      statusBuilder.withPeerId(this.getPeerId())
+          .withQueueSize(queueSize)
+          .withWalGroup(walGroupId)
+          .withCurrentPath(currentPath)
+          .withCurrentPosition(shipper.getCurrentPosition())
+          .withFileSize(fileSize)
+          .withAgeOfLastShippedOp(ageOfLastShippedOp)
+          .withReplicationDelay(replicationDelay);
+      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
+    }
+    return sourceReplicationStatus;
+  }
+
+  private long getFileSize(Path currentPath) throws IOException {
+    long fileSize;
+    try {
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    } catch (FileNotFoundException e) {
+      currentPath = getArchivedLogPath(currentPath, conf);
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    }
+    return fileSize;
+  }
+
   protected ReplicationSourceShipper createNewShipper(String walGroupId,
       PriorityBlockingQueue<Path> queue) {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 3ce5bfe1644..df7a8cc7b2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -182,6 +184,14 @@ public interface ReplicationSourceInterface {
    */
  ServerName getServerWALsBelongTo();
 
+  /**
+   * Get the replication status for each wal group.
+   * @return replication status of each wal group
+   */
+  default Map<String, ReplicationStatus> getWalGroupStatus() {
+    return new HashMap<>();
+  }
+
   /**
    * @return whether this is a replication source for recovery.
   */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 30696d1b2c5..5d6198e96e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -72,6 +72,8 @@ public class ReplicationSourceShipper extends Thread {
   protected final long sleepForRetries;
   // Maximum number of retries before taking bold actions
   protected final int maxRetriesMultiplier;
+  private final int DEFAULT_TIMEOUT = 20000;
+  private final int getEntriesTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -83,6 +85,8 @@
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+    this.getEntriesTimeout =
+        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
   }
 
   @Override
@@ -98,7 +102,13 @@
         continue;
       }
       try {
-        WALEntryBatch entryBatch = entryReader.take();
+        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        if (entryBatch == null) {
+          // there are no logs to replicate, so refresh the ageOfLastShippedOp
+          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+            walGroupId);
+          continue;
+        }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
         if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
           noMoreData();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 666ee2aac73..b3bdb029405 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -300,6 +301,10 @@ class ReplicationSourceWALReader extends Thread {
     return entryBatchQueue.take();
   }
 
+  public WALEntryBatch poll(long timeout) throws InterruptedException {
+    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+  }
+
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
     WALKey key = entry.getKey();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 00000000000..10d6cd59d4a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+  private final String peerId;
+  private final String walGroup;
+  private final Path currentPath;
+  private final int queueSize;
+  private final long ageOfLastShippedOp;
+  private final long replicationDelay;
+  private final long currentPosition;
+  private final long fileSize;
+
+  private ReplicationStatus(ReplicationStatusBuilder builder) {
+    this.peerId = builder.peerId;
+    this.walGroup = builder.walGroup;
+    this.currentPath = builder.currentPath;
+    this.queueSize = builder.queueSize;
+    this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
+    this.replicationDelay = builder.replicationDelay;
+    this.currentPosition = builder.currentPosition;
+    this.fileSize = builder.fileSize;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public String getPeerId() {
+    return peerId;
+  }
+
+  public String getWalGroup() {
+    return walGroup;
+  }
+
+  public int getQueueSize() {
+    return queueSize;
+  }
+
+  public long getAgeOfLastShippedOp() {
+    return ageOfLastShippedOp;
+  }
+
+  public long getReplicationDelay() {
+    return replicationDelay;
+  }
+
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  public static ReplicationStatusBuilder newBuilder() {
+    return new ReplicationStatusBuilder();
+  }
+
+  public static class ReplicationStatusBuilder {
+    private String peerId = "UNKNOWN";
+    private String walGroup = "UNKNOWN";
+    private Path currentPath = new Path("UNKNOWN");
+    private int queueSize = -1;
+    private long ageOfLastShippedOp = -1;
+    private long replicationDelay = -1;
+    private long currentPosition = -1;
+    private long fileSize = -1;
+
+    public ReplicationStatusBuilder withPeerId(String peerId) {
+      this.peerId = peerId;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withFileSize(long fileSize) {
+      this.fileSize = fileSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withWalGroup(String walGroup) {
+      this.walGroup = walGroup;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+      this.currentPath = currentPath;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withQueueSize(int queueSize) {
+      this.queueSize = queueSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+      this.replicationDelay = replicationDelay;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+      this.currentPosition = currentPosition;
+      return this;
+    }
+
+    public ReplicationStatus build() {
+      return new ReplicationStatus(this);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 00000000000..8ff4d84dcd7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class);
+
+  private static final byte[] qualName = Bytes.toBytes("q");
+
+  @Test
+  public void testReplicationMetrics() throws Exception {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      Put p = new Put(Bytes.toBytes("starter"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication is done
+      while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+      Assert.assertEquals("metric size ", 1, metrics.size());
+      long lastPosition = 0;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position >= 0", metric.getValue().getCurrentPosition() >= 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes("" + Integer.toString(i)));
+        p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+        htable1.put(p);
+      }
+      while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
+          .size() == 0) {
+        Thread.sleep(500);
+      }
+      rs = utility1.getRSForFirstRegionInTable(tableName);
+      metrics = rs.getWalGroupsReplicationStatus();
+      Path lastPath = null;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        lastPath = metric.getValue().getCurrentPath();
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+          metric.getValue().getAgeOfLastShippedOp() > 0);
+        Assert.assertTrue("current position should > last position",
+          metric.getValue().getCurrentPosition() - lastPosition > 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+
+      hbaseAdmin.rollWALWriter(rs.getServerName());
+      p = new Put(Bytes.toBytes("trigger"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication rolled to a new log
+      while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      metrics = rs.getWalGroupsReplicationStatus();
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position should < last position",
+          metric.getValue().getCurrentPosition() < lastPosition);
+        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+      }
+    }
+  }
+}
\ No newline at end of file
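Worked example: the delay shown in both web UIs comes from ReplicationLoad.calculateReplicationDelay(...) above. The standalone sketch below mirrors its branches with illustrative numbers; it is a demonstration copy, not the patched class itself:

public class ReplicationDelayExample {
  // Mirror of the branch logic in ReplicationLoad.calculateReplicationDelay():
  //   timestamp == 0               -> Long.MAX_VALUE (rendered as UNKNOWN in the UI)
  //   queue size > 1               -> max(age, now - timestamp), erring on the large side
  //   shipped recently             -> age of the last shipped op
  //   shipped long ago, queue <= 1 -> 0 (no real lag)
  static long delay(long age, long timestamp, int queueSize, long now) {
    if (timestamp == 0) {
      return Long.MAX_VALUE;
    }
    long timePassed = now - timestamp;
    if (queueSize > 1) {
      return Math.max(age, timePassed);
    } else if (timePassed < 2 * age) {
      return age;
    } else {
      return 0;
    }
  }

  public static void main(String[] args) {
    long now = 100_000L;
    System.out.println(delay(0, 0, 0, now));          // MAX_VALUE: source never shipped anything
    System.out.println(delay(2_000, 95_000, 3, now)); // 5000: queue backed up, use the larger gap
    System.out.println(delay(4_000, 95_000, 1, now)); // 4000: shipped recently (5s < 2 * 4s)
    System.out.println(delay(1_000, 10_000, 1, now)); // 0: last ship was long ago, nothing pending
  }
}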