diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index 7a1019aacab..15c8e63ae95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -389,6 +389,15 @@ public class ServerLoad implements ServerMetrics { return metrics.getReplicationLoadSourceList(); } + /** + * Call directly from client such as hbase shell + * @return a map of ReplicationLoadSource list per peer id + */ + @Override + public Map> getReplicationLoadSourceMap() { + return metrics.getReplicationLoadSourceMap(); + } + /** * Call directly from client such as hbase shell * @return ReplicationLoadSink diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 1e1d395e592..391e62ff390 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -75,6 +75,12 @@ public interface ServerMetrics { */ List getReplicationLoadSourceList(); + /** + * Call directly from client such as hbase shell + * @return a map of ReplicationLoadSource list per peer id + */ + Map> getReplicationLoadSourceMap(); + /** * Call directly from client such as hbase shell * @return ReplicationLoadSink diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 2a487d7d818..a22907b5d86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hbase; import edu.umd.cs.findbugs.annotations.Nullable; + +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,7 +78,7 @@ public final class ServerMetricsBuilder { .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream() .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList())) .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream() - .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) + .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .setReplicationLoadSink(serverLoadPB.hasReplLoadSink() ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink()) : null) @@ -301,6 +304,16 @@ public final class ServerMetricsBuilder { return Collections.unmodifiableList(sources); } + @Override + public Map> getReplicationLoadSourceMap(){ + Map> sourcesMap = new HashMap<>(); + for(ReplicationLoadSource loadSource : sources){ + sourcesMap.computeIfAbsent(loadSource.getPeerID(), + peerId -> new ArrayList()).add(loadSource); + } + return sourcesMap; + } + @Override public ReplicationLoadSink getReplicationLoadSink() { return sink; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java index 9e24e225f11..8ee221958c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java @@ -16,21 +16,36 @@ import org.apache.yetus.audience.InterfaceAudience; * A HBase ReplicationLoad to present MetricsSource information */ @InterfaceAudience.Public -public class ReplicationLoadSource { +public final class ReplicationLoadSource { private final String peerID; private final long ageOfLastShippedOp; private final int sizeOfLogQueue; private final long timestampOfLastShippedOp; private final long replicationLag; + private long timeStampOfNextToReplicate; + private String queueId; + private boolean recovered; + private boolean running; + private boolean editsSinceRestart; + private long editsRead; + private long oPsShipped; - // TODO: add the builder for this class @InterfaceAudience.Private - public ReplicationLoadSource(String id, long age, int size, long timestamp, long lag) { + private ReplicationLoadSource(String id, long age, int size, long timestamp, + long timeStampOfNextToReplicate, long lag, String queueId, boolean recovered, boolean running, + boolean editsSinceRestart, long editsRead, long oPsShipped) { this.peerID = id; this.ageOfLastShippedOp = age; this.sizeOfLogQueue = size; this.timestampOfLastShippedOp = timestamp; this.replicationLag = lag; + this.timeStampOfNextToReplicate = timeStampOfNextToReplicate; + this.queueId = queueId; + this.recovered = recovered; + this.running = running; + this.editsSinceRestart = editsSinceRestart; + this.editsRead = editsRead; + this.oPsShipped = oPsShipped; } public String getPeerID() { @@ -61,4 +76,123 @@ public class ReplicationLoadSource { public long getReplicationLag() { return this.replicationLag; } -} + + public long getTimeStampOfNextToReplicate() { + return this.timeStampOfNextToReplicate; + } + + public String getQueueId() { + return queueId; + } + + public boolean isRecovered() { + return recovered; + } + + public boolean isRunning() { + return running; + } + + public boolean hasEditsSinceRestart() { + return editsSinceRestart; + } + + public long getEditsRead() { + return editsRead; + } + + public long getOPsShipped() { + return oPsShipped; + } + + public static ReplicationLoadSourceBuilder newBuilder(){ + return new ReplicationLoadSourceBuilder(); + } + + public static final class ReplicationLoadSourceBuilder { + + private String peerID; + private long ageOfLastShippedOp; + private int sizeOfLogQueue; + private long timestampOfLastShippedOp; + private long replicationLag; + private long timeStampOfNextToReplicate; + private String queueId; + private boolean recovered; + private boolean running; + private boolean editsSinceRestart; + private long editsRead; + private long oPsShipped; + + private ReplicationLoadSourceBuilder(){ + + } + + public ReplicationLoadSourceBuilder setTimeStampOfNextToReplicate( + long timeStampOfNextToReplicate) { + this.timeStampOfNextToReplicate = timeStampOfNextToReplicate; + return this; + } + + public ReplicationLoadSourceBuilder setPeerID(String peerID) { + this.peerID = peerID; + return this; + } + + public ReplicationLoadSourceBuilder setAgeOfLastShippedOp(long ageOfLastShippedOp) { + this.ageOfLastShippedOp = ageOfLastShippedOp; + return this; + } + + public ReplicationLoadSourceBuilder setSizeOfLogQueue(int sizeOfLogQueue) { + this.sizeOfLogQueue = sizeOfLogQueue; + return this; + } + + public ReplicationLoadSourceBuilder setTimestampOfLastShippedOp(long timestampOfLastShippedOp) { + this.timestampOfLastShippedOp = timestampOfLastShippedOp; + return this; + } + + public ReplicationLoadSourceBuilder setReplicationLag(long replicationLag) { + this.replicationLag = replicationLag; + return this; + } + + public ReplicationLoadSourceBuilder setQueueId(String queueId) { + this.queueId = queueId; + return this; + } + + public ReplicationLoadSourceBuilder setRecovered(boolean recovered) { + this.recovered = recovered; + return this; + } + + public ReplicationLoadSourceBuilder setRunning(boolean running) { + this.running = running; + return this; + } + + public ReplicationLoadSourceBuilder setEditsSinceRestart(boolean editsSinceRestart) { + this.editsSinceRestart = editsSinceRestart; + return this; + } + + public ReplicationLoadSourceBuilder setEditsRead(long editsRead) { + this.editsRead = editsRead; + return this; + } + + public ReplicationLoadSourceBuilder setoPsShipped(long oPsShipped) { + this.oPsShipped = oPsShipped; + return this; + } + + public ReplicationLoadSource build(){ + return new ReplicationLoadSource(peerID, ageOfLastShippedOp, sizeOfLogQueue, + timestampOfLastShippedOp, timeStampOfNextToReplicate, replicationLag, queueId, recovered, + running, editsSinceRestart, editsRead, oPsShipped); + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 3039392f608..bc97b013f10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2717,8 +2717,20 @@ public final class ProtobufUtil { public static ReplicationLoadSource toReplicationLoadSource( ClusterStatusProtos.ReplicationLoadSource rls) { - return new ReplicationLoadSource(rls.getPeerID(), rls.getAgeOfLastShippedOp(), - rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), rls.getReplicationLag()); + ReplicationLoadSource.ReplicationLoadSourceBuilder builder = ReplicationLoadSource.newBuilder(); + builder.setPeerID(rls.getPeerID()). + setAgeOfLastShippedOp(rls.getAgeOfLastShippedOp()). + setSizeOfLogQueue(rls.getSizeOfLogQueue()). + setTimestampOfLastShippedOp(rls.getTimeStampOfLastShippedOp()). + setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()). + setReplicationLag(rls.getReplicationLag()). + setQueueId(rls.getQueueId()). + setRecovered(rls.getRecovered()). + setRunning(rls.getRunning()). + setEditsSinceRestart(rls.getEditsSinceRestart()). + setEditsRead(rls.getEditsRead()). + setoPsShipped(rls.getOPsShipped()); + return builder.build(); } /** @@ -3227,6 +3239,13 @@ public final class ProtobufUtil { .setSizeOfLogQueue((int) rls.getSizeOfLogQueue()) .setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp()) .setReplicationLag(rls.getReplicationLag()) + .setQueueId(rls.getQueueId()) + .setRecovered(rls.isRecovered()) + .setRunning(rls.isRunning()) + .setEditsSinceRestart(rls.hasEditsSinceRestart()) + .setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()) + .setOPsShipped(rls.getOPsShipped()) + .setEditsRead(rls.getEditsRead()) .build(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 1a17f37e205..3fd5ac62b8d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -76,4 +76,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void incrCompletedWAL(); void incrCompletedRecoveryQueue(); void incrFailedRecoveryQueue(); + long getWALEditsRead(); + long getShippedOps(); + long getEditsFiltered(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 4e8c810138b..8942182d271 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -32,7 +32,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableFastCounter logReadInEditsCounter; - private final MutableFastCounter logEditsFilteredCounter; + private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedBytesCounter; @@ -73,7 +73,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); - logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); + walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); @@ -111,7 +111,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS } @Override public void incrLogEditsFiltered(long size) { - logEditsFilteredCounter.incr(size); + walEditsFilteredCounter.incr(size); } @Override public void incrBatchesShipped(int batches) { @@ -260,4 +260,19 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public String getMetricsName() { return rms.getMetricsName(); } + + @Override + public long getWALEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override + public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override + public long getEditsFiltered() { + return this.walEditsFilteredCounter.value(); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 0ad50524f15..ec9271e0883 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -48,7 +48,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableFastCounter logReadInEditsCounter; - private final MutableFastCounter logEditsFilteredCounter; + private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedKBsCounter; @@ -102,7 +102,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L); logEditsFilteredKey = this.keyPrefix + "logEditsFiltered"; - logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L); + walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L); shippedHFilesKey = this.keyPrefix + "shippedHFiles"; shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L); @@ -149,7 +149,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou } @Override public void incrLogEditsFiltered(long size) { - logEditsFilteredCounter.incr(size); + walEditsFilteredCounter.incr(size); } @Override public void incrBatchesShipped(int batches) { @@ -314,4 +314,16 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public String getMetricsName() { return rms.getMetricsName(); } + + @Override public long getWALEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override public long getEditsFiltered() { + return this.walEditsFilteredCounter.value(); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto index c3fe19dafe5..563db9f445e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto @@ -158,6 +158,13 @@ message ReplicationLoadSource { required uint32 sizeOfLogQueue = 3; required uint64 timeStampOfLastShippedOp = 4; required uint64 replicationLag = 5; + optional uint64 timeStampOfNextToReplicate=6; + optional string queueId = 7; + optional bool recovered = 8; + optional bool running = 9; + optional bool editsSinceRestart = 10; + optional uint64 editsRead = 11; + optional uint64 oPsShipped = 12; } message ServerLoad { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6ef602115e1..1e871b70f6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1372,9 +1372,11 @@ public class HRegionServer extends HasThread implements ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); if (rLoad != null) { serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); - for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) { + for (ClusterStatusProtos.ReplicationLoadSource rLS : + rLoad.getReplicationLoadSourceEntries()) { serverLoad.addReplLoadSource(rLS); } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7db53aa7c17..57301fc5663 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -344,7 +344,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override public boolean replicate(ReplicateContext replicateContext) { CompletionService pool = new ExecutorCompletionService<>(this.exec); - String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; if (!peersSelected && this.isRunning()) { @@ -371,19 +370,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi reconnectToPeerCluster(); } try { - long lastWriteTime; - // replicate the batches to sink side. - lastWriteTime = parallelReplicate(pool, replicateContext, batches); - - // update metrics - if (lastWriteTime > 0) { - this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId); - } + parallelReplicate(pool, replicateContext, batches); return true; } catch (IOException ioe) { - // Didn't ship anything, but must still age the last time we did - this.metrics.refreshAgeOfLastShippedOp(walGroupId); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 830ebe182d2..92ab070d6dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -41,10 +41,11 @@ public class MetricsSource implements BaseSource { private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); // tracks last shipped timestamp for each wal group - private Map lastTimestamps = new HashMap<>(); + private Map lastShippedTimeStamps = new HashMap(); private Map ageOfLastShippedOp = new HashMap<>(); private long lastHFileRefsQueueSize = 0; private String id; + private long timeStampNextToReplicate; private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; @@ -81,7 +82,7 @@ public class MetricsSource implements BaseSource { /** * Set the age of the last edit that was shipped - * @param timestamp write time of the edit + * @param timestamp target write time of the edit * @param walGroup which group we are setting */ public void setAgeOfLastShippedOp(long timestamp, String walGroup) { @@ -89,7 +90,7 @@ public class MetricsSource implements BaseSource { singleSourceSource.setLastShippedAge(age); globalSourceSource.setLastShippedAge(age); this.ageOfLastShippedOp.put(walGroup, age); - this.lastTimestamps.put(walGroup, timestamp); + this.lastShippedTimeStamps.put(walGroup, timestamp); } /** @@ -105,15 +106,6 @@ public class MetricsSource implements BaseSource { .setLastShippedAge(age); } - /** - * get the last timestamp of given wal group. If the walGroup is null, return 0. - * @param walGroup which group we are getting - * @return timeStamp - */ - public long getLastTimeStampOfWalGroup(String walGroup) { - return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup); - } - /** * get age of last shipped op of given wal group. If the walGroup is null, return 0 * @param walGroup which group we are getting @@ -129,9 +121,9 @@ public class MetricsSource implements BaseSource { * @param walGroupId id of the group to update */ public void refreshAgeOfLastShippedOp(String walGroupId) { - Long lastTimestamp = this.lastTimestamps.get(walGroupId); + Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); if (lastTimestamp == null) { - this.lastTimestamps.put(walGroupId, 0L); + this.lastShippedTimeStamps.put(walGroupId, 0L); lastTimestamp = 0L; } if (lastTimestamp > 0) { @@ -198,6 +190,30 @@ public class MetricsSource implements BaseSource { globalSourceSource.incrShippedBytes(sizeInBytes); } + /** + * Gets the number of edits not eligible for replication this source queue logs so far. + * @return logEditsFiltered non-replicable edits filtered from this queue logs. + */ + public long getEditsFiltered(){ + return this.singleSourceSource.getEditsFiltered(); + } + + /** + * Gets the number of edits eligible for replication read from this source queue logs so far. + * @return replicableEdits total number of replicable edits read from this queue logs. + */ + public long getReplicableEdits(){ + return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered(); + } + + /** + * Gets the number of OPs shipped by this source queue to target cluster. + * @return oPsShipped total number of OPs shipped by this source. + */ + public long getOpsShipped() { + return this.singleSourceSource.getShippedOps(); + } + /** * Convience method to apply changes to metrics do to shipping a batch of logs. * @@ -223,8 +239,9 @@ public class MetricsSource implements BaseSource { singleSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); - lastTimestamps.clear(); + lastShippedTimeStamps.clear(); lastHFileRefsQueueSize = 0; + timeStampNextToReplicate = 0; } /** @@ -260,7 +277,7 @@ public class MetricsSource implements BaseSource { */ public long getTimestampOfLastShippedOp() { long lastTimestamp = 0L; - for (long ts : lastTimestamps.values()) { + for (long ts : lastShippedTimeStamps.values()) { if (ts > lastTimestamp) { lastTimestamp = ts; } @@ -268,6 +285,32 @@ public class MetricsSource implements BaseSource { return lastTimestamp; } + /** + * TimeStamp of next edit to be replicated. + * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. + */ + public long getTimeStampNextToReplicate() { + return timeStampNextToReplicate; + } + + /** + * TimeStamp of next edit targeted for replication. Used for calculating lag, + * as if this timestamp is greater than timestamp of last shipped, it means there's + * at least one edit pending replication. + * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. + */ + public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { + this.timeStampNextToReplicate = timeStampNextToReplicate; + } + + public long getReplicationDelay() { + if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){ + return 0; + }else{ + return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; + } + } + /** * Get the slave peer ID * @return peerID diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 6c46a853dd3..7a25c64b688 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -249,24 +249,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } private void buildReplicationLoad() { - List sourceMetricsList = new ArrayList<>(); - - // get source - List sources = this.replicationManager.getSources(); - for (ReplicationSourceInterface source : sources) { - sourceMetricsList.add(source.getSourceMetrics()); - } - - // get old source - List oldSources = this.replicationManager.getOldSources(); - for (ReplicationSourceInterface source : oldSources) { - if (source instanceof ReplicationSource) { - sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); - } - } + List allSources = new ArrayList<>(); + allSources.addAll(this.replicationManager.getSources()); + allSources.addAll(this.replicationManager.getOldSources()); // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); - this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); + this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 53e560b0d53..fe4086b632a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,15 +19,14 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.ArrayList; -import java.util.Map; + +import org.apache.hadoop.hbase.util.Strings; import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Strings; /** * This class is used for exporting some of the info from replication metrics @@ -37,11 +36,9 @@ public class ReplicationLoad { // Empty load instance. public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad(); - - private List sourceMetricsList; private MetricsSink sinkMetrics; - private List replicationLoadSourceList; + private List replicationLoadSourceEntries; private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink; /** default constructor */ @@ -51,13 +48,12 @@ public class ReplicationLoad { /** * buildReplicationLoad - * @param srMetricsList + * @param sources List of ReplicationSource instances for which metrics should be reported * @param skMetrics */ - public void buildReplicationLoad(final List srMetricsList, + public void buildReplicationLoad(final List sources, final MetricsSink skMetrics) { - this.sourceMetricsList = srMetricsList; this.sinkMetrics = skMetrics; // build the SinkLoad @@ -67,10 +63,9 @@ public class ReplicationLoad { rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); this.replicationLoadSink = rLoadSinkBuild.build(); - // build the SourceLoad List - Map replicationLoadSourceMap = - new HashMap<>(); - for (MetricsSource sm : this.sourceMetricsList) { + this.replicationLoadSourceEntries = new ArrayList<>(); + for (ReplicationSourceInterface source : sources) { + MetricsSource sm = source.getSourceMetrics(); // Get the actual peer id String peerId = sm.getPeerID(); String[] parts = peerId.split("-", 2); @@ -78,18 +73,11 @@ public class ReplicationLoad { long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); + long editsRead = sm.getReplicableEdits(); + long oPsShipped = sm.getOpsShipped(); long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp(); - long replicationLag = - calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue); - - ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); - if (rLoadSource != null) { - ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); - sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); - timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), - timeStampOfLastShippedOp); - replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); - } + long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate(); + long replicationLag = sm.getReplicationDelay(); ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); rLoadSourceBuild.setPeerID(peerId); @@ -97,33 +85,19 @@ public class ReplicationLoad { rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); + rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate); + rLoadSourceBuild.setEditsRead(editsRead); + rLoadSourceBuild.setOPsShipped(oPsShipped); + if (source instanceof ReplicationSource){ + ReplicationSource replSource = (ReplicationSource)source; + rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered()); + rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId()); + rLoadSourceBuild.setRunning(replSource.isWorkerRunning()); + rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate>0); + } - replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); + this.replicationLoadSourceEntries.add(rLoadSourceBuild.build()); } - this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values()); - } - - static long calculateReplicationDelay(long ageOfLastShippedOp, - long timeStampOfLastShippedOp, int sizeOfLogQueue) { - long replicationLag; - long timePassedAfterLastShippedOp; - if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to Long.MAX_VALUE - return Long.MAX_VALUE; - } else { - timePassedAfterLastShippedOp = - EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; - } - if (sizeOfLogQueue > 1) { - // err on the large side - replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); - } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { - replicationLag = ageOfLastShippedOp; // last shipped happen recently - } else { - // last shipped may happen last night, - // so NO real lag although ageOfLastShippedOp is non-zero - replicationLag = 0; - } - return replicationLag; } /** @@ -131,18 +105,17 @@ public class ReplicationLoad { * @return a string contains sourceReplicationLoad information */ public String sourceToString() { - if (this.sourceMetricsList == null) return null; - StringBuilder sb = new StringBuilder(); - for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) { + for (ClusterStatusProtos.ReplicationLoadSource rls : + this.replicationLoadSourceEntries) { sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID()); sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp()); sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue()); sb = Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp", - (new Date(rls.getTimeStampOfLastShippedOp()).toString())); + (new Date(rls.getTimeStampOfLastShippedOp()).toString())); sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag()); } @@ -171,8 +144,8 @@ public class ReplicationLoad { return this.replicationLoadSink; } - public List getReplicationLoadSourceList() { - return this.replicationLoadSourceList; + public List getReplicationLoadSourceEntries() { + return this.replicationLoadSourceEntries; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8dd856875c3..894ebedaeaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -181,8 +181,8 @@ public class ReplicationSource implements ReplicationSourceInterface { this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; - LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId - + ", currentBandwidth=" + this.currentBandwidth); + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, + replicationPeer.getId(), this.currentBandwidth); } private void decorateConf() { @@ -207,6 +207,8 @@ public class ReplicationSource implements ReplicationSourceInterface { } } queue.put(log); + LOG.trace("Added log file {} to queue of source {}.", logPrefix, + this.replicationQueueInfo.getQueueId()); this.metrics.incrSizeOfLogQueue(); // This will log a warning for each new log that gets created above the warn threshold int queueSize = queue.size(); @@ -315,15 +317,13 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public Map getWalGroupStatus() { Map sourceReplicationStatus = new TreeMap<>(); - long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize; + long ageOfLastShippedOp, replicationDelay, fileSize; for (Map.Entry walGroupShipper : workerThreads.entrySet()) { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); - lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); int queueSize = queues.get(walGroupId).size(); - replicationDelay = - ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); + replicationDelay = metrics.getReplicationDelay(); Path currentPath = shipper.getCurrentPath(); fileSize = -1; if (currentPath != null) { @@ -486,6 +486,8 @@ public class ReplicationSource implements ReplicationSourceInterface { for (;;) { peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isSourceActive() && peerClusterId == null) { + LOG.debug("Could not connect to Peer ZK. Sleeping for " + + (this.sleepForRetries * sleepMultiplier) + " millis."); if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } @@ -503,7 +505,8 @@ public class ReplicationSource implements ReplicationSourceInterface { this.manager.removeSource(this); return; } - LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};", + this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); initializeWALEntryFilter(peerClusterId); // start workers @@ -552,7 +555,9 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.setReaderRunning(false); + if(worker.entryReader != null) { + worker.entryReader.setReaderRunning(false); + } } for (ReplicationSourceShipper worker : workers) { @@ -622,6 +627,10 @@ public class ReplicationSource implements ReplicationSourceInterface { return !this.server.isStopped() && this.sourceRunning; } + public UUID getPeerClusterUUID(){ + return this.clusterId; + } + /** * Comparator used to compare logs together based on their start time */ @@ -644,6 +653,19 @@ public class ReplicationSource implements ReplicationSourceInterface { } } + public ReplicationQueueInfo getReplicationQueueInfo() { + return replicationQueueInfo; + } + + public boolean isWorkerRunning(){ + for(ReplicationSourceShipper worker : this.workerThreads.values()){ + if(worker.isActive()){ + return worker.isActive(); + } + } + return false; + } + @Override public String getStats() { StringBuilder sb = new StringBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5d4f0349add..36cfdd7fb92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -383,7 +383,13 @@ public class ReplicationSourceManager implements ReplicationListener { toRemove.terminate(terminateMessage); } for (SortedSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + walsByGroup.forEach(wal -> { + Path walPath = new Path(this.logDir, wal); + src.enqueueLog(walPath); + LOG.trace("Enqueued {} to source {} during source creation.", + walPath, src.getQueueId()); + }); + } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -403,8 +409,13 @@ public class ReplicationSourceManager implements ReplicationListener { for (String queueId : previousQueueIds) { ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); + LOG.trace("Added source for recovered queue: " + src.getQueueId()); for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> { + LOG.trace("Enqueueing log from recovered queue for source: {}", + src.getQueueId()); + src.enqueueLog(new Path(wal)); + }); } toStartup.add(replicationSource); } @@ -613,6 +624,8 @@ public class ReplicationSourceManager implements ReplicationListener { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); + LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", + newLog, source.getQueueId()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index edd2c4bd3f9..5fee6598955 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -90,6 +89,7 @@ public class ReplicationSourceShipper extends Thread { @Override public final void run() { setWorkerState(WorkerState.RUNNING); + LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId); // Loop until we close down while (isActive()) { // Sleep until replication is enabled again @@ -101,10 +101,9 @@ public class ReplicationSourceShipper extends Thread { } try { WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); + LOG.debug("Shipper from source {} got entry batch from reader: {}", + source.getQueueId(), entryBatch); if (entryBatch == null) { - // since there is no logs need to replicate, we refresh the ageOfLastShippedOp - source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), - walGroupId); continue; } // the NO_MORE_DATA instance has no path so do not call shipEdits @@ -153,16 +152,13 @@ public class ReplicationSourceShipper extends Thread { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - if (updateLogPosition(entryBatch)) { - // if there was nothing to ship and it's not an error - // set "ageOfLastShippedOp" to to indicate that we're current - source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), - walGroupId); - } + updateLogPosition(entryBatch); return; } int currentSize = (int) entryBatch.getHeapSize(); int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); + source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1) + .getKey().getWriteTime()); while (isActive()) { try { try { @@ -174,7 +170,6 @@ public class ReplicationSourceShipper extends Thread { // directly go back to while() for confirm this continue; } - // create replicateContext here, so the entries can be GC'd upon return from this call // stack ReplicationEndpoint.ReplicateContext replicateContext = @@ -195,7 +190,7 @@ public class ReplicationSourceShipper extends Thread { // Clean up hfile references for (Entry entry : entries) { cleanUpHFileRefs(entry.getEdit()); - + LOG.trace("shipped entry {}: ", entry); TableName tableName = entry.getKey().getTableName(); source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), tableName.getNameAsString()); @@ -214,7 +209,7 @@ public class ReplicationSourceShipper extends Thread { source.getSourceMetrics().setAgeOfLastShippedOp( entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { - LOG.trace("Replicated {} entries or {} operations in {} ms", + LOG.debug("Replicated {} entries or {} operations in {} ms", entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); } break; @@ -299,7 +294,7 @@ public class ReplicationSourceShipper extends Thread { return 0; } - private boolean isActive() { + protected boolean isActive() { return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 4483d7ac953..1552c473912 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -140,7 +140,7 @@ class ReplicationSourceWALReader extends Thread { if (batch != null) { // need to propagate the batch even it has no entries since it may carry the last // sequence id information for serial replication. - LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries()); + LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); entryBatchQueue.put(batch); sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL @@ -168,8 +168,11 @@ class ReplicationSourceWALReader extends Thread { protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { WALEdit edit = entry.getEdit(); if (edit == null || edit.isEmpty()) { + LOG.debug("Edit null or empty for entry {} ", entry); return false; } + LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", + entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); @@ -284,7 +287,8 @@ class ReplicationSourceWALReader extends Thread { protected final Entry filterEntry(Entry entry) { Entry filtered = filter.filter(entry); - if (entry != null && filtered == null) { + if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { + LOG.debug("Filtered entry for replication: {}", entry); source.getSourceMetrics().incrLogEditsFiltered(); } return filtered; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index b2c199e2433..93f76f83860 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -173,6 +173,7 @@ class WALEntryStream implements Closeable { private void tryAdvanceEntry() throws IOException { if (checkReader()) { boolean beingWritten = readNextEntryAndRecordReaderPosition(); + LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten); if (currentEntry == null && !beingWritten) { // no more entries in this log file, and the file is already closed, i.e, rolled // Before dequeueing, we should always get one more attempt at reading. @@ -272,6 +273,7 @@ class WALEntryStream implements Closeable { return true; } if (readEntry != null) { + LOG.trace("reading entry: {} ", readEntry); metrics.incrLogEditsRead(); metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java index bf72383b8d7..ac9ae397267 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java @@ -86,22 +86,52 @@ public class TestGetReplicationLoad { @Test public void testGetReplicationMetrics() throws Exception { - String peer1 = "test1", peer2 = "test2"; - long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4; - int sizeOfLogQueue = 5; + String peer1 = "test1", peer2 = "test2", queueId="1"; + long ageOfLastShippedOp = 2, + replicationLag = 3, + timeStampOfLastShippedOp = 4, + timeStampOfNextToReplicate=5, + editsRead=6, + oPsShipped=7; + int sizeOfLogQueue = 8; + boolean recovered=false, + running=false, + editsSinceRestart=false; RegionServerStatusProtos.RegionServerReportRequest.Builder request = RegionServerStatusProtos.RegionServerReportRequest.newBuilder(); ServerName serverName = cluster.getMaster(0).getServerName(); request.setServer(ProtobufUtil.toServerName(serverName)); ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource - .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp) - .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp) - .setSizeOfLogQueue(sizeOfLogQueue).build(); + .newBuilder().setPeerID(peer1) + .setAgeOfLastShippedOp(ageOfLastShippedOp) + .setReplicationLag(replicationLag) + .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp) + .setSizeOfLogQueue(sizeOfLogQueue) + .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate) + .setQueueId(queueId) + .setEditsRead(editsRead) + .setOPsShipped(oPsShipped) + .setRunning(running) + .setRecovered(recovered) + .setEditsSinceRestart(editsSinceRestart) + .build(); ClusterStatusProtos.ReplicationLoadSource rload2 = - ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2) - .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1) + ClusterStatusProtos.ReplicationLoadSource + .newBuilder() + .setPeerID(peer2) + .setAgeOfLastShippedOp(ageOfLastShippedOp + 1) + .setReplicationLag(replicationLag + 1) .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1) - .setSizeOfLogQueue(sizeOfLogQueue + 1).build(); + .setSizeOfLogQueue(sizeOfLogQueue + 1) + .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate+1) + .setQueueId(queueId) + .setEditsRead(editsRead+1) + .setOPsShipped(oPsShipped+1) + .setRunning(running) + .setRecovered(recovered) + .setEditsSinceRestart(editsSinceRestart) + .build(); + ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() .addReplLoadSource(rload1).addReplLoadSource(rload2).build(); request.setLoad(sl); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index f96dbe5dc17..11a94554219 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -165,8 +165,7 @@ public class TestReplicationBase { htable1.put(puts); } - @BeforeClass - public static void setUpBeforeClass() throws Exception { + protected static void configureClusters(){ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger // sufficient number of events. But we don't want to go too low because @@ -188,6 +187,17 @@ public class TestReplicationBase { conf1.setLong("hbase.serial.replication.waiting.ms", 100); utility1 = new HBaseTestingUtility(conf1); + + // Base conf2 on conf1 so it gets the right zk cluster. + conf2 = HBaseConfiguration.create(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); + + utility2 = new HBaseTestingUtility(conf2); + } + + protected static void startClusters() throws Exception{ utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); // Have to reget conf1 in case zk cluster location different @@ -197,13 +207,6 @@ public class TestReplicationBase { admin = new ReplicationAdmin(conf1); LOG.info("Setup first Zk"); - // Base conf2 on conf1 so it gets the right zk cluster. - conf2 = HBaseConfiguration.create(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); - conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); - - utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); zkw2 = new ZKWatcher(conf2, "cluster2", null, true); LOG.info("Setup second Zk"); @@ -238,6 +241,12 @@ public class TestReplicationBase { htable2 = connection2.getTable(tableName); } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureClusters(); + startClusters(); + } + private boolean peerExist(String peerId) throws IOException { return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index aaa843ef3dc..c778f528c93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.EnumSet; @@ -33,6 +34,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -60,7 +65,8 @@ public class TestReplicationStatus extends TestReplicationBase { @Test public void testReplicationStatus() throws Exception { LOG.info("testReplicationStatus"); - + utility2.shutdownMiniHBaseCluster(); + utility2.startMiniHBaseCluster(1,4); try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // disable peer admin.disablePeer(PEER_ID); @@ -103,11 +109,204 @@ public class TestReplicationStatus extends TestReplicationBase { ServerLoad sl = status.getLoad(server); List rLoadSourceList = sl.getReplicationLoadSourceList(); // check SourceList still only has one entry - assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2)); assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); } finally { admin.enablePeer(PEER_ID); utility1.getHBaseCluster().getRegionServer(1).start(); } } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + //we need to perform initialisations from TestReplicationBase.setUpBeforeClass() on each + //test here, so we override BeforeClass to do nothing and call + // TestReplicationBase.setUpBeforeClass() from setup method + TestReplicationBase.configureClusters(); + } + + @Before + @Override + public void setUpBase() throws Exception { + TestReplicationBase.startClusters(); + super.setUpBase(); + } + + @After + @Override + public void tearDownBase() throws Exception { + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass(){ + //We need to override it here to avoid issues when trying to execute super class teardown + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + Thread.sleep(10000); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). + get(serverName).getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertFalse(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertEquals(0, loadSource.getReplicationLag()); + assertFalse(loadSource.isRecovered()); + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + //add some values to source cluster + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). + get(serverName).getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + assertFalse(loadSource.isRecovered()); + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + Thread.sleep(10000); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). + get(serverName).getReplicationLoadSourceList(); + assertEquals(2, loadSources.size()); + boolean foundRecovery = false; + boolean foundNormal = false; + for(ReplicationLoadSource loadSource : loadSources){ + if (loadSource.isRecovered()){ + foundRecovery = true; + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + } else { + foundNormal = true; + assertFalse(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertEquals(0, loadSource.getReplicationLag()); + } + } + assertTrue("No normal queue found.", foundNormal); + assertTrue("No recovery queue found.", foundRecovery); + } + + @Test + public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + Thread.sleep(10000); + //add more values to cluster 1, these should cause normal queue to lag + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). + get(serverName).getReplicationLoadSourceList(); + assertEquals(2, loadSources.size()); + boolean foundRecovery = false; + boolean foundNormal = false; + for(ReplicationLoadSource loadSource : loadSources){ + if (loadSource.isRecovered()){ + foundRecovery = true; + } else { + foundNormal = true; + } + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + } + assertTrue("No normal queue found.", foundNormal); + assertTrue("No recovery queue found.", foundRecovery); + } + + @Test + public void testReplicationStatusAfterLagging() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + utility2.startMiniHBaseCluster(); + Thread.sleep(10000); + try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + ServerName serverName = utility1.getHBaseCluster().getRegionServer(0). + getServerName(); + ClusterStatus status = + new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics().get(serverName). + getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertTrue(loadSource.hasEditsSinceRestart()); + assertTrue(loadSource.getTimestampOfLastShippedOp() > 0); + assertEquals(0, loadSource.getReplicationLag()); + }finally{ + utility2.shutdownMiniHBaseCluster(); + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index f716de99a5a..6b0e7c05cee 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -785,37 +785,30 @@ module Hbase puts(format(' %s', server)) end elsif format == 'replication' - puts(format('version %s', status.getHBaseVersion)) - puts(format('%d live servers', status.getServersSize)) - for server in status.getServers - sl = status.getLoad(server) - rSinkString = ' SINK :' - rSourceString = ' SOURCE:' - rLoadSink = sl.getReplicationLoadSink - next if rLoadSink.nil? - rSinkString << ' AgeOfLastAppliedOp=' + rLoadSink.getAgeOfLastAppliedOp.to_s - rSinkString << ', TimeStampsOfLastAppliedOp=' + - java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString - rLoadSourceList = sl.getReplicationLoadSourceList - index = 0 - while index < rLoadSourceList.size - rLoadSource = rLoadSourceList.get(index) - rSourceString << ' PeerID=' + rLoadSource.getPeerID - rSourceString << ', AgeOfLastShippedOp=' + rLoadSource.getAgeOfLastShippedOp.to_s - rSourceString << ', SizeOfLogQueue=' + rLoadSource.getSizeOfLogQueue.to_s - rSourceString << ', TimeStampsOfLastShippedOp=' + - java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString - rSourceString << ', Replication Lag=' + rLoadSource.getReplicationLag.to_s - index += 1 - end - puts(format(' %s:', server.getHostname)) - if type.casecmp('SOURCE') == 0 - puts(format('%s', rSourceString)) - elsif type.casecmp('SINK') == 0 - puts(format('%s', rSinkString)) + puts(format('version %s', version: status.getHBaseVersion)) + puts(format('%d live servers', servers: status.getServersSize)) + status.getServers.each do |server_status| + sl = status.getLoad(server_status) + r_sink_string = ' SINK:' + r_source_string = ' SOURCE:' + r_load_sink = sl.getReplicationLoadSink + next if r_load_sink.nil? + + r_sink_string << ' AgeOfLastAppliedOp=' + + r_load_sink.getAgeOfLastAppliedOp.to_s + r_sink_string << ', TimeStampsOfLastAppliedOp=' + + java.util.Date.new(r_load_sink + .getTimeStampsOfLastAppliedOp).toString + r_load_source_map = sl.getReplicationLoadSourceMap + build_source_string(r_load_source_map, r_source_string) + puts(format(' %s:', host: server_status.getHostname)) + if type.casecmp('SOURCE').zero? + puts(format('%s', source: r_source_string)) + elsif type.casecmp('SINK').zero? + puts(format('%s', sink: r_sink_string)) else - puts(format('%s', rSourceString)) - puts(format('%s', rSinkString)) + puts(format('%s', source: r_source_string)) + puts(format('%s', sink: r_sink_string)) end end elsif format == 'simple' @@ -844,6 +837,71 @@ module Hbase end end + def build_source_string(r_load_source_map, r_source_string) + r_load_source_map.each do |peer, sources| + r_source_string << ' PeerID=' + peer + sources.each do |source_load| + build_queue_title(source_load, r_source_string) + build_running_source_stats(source_load, r_source_string) + end + end + end + + def build_queue_title(source_load, r_source_string) + r_source_string << if source_load.isRecovered + "\n Recovered Queue: " + else + "\n Normal Queue: " + end + r_source_string << source_load.getQueueId + end + + def build_running_source_stats(source_load, r_source_string) + if source_load.isRunning + build_shipped_stats(source_load, r_source_string) + build_load_general_stats(source_load, r_source_string) + r_source_string << ', Replication Lag=' + + source_load.getReplicationLag.to_s + else + r_source_string << "\n " + r_source_string << 'No Reader/Shipper threads runnning yet.' + end + end + + def build_shipped_stats(source_load, r_source_string) + r_source_string << if source_load.getTimeStampOfLastShippedOp.zero? + "\n " \ + 'No Ops shipped since last restart' + else + "\n AgeOfLastShippedOp=" + + source_load.getAgeOfLastShippedOp.to_s + + ', TimeStampOfLastShippedOp=' + + java.util.Date.new(source_load + .getTimeStampOfLastShippedOp).toString + end + end + + def build_load_general_stats(source_load, r_source_string) + r_source_string << ', SizeOfLogQueue=' + + source_load.getSizeOfLogQueue.to_s + r_source_string << ', EditsReadFromLogQueue=' + + source_load.getEditsRead.to_s + r_source_string << ', OpsShippedToTarget=' + + source_load.getOPsShipped.to_s + build_edits_for_source(source_load, r_source_string) + end + + def build_edits_for_source(source_load, r_source_string) + if source_load.hasEditsSinceRestart + r_source_string << ', TimeStampOfNextToReplicate=' + + java.util.Date.new(source_load + .getTimeStampOfNextToReplicate).toString + else + r_source_string << ', No edits for this source' + r_source_string << ' since it started' + end + end + #---------------------------------------------------------------------------------------------- # # Helper methods