HBASE-21505 - proposal for a more consistent report on status

Signed-off-by: Jingyun Tian <tianjy@apache.org>
Author: Wellington Chevreuil, 2019-02-19 14:47:18 +00:00 (committed by Jingyun Tian)
parent d7c5d73de0
commit a8af0e5894
23 changed files with 735 additions and 189 deletions

View File

@@ -389,6 +389,15 @@ public class ServerLoad implements ServerMetrics {
     return metrics.getReplicationLoadSourceList();
   }
+  /**
+   * Call directly from client such as hbase shell
+   * @return a map of ReplicationLoadSource list per peer id
+   */
+  @Override
+  public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap() {
+    return metrics.getReplicationLoadSourceMap();
+  }
   /**
    * Call directly from client such as hbase shell
    * @return ReplicationLoadSink

View File

@ -75,6 +75,12 @@ public interface ServerMetrics {
*/ */
List<ReplicationLoadSource> getReplicationLoadSourceList(); List<ReplicationLoadSource> getReplicationLoadSourceList();
/**
* Call directly from client such as hbase shell
* @return a map of ReplicationLoadSource list per peer id
*/
Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap();
/** /**
* Call directly from client such as hbase shell * Call directly from client such as hbase shell
* @return ReplicationLoadSink * @return ReplicationLoadSink
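
The new getReplicationLoadSourceMap() groups the flat per-queue list by peer id. Below is a minimal, hypothetical client-side sketch of consuming it through the Admin API; the already-open Connection named conn and the printout are assumptions, not part of this patch:

  import java.util.EnumSet;
  import org.apache.hadoop.hbase.ClusterMetrics;
  import org.apache.hadoop.hbase.ServerMetrics;
  import org.apache.hadoop.hbase.client.Admin;

  // Dump replication lag per peer for every live region server.
  try (Admin admin = conn.getAdmin()) {
    ClusterMetrics cm = admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
    for (ServerMetrics sm : cm.getLiveServerMetrics().values()) {
      sm.getReplicationLoadSourceMap().forEach((peerId, sources) ->
        sources.forEach(src -> System.out.println(peerId + " lag=" + src.getReplicationLag())));
    }
  }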

View File

@@ -18,8 +18,11 @@
 package org.apache.hadoop.hbase;
 import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,7 +78,7 @@ public final class ServerMetricsBuilder {
       .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
         .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
       .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
         .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
       .setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
         ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
         : null)
@@ -301,6 +304,16 @@
       return Collections.unmodifiableList(sources);
     }
+    @Override
+    public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap() {
+      Map<String, List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
+      for (ReplicationLoadSource loadSource : sources) {
+        sourcesMap.computeIfAbsent(loadSource.getPeerID(),
+          peerId -> new ArrayList<>()).add(loadSource);
+      }
+      return sourcesMap;
+    }
     @Override
     public ReplicationLoadSink getReplicationLoadSink() {
       return sink;
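
The loop above builds the per-peer map with computeIfAbsent; given the same sources list, an equivalent grouping using streams, shown here only as a sketch of the design choice, would be:

  import java.util.stream.Collectors;

  // One-liner equivalent of the computeIfAbsent loop (not part of the patch):
  Map<String, List<ReplicationLoadSource>> byPeer =
      sources.stream().collect(Collectors.groupingBy(ReplicationLoadSource::getPeerID));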

View File

@@ -16,21 +16,36 @@ import org.apache.yetus.audience.InterfaceAudience;
  * A HBase ReplicationLoad to present MetricsSource information
  */
 @InterfaceAudience.Public
-public class ReplicationLoadSource {
+public final class ReplicationLoadSource {
   private final String peerID;
   private final long ageOfLastShippedOp;
   private final int sizeOfLogQueue;
   private final long timestampOfLastShippedOp;
   private final long replicationLag;
+  private long timeStampOfNextToReplicate;
+  private String queueId;
+  private boolean recovered;
+  private boolean running;
+  private boolean editsSinceRestart;
+  private long editsRead;
+  private long oPsShipped;
+  // TODO: add the builder for this class
   @InterfaceAudience.Private
-  public ReplicationLoadSource(String id, long age, int size, long timestamp, long lag) {
+  private ReplicationLoadSource(String id, long age, int size, long timestamp,
+      long timeStampOfNextToReplicate, long lag, String queueId, boolean recovered,
+      boolean running, boolean editsSinceRestart, long editsRead, long oPsShipped) {
     this.peerID = id;
     this.ageOfLastShippedOp = age;
     this.sizeOfLogQueue = size;
     this.timestampOfLastShippedOp = timestamp;
     this.replicationLag = lag;
+    this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
+    this.queueId = queueId;
+    this.recovered = recovered;
+    this.running = running;
+    this.editsSinceRestart = editsSinceRestart;
+    this.editsRead = editsRead;
+    this.oPsShipped = oPsShipped;
   }
   public String getPeerID() {
@@ -61,4 +76,123 @@ public class ReplicationLoadSource {
   public long getReplicationLag() {
     return this.replicationLag;
   }
-}
+  public long getTimeStampOfNextToReplicate() {
+    return this.timeStampOfNextToReplicate;
+  }
+  public String getQueueId() {
+    return queueId;
+  }
+  public boolean isRecovered() {
+    return recovered;
+  }
+  public boolean isRunning() {
+    return running;
+  }
+  public boolean hasEditsSinceRestart() {
+    return editsSinceRestart;
+  }
+  public long getEditsRead() {
+    return editsRead;
+  }
+  public long getOPsShipped() {
+    return oPsShipped;
+  }
+  public static ReplicationLoadSourceBuilder newBuilder() {
+    return new ReplicationLoadSourceBuilder();
+  }
+  public static final class ReplicationLoadSourceBuilder {
+    private String peerID;
+    private long ageOfLastShippedOp;
+    private int sizeOfLogQueue;
+    private long timestampOfLastShippedOp;
+    private long replicationLag;
+    private long timeStampOfNextToReplicate;
+    private String queueId;
+    private boolean recovered;
+    private boolean running;
+    private boolean editsSinceRestart;
+    private long editsRead;
+    private long oPsShipped;
+    private ReplicationLoadSourceBuilder() {
+    }
+    public ReplicationLoadSourceBuilder setTimeStampOfNextToReplicate(
+        long timeStampOfNextToReplicate) {
+      this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setPeerID(String peerID) {
+      this.peerID = peerID;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setSizeOfLogQueue(int sizeOfLogQueue) {
+      this.sizeOfLogQueue = sizeOfLogQueue;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setTimestampOfLastShippedOp(long timestampOfLastShippedOp) {
+      this.timestampOfLastShippedOp = timestampOfLastShippedOp;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setReplicationLag(long replicationLag) {
+      this.replicationLag = replicationLag;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setQueueId(String queueId) {
+      this.queueId = queueId;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setRecovered(boolean recovered) {
+      this.recovered = recovered;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setRunning(boolean running) {
+      this.running = running;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setEditsSinceRestart(boolean editsSinceRestart) {
+      this.editsSinceRestart = editsSinceRestart;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setEditsRead(long editsRead) {
+      this.editsRead = editsRead;
+      return this;
+    }
+    public ReplicationLoadSourceBuilder setoPsShipped(long oPsShipped) {
+      this.oPsShipped = oPsShipped;
+      return this;
+    }
+    public ReplicationLoadSource build() {
+      return new ReplicationLoadSource(peerID, ageOfLastShippedOp, sizeOfLogQueue,
+        timestampOfLastShippedOp, timeStampOfNextToReplicate, replicationLag, queueId, recovered,
+        running, editsSinceRestart, editsRead, oPsShipped);
+    }
+  }
+}
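
For illustration, constructing an instance through the new builder looks like this; every value below is made up and only the method names come from the patch:

  // Hypothetical usage of ReplicationLoadSource.newBuilder():
  ReplicationLoadSource load = ReplicationLoadSource.newBuilder()
      .setPeerID("peer1")
      .setQueueId("1")
      .setAgeOfLastShippedOp(0L)
      .setSizeOfLogQueue(1)
      .setTimestampOfLastShippedOp(System.currentTimeMillis())
      .setReplicationLag(0L)
      .setRunning(true)
      .build();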

View File

@@ -2717,8 +2717,20 @@ public final class ProtobufUtil {
   public static ReplicationLoadSource toReplicationLoadSource(
       ClusterStatusProtos.ReplicationLoadSource rls) {
-    return new ReplicationLoadSource(rls.getPeerID(), rls.getAgeOfLastShippedOp(),
-      rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), rls.getReplicationLag());
+    ReplicationLoadSource.ReplicationLoadSourceBuilder builder = ReplicationLoadSource.newBuilder();
+    builder.setPeerID(rls.getPeerID()).
+      setAgeOfLastShippedOp(rls.getAgeOfLastShippedOp()).
+      setSizeOfLogQueue(rls.getSizeOfLogQueue()).
+      setTimestampOfLastShippedOp(rls.getTimeStampOfLastShippedOp()).
+      setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()).
+      setReplicationLag(rls.getReplicationLag()).
+      setQueueId(rls.getQueueId()).
+      setRecovered(rls.getRecovered()).
+      setRunning(rls.getRunning()).
+      setEditsSinceRestart(rls.getEditsSinceRestart()).
+      setEditsRead(rls.getEditsRead()).
+      setoPsShipped(rls.getOPsShipped());
+    return builder.build();
   }
   /**
@@ -3227,6 +3239,13 @@ public final class ProtobufUtil {
       .setSizeOfLogQueue((int) rls.getSizeOfLogQueue())
       .setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp())
       .setReplicationLag(rls.getReplicationLag())
+      .setQueueId(rls.getQueueId())
+      .setRecovered(rls.isRecovered())
+      .setRunning(rls.isRunning())
+      .setEditsSinceRestart(rls.hasEditsSinceRestart())
+      .setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate())
+      .setOPsShipped(rls.getOPsShipped())
+      .setEditsRead(rls.getEditsRead())
       .build();
   }

View File

@@ -76,4 +76,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void incrCompletedWAL();
   void incrCompletedRecoveryQueue();
   void incrFailedRecoveryQueue();
+  long getWALEditsRead();
+  long getShippedOps();
+  long getEditsFiltered();
 }

View File

@@ -32,7 +32,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableFastCounter logReadInEditsCounter;
-  private final MutableFastCounter logEditsFilteredCounter;
+  private final MutableFastCounter walEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedBytesCounter;
@@ -73,7 +73,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
-    logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
     shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
@@ -111,7 +111,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
   }
   @Override public void incrLogEditsFiltered(long size) {
-    logEditsFilteredCounter.incr(size);
+    walEditsFilteredCounter.incr(size);
   }
   @Override public void incrBatchesShipped(int batches) {
@@ -260,4 +260,19 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
   public String getMetricsName() {
     return rms.getMetricsName();
   }
+  @Override
+  public long getWALEditsRead() {
+    return this.logReadInEditsCounter.value();
+  }
+  @Override
+  public long getShippedOps() {
+    return this.shippedOpsCounter.value();
+  }
+  @Override
+  public long getEditsFiltered() {
+    return this.walEditsFilteredCounter.value();
+  }
 }

View File

@@ -48,7 +48,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableFastCounter logReadInEditsCounter;
-  private final MutableFastCounter logEditsFilteredCounter;
+  private final MutableFastCounter walEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedKBsCounter;
@@ -102,7 +102,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
     logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
-    logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
+    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
     shippedHFilesKey = this.keyPrefix + "shippedHFiles";
     shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
@@ -149,7 +149,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
   }
   @Override public void incrLogEditsFiltered(long size) {
-    logEditsFilteredCounter.incr(size);
+    walEditsFilteredCounter.incr(size);
   }
   @Override public void incrBatchesShipped(int batches) {
@@ -314,4 +314,16 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
   public String getMetricsName() {
     return rms.getMetricsName();
   }
+  @Override public long getWALEditsRead() {
+    return this.logReadInEditsCounter.value();
+  }
+  @Override public long getShippedOps() {
+    return this.shippedOpsCounter.value();
+  }
+  @Override public long getEditsFiltered() {
+    return this.walEditsFilteredCounter.value();
+  }
 }

View File

@@ -158,6 +158,13 @@ message ReplicationLoadSource {
   required uint32 sizeOfLogQueue = 3;
   required uint64 timeStampOfLastShippedOp = 4;
   required uint64 replicationLag = 5;
+  optional uint64 timeStampOfNextToReplicate = 6;
+  optional string queueId = 7;
+  optional bool recovered = 8;
+  optional bool running = 9;
+  optional bool editsSinceRestart = 10;
+  optional uint64 editsRead = 11;
+  optional uint64 oPsShipped = 12;
 }
 message ServerLoad {

View File

@@ -1372,9 +1372,11 @@ public class HRegionServer extends HasThread implements
     ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
     if (rLoad != null) {
       serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
-      for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
+      for (ClusterStatusProtos.ReplicationLoadSource rLS :
+          rLoad.getReplicationLoadSourceEntries()) {
         serverLoad.addReplLoadSource(rLS);
       }
     }
   }

View File

@@ -344,7 +344,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
-    String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
     if (!peersSelected && this.isRunning()) {
@@ -371,19 +370,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
       reconnectToPeerCluster();
     }
     try {
-      long lastWriteTime;
       // replicate the batches to sink side.
-      lastWriteTime = parallelReplicate(pool, replicateContext, batches);
-      // update metrics
-      if (lastWriteTime > 0) {
-        this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
-      }
+      parallelReplicate(pool, replicateContext, batches);
       return true;
     } catch (IOException ioe) {
-      // Didn't ship anything, but must still age the last time we did
-      this.metrics.refreshAgeOfLastShippedOp(walGroupId);
       if (ioe instanceof RemoteException) {
         ioe = ((RemoteException) ioe).unwrapRemoteException();
         LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);

View File

@@ -41,10 +41,11 @@ public class MetricsSource implements BaseSource {
   private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
   // tracks last shipped timestamp for each wal group
-  private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
   private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
+  private long timeStampNextToReplicate;
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
@@ -81,7 +82,7 @@ public class MetricsSource implements BaseSource {
   /**
    * Set the age of the last edit that was shipped
-   * @param timestamp write time of the edit
+   * @param timestamp target write time of the edit
    * @param walGroup which group we are setting
    */
   public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
@@ -89,7 +90,7 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
     this.ageOfLastShippedOp.put(walGroup, age);
-    this.lastTimestamps.put(walGroup, timestamp);
+    this.lastShippedTimeStamps.put(walGroup, timestamp);
   }
   /**
@@ -105,15 +106,6 @@ public class MetricsSource implements BaseSource {
         .setLastShippedAge(age);
   }
-  /**
-   * get the last timestamp of given wal group. If the walGroup is null, return 0.
-   * @param walGroup which group we are getting
-   * @return timeStamp
-   */
-  public long getLastTimeStampOfWalGroup(String walGroup) {
-    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
-  }
   /**
    * get age of last shipped op of given wal group. If the walGroup is null, return 0
    * @param walGroup which group we are getting
@@ -129,9 +121,9 @@ public class MetricsSource implements BaseSource {
    * @param walGroupId id of the group to update
    */
   public void refreshAgeOfLastShippedOp(String walGroupId) {
-    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
+    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
     if (lastTimestamp == null) {
-      this.lastTimestamps.put(walGroupId, 0L);
+      this.lastShippedTimeStamps.put(walGroupId, 0L);
       lastTimestamp = 0L;
     }
     if (lastTimestamp > 0) {
@@ -198,6 +190,30 @@ public class MetricsSource implements BaseSource {
     globalSourceSource.incrShippedBytes(sizeInBytes);
   }
+  /**
+   * Gets the number of edits not eligible for replication this source queue logs so far.
+   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
+   */
+  public long getEditsFiltered() {
+    return this.singleSourceSource.getEditsFiltered();
+  }
+  /**
+   * Gets the number of edits eligible for replication read from this source queue logs so far.
+   * @return replicableEdits total number of replicable edits read from this queue logs.
+   */
+  public long getReplicableEdits() {
+    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
+  }
+  /**
+   * Gets the number of OPs shipped by this source queue to target cluster.
+   * @return oPsShipped total number of OPs shipped by this source.
+   */
+  public long getOpsShipped() {
+    return this.singleSourceSource.getShippedOps();
+  }
   /**
    * Convenience method to apply changes to metrics due to shipping a batch of logs.
    *
@@ -223,8 +239,9 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
-    lastTimestamps.clear();
+    lastShippedTimeStamps.clear();
     lastHFileRefsQueueSize = 0;
+    timeStampNextToReplicate = 0;
   }
   /**
@@ -260,7 +277,7 @@ public class MetricsSource implements BaseSource {
    */
   public long getTimestampOfLastShippedOp() {
     long lastTimestamp = 0L;
-    for (long ts : lastTimestamps.values()) {
+    for (long ts : lastShippedTimeStamps.values()) {
       if (ts > lastTimestamp) {
         lastTimestamp = ts;
       }
@@ -268,6 +285,32 @@ public class MetricsSource implements BaseSource {
     return lastTimestamp;
   }
+  /**
+   * TimeStamp of next edit to be replicated.
+   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
+   */
+  public long getTimeStampNextToReplicate() {
+    return timeStampNextToReplicate;
+  }
+  /**
+   * TimeStamp of next edit targeted for replication. Used for calculating lag,
+   * as if this timestamp is greater than timestamp of last shipped, it means there's
+   * at least one edit pending replication.
+   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
+   */
+  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
+    this.timeStampNextToReplicate = timeStampNextToReplicate;
+  }
+  public long getReplicationDelay() {
+    if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) {
+      return 0;
+    } else {
+      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
+    }
+  }
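
getReplicationDelay() replaces the old heuristic in ReplicationLoad.calculateReplicationDelay (removed later in this commit): if the last shipped edit is at or beyond the next edit queued for replication, nothing is pending and the lag is 0; otherwise lag is measured against the oldest pending edit. A standalone sketch of the rule, with illustrative method and parameter names that are not part of the patch:

  // lastShippedTs = 1000, nextToReplicateTs = 900,  now = 2000 -> 0   (nothing pending)
  // lastShippedTs = 1000, nextToReplicateTs = 1500, now = 2000 -> 500 (edits pending)
  static long replicationDelay(long lastShippedTs, long nextToReplicateTs, long now) {
    return lastShippedTs >= nextToReplicateTs ? 0 : now - nextToReplicateTs;
  }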
   /**
    * Get the slave peer ID
    * @return peerID

View File

@@ -249,24 +249,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkService {
   }
   private void buildReplicationLoad() {
-    List<MetricsSource> sourceMetricsList = new ArrayList<>();
-    // get source
-    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
-    for (ReplicationSourceInterface source : sources) {
-      sourceMetricsList.add(source.getSourceMetrics());
-    }
-    // get old source
-    List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
-    for (ReplicationSourceInterface source : oldSources) {
-      if (source instanceof ReplicationSource) {
-        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
-      }
-    }
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(this.replicationManager.getSources());
+    allSources.addAll(this.replicationManager.getOldSources());
     // get sink
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
-    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
+    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
   }
 }

View File

@@ -19,15 +19,14 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.Map;
-import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Strings;
 /**
  * This class is used for exporting some of the info from replication metrics
@@ -37,11 +36,9 @@ public class ReplicationLoad {
   // Empty load instance.
   public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
-  private List<MetricsSource> sourceMetricsList;
   private MetricsSink sinkMetrics;
-  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
+  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
   private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
   /** default constructor */
@@ -51,13 +48,12 @@ public class ReplicationLoad {
   /**
    * buildReplicationLoad
-   * @param srMetricsList
+   * @param sources List of ReplicationSource instances for which metrics should be reported
    * @param skMetrics
    */
-  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
+  public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
       final MetricsSink skMetrics) {
-    this.sourceMetricsList = srMetricsList;
     this.sinkMetrics = skMetrics;
     // build the SinkLoad
@@ -67,10 +63,9 @@ public class ReplicationLoad {
     rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
     this.replicationLoadSink = rLoadSinkBuild.build();
-    // build the SourceLoad List
-    Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
-        new HashMap<>();
-    for (MetricsSource sm : this.sourceMetricsList) {
+    this.replicationLoadSourceEntries = new ArrayList<>();
+    for (ReplicationSourceInterface source : sources) {
+      MetricsSource sm = source.getSourceMetrics();
       // Get the actual peer id
       String peerId = sm.getPeerID();
       String[] parts = peerId.split("-", 2);
@@ -78,18 +73,11 @@ public class ReplicationLoad {
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
+      long editsRead = sm.getReplicableEdits();
+      long oPsShipped = sm.getOpsShipped();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long replicationLag =
-          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
+      long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
+      long replicationLag = sm.getReplicationDelay();
-      ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
-      if (rLoadSource != null) {
-        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
-        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
-        timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
-            timeStampOfLastShippedOp);
-        replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
-      }
       ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
           ClusterStatusProtos.ReplicationLoadSource.newBuilder();
       rLoadSourceBuild.setPeerID(peerId);
@@ -97,33 +85,19 @@ public class ReplicationLoad {
       rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
       rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
       rLoadSourceBuild.setReplicationLag(replicationLag);
+      rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
+      rLoadSourceBuild.setEditsRead(editsRead);
+      rLoadSourceBuild.setOPsShipped(oPsShipped);
+      if (source instanceof ReplicationSource) {
+        ReplicationSource replSource = (ReplicationSource) source;
+        rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
+        rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
+        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
+        rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate > 0);
+      }
-      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
+      this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
     }
-    this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
   }
-  static long calculateReplicationDelay(long ageOfLastShippedOp,
-      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
-    long replicationLag;
-    long timePassedAfterLastShippedOp;
-    if (timeStampOfLastShippedOp == 0) { // replication not start yet, set to Long.MAX_VALUE
-      return Long.MAX_VALUE;
-    } else {
-      timePassedAfterLastShippedOp =
-          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
-    }
-    if (sizeOfLogQueue > 1) {
-      // err on the large side
-      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
-    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
-      replicationLag = ageOfLastShippedOp; // last shipped happen recently
-    } else {
-      // last shipped may happen last night,
-      // so NO real lag although ageOfLastShippedOp is non-zero
-      replicationLag = 0;
-    }
-    return replicationLag;
-  }
   /**
@@ -131,18 +105,17 @@ public class ReplicationLoad {
    * @return a string contains sourceReplicationLoad information
    */
   public String sourceToString() {
-    if (this.sourceMetricsList == null) return null;
     StringBuilder sb = new StringBuilder();
-    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {
+    for (ClusterStatusProtos.ReplicationLoadSource rls :
+        this.replicationLoadSourceEntries) {
       sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID());
       sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
       sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
       sb =
           Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
             (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
       sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
     }
@@ -171,8 +144,8 @@ public class ReplicationLoad {
     return this.replicationLoadSink;
   }
-  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
-    return this.replicationLoadSourceList;
+  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceEntries() {
+    return this.replicationLoadSourceEntries;
   }
   /**

View File

@@ -181,8 +181,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
-        + ", currentBandwidth=" + this.currentBandwidth);
+    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+      replicationPeer.getId(), this.currentBandwidth);
   }
   private void decorateConf() {
@@ -207,6 +207,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
       }
     }
     queue.put(log);
+    LOG.trace("Added log file {} to queue of source {}.", logPrefix,
+      this.replicationQueueInfo.getQueueId());
     this.metrics.incrSizeOfLogQueue();
     // This will log a warning for each new log that gets created above the warn threshold
     int queueSize = queue.size();
@@ -315,15 +317,13 @@ public class ReplicationSource implements ReplicationSourceInterface {
   @Override
   public Map<String, ReplicationStatus> getWalGroupStatus() {
     Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
-    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    long ageOfLastShippedOp, replicationDelay, fileSize;
     for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
-      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
       ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
-      replicationDelay =
-          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      replicationDelay = metrics.getReplicationDelay();
       Path currentPath = shipper.getCurrentPath();
       fileSize = -1;
       if (currentPath != null) {
@@ -486,6 +486,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     for (;;) {
       peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isSourceActive() && peerClusterId == null) {
+        LOG.debug("Could not connect to Peer ZK. Sleeping for "
+            + (this.sleepForRetries * sleepMultiplier) + " millis.");
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -503,7 +505,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
       this.manager.removeSource(this);
       return;
     }
-    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+    LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+      this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
     initializeWALEntryFilter(peerClusterId);
     // start workers
@@ -552,7 +555,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      worker.entryReader.setReaderRunning(false);
+      if (worker.entryReader != null) {
+        worker.entryReader.setReaderRunning(false);
+      }
     }
     for (ReplicationSourceShipper worker : workers) {
@@ -622,6 +627,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return !this.server.isStopped() && this.sourceRunning;
   }
+  public UUID getPeerClusterUUID() {
+    return this.clusterId;
+  }
   /**
    * Comparator used to compare logs together based on their start time
    */
@@ -644,6 +653,19 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
+  public ReplicationQueueInfo getReplicationQueueInfo() {
+    return replicationQueueInfo;
+  }
+  public boolean isWorkerRunning() {
+    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
+      if (worker.isActive()) {
+        return worker.isActive();
+      }
+    }
+    return false;
+  }
   @Override
   public String getStats() {
     StringBuilder sb = new StringBuilder();

View File

@@ -383,7 +383,13 @@ public class ReplicationSourceManager implements ReplicationListener {
       toRemove.terminate(terminateMessage);
     }
     for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
-      walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+      walsByGroup.forEach(wal -> {
+        Path walPath = new Path(this.logDir, wal);
+        src.enqueueLog(walPath);
+        LOG.trace("Enqueued {} to source {} during source creation.",
+          walPath, src.getQueueId());
+      });
     }
   }
   LOG.info("Startup replication source for " + src.getPeerId());
@@ -403,8 +409,13 @@ public class ReplicationSourceManager implements ReplicationListener {
     for (String queueId : previousQueueIds) {
       ReplicationSourceInterface replicationSource = createSource(queueId, peer);
       this.oldsources.add(replicationSource);
+      LOG.trace("Added source for recovered queue: " + src.getQueueId());
       for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+        walsByGroup.forEach(wal -> {
+          LOG.trace("Enqueueing log from recovered queue for source: {}",
+            src.getQueueId());
+          src.enqueueLog(new Path(wal));
+        });
       }
       toStartup.add(replicationSource);
     }
@@ -613,6 +624,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
+      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.",
+        newLog, source.getQueueId());
     }
   }

View File

@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -90,6 +89,7 @@ public class ReplicationSourceShipper extends Thread {
   @Override
   public final void run() {
     setWorkerState(WorkerState.RUNNING);
+    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
@@ -101,10 +101,9 @@ public class ReplicationSourceShipper extends Thread {
       }
       try {
         WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        LOG.debug("Shipper from source {} got entry batch from reader: {}",
+          source.getQueueId(), entryBatch);
         if (entryBatch == null) {
-          // since there is no logs need to replicate, we refresh the ageOfLastShippedOp
-          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-            walGroupId);
           continue;
         }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
@@ -153,16 +152,13 @@ public class ReplicationSourceShipper extends Thread {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      if (updateLogPosition(entryBatch)) {
-        // if there was nothing to ship and it's not an error
-        // set "ageOfLastShippedOp" to <now> to indicate that we're current
-        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-          walGroupId);
-      }
+      updateLogPosition(entryBatch);
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
     int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
+    source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
+      .getKey().getWriteTime());
     while (isActive()) {
       try {
         try {
@@ -174,7 +170,6 @@ public class ReplicationSourceShipper extends Thread {
           // directly go back to while() for confirm this
           continue;
         }
         // create replicateContext here, so the entries can be GC'd upon return from this call
         // stack
         ReplicationEndpoint.ReplicateContext replicateContext =
@@ -195,7 +190,7 @@ public class ReplicationSourceShipper extends Thread {
         // Clean up hfile references
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
+          LOG.trace("shipped entry {}: ", entry);
           TableName tableName = entry.getKey().getTableName();
           source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
             tableName.getNameAsString());
@@ -214,7 +209,7 @@ public class ReplicationSourceShipper extends Thread {
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicated {} entries or {} operations in {} ms",
+          LOG.debug("Replicated {} entries or {} operations in {} ms",
             entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
         }
         break;
@@ -299,7 +294,7 @@ public class ReplicationSourceShipper extends Thread {
     return 0;
   }
-  private boolean isActive() {
+  protected boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }

View File

@@ -140,7 +140,7 @@ class ReplicationSourceWALReader extends Thread {
           if (batch != null) {
             // need to propagate the batch even it has no entries since it may carry the last
             // sequence id information for serial replication.
-            LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
+            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
             entryBatchQueue.put(batch);
             sleepMultiplier = 1;
           } else { // got no entries and didn't advance position in WAL
@@ -168,8 +168,11 @@ class ReplicationSourceWALReader extends Thread {
   protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
     WALEdit edit = entry.getEdit();
     if (edit == null || edit.isEmpty()) {
+      LOG.debug("Edit null or empty for entry {} ", entry);
       return false;
     }
+    LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
+      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry);
@@ -284,7 +287,8 @@ class ReplicationSourceWALReader extends Thread {
   protected final Entry filterEntry(Entry entry) {
     Entry filtered = filter.filter(entry);
-    if (entry != null && filtered == null) {
+    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
+      LOG.debug("Filtered entry for replication: {}", entry);
       source.getSourceMetrics().incrLogEditsFiltered();
     }
     return filtered;

View File

@@ -173,6 +173,7 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
+      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.
@@ -272,6 +273,7 @@ class WALEntryStream implements Closeable {
       return true;
     }
     if (readEntry != null) {
+      LOG.trace("reading entry: {} ", readEntry);
       metrics.incrLogEditsRead();
       metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
     }

View File

@@ -86,22 +86,52 @@ public class TestGetReplicationLoad {
   @Test
   public void testGetReplicationMetrics() throws Exception {
-    String peer1 = "test1", peer2 = "test2";
-    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
-    int sizeOfLogQueue = 5;
+    String peer1 = "test1", peer2 = "test2", queueId = "1";
+    long ageOfLastShippedOp = 2,
+        replicationLag = 3,
+        timeStampOfLastShippedOp = 4,
+        timeStampOfNextToReplicate = 5,
+        editsRead = 6,
+        oPsShipped = 7;
+    int sizeOfLogQueue = 8;
+    boolean recovered = false,
+        running = false,
+        editsSinceRestart = false;
     RegionServerStatusProtos.RegionServerReportRequest.Builder request =
         RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
     ServerName serverName = cluster.getMaster(0).getServerName();
     request.setServer(ProtobufUtil.toServerName(serverName));
     ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
-        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
-        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
-        .setSizeOfLogQueue(sizeOfLogQueue).build();
+        .newBuilder().setPeerID(peer1)
+        .setAgeOfLastShippedOp(ageOfLastShippedOp)
+        .setReplicationLag(replicationLag)
+        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+        .setSizeOfLogQueue(sizeOfLogQueue)
+        .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate)
+        .setQueueId(queueId)
+        .setEditsRead(editsRead)
+        .setOPsShipped(oPsShipped)
+        .setRunning(running)
+        .setRecovered(recovered)
+        .setEditsSinceRestart(editsSinceRestart)
+        .build();
     ClusterStatusProtos.ReplicationLoadSource rload2 =
-        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
-        .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
-        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
-        .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+        ClusterStatusProtos.ReplicationLoadSource
+        .newBuilder()
+        .setPeerID(peer2)
+        .setAgeOfLastShippedOp(ageOfLastShippedOp + 1)
+        .setReplicationLag(replicationLag + 1)
+        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
+        .setSizeOfLogQueue(sizeOfLogQueue + 1)
+        .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate + 1)
+        .setQueueId(queueId)
+        .setEditsRead(editsRead + 1)
+        .setOPsShipped(oPsShipped + 1)
+        .setRunning(running)
+        .setRecovered(recovered)
+        .setEditsSinceRestart(editsSinceRestart)
+        .build();
     ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
     request.setLoad(sl);

View File

@@ -165,8 +165,7 @@ public class TestReplicationBase {
     htable1.put(puts);
   }
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected static void configureClusters() {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
     // sufficient number of events. But we don't want to go too low because
@@ -188,6 +187,17 @@ public class TestReplicationBase {
     conf1.setLong("hbase.serial.replication.waiting.ms", 100);
     utility1 = new HBaseTestingUtility(conf1);
+    // Base conf2 on conf1 so it gets the right zk cluster.
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    utility2 = new HBaseTestingUtility(conf2);
+  }
+
+  protected static void startClusters() throws Exception {
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     // Have to reget conf1 in case zk cluster location different
@@ -197,13 +207,6 @@ public class TestReplicationBase {
     admin = new ReplicationAdmin(conf1);
     LOG.info("Setup first Zk");
-    // Base conf2 on conf1 so it gets the right zk cluster.
-    conf2 = HBaseConfiguration.create(conf1);
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
-    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
     zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
     LOG.info("Setup second Zk");
@@ -238,6 +241,12 @@ public class TestReplicationBase {
     htable2 = connection2.getTable(tableName);
   }
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    configureClusters();
+    startClusters();
+  }
   private boolean peerExist(String peerId) throws IOException {
     return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
   }
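
Splitting the old setUpBeforeClass() into configureClusters() and startClusters() lets subclasses stage work between configuration and startup. A rough sketch of how a subclass can exploit the seam; the config tweak shown is hypothetical, only the two method names come from this patch:

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters();                     // build conf1/conf2 only
    conf1.setInt("some.test.only.knob", 1);  // hypothetical per-suite tweak
    startClusters();                         // then boot ZK and the mini clusters
  }

TestReplicationStatus below uses exactly this seam to restart both clusters for each test.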

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.EnumSet; import java.util.EnumSet;
@@ -33,6 +34,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -60,7 +65,8 @@ public class TestReplicationStatus extends TestReplicationBase {
  @Test
  public void testReplicationStatus() throws Exception {
    LOG.info("testReplicationStatus");
+    utility2.shutdownMiniHBaseCluster();
+    utility2.startMiniHBaseCluster(1,4);
    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
      // disable peer
      admin.disablePeer(PEER_ID);
@@ -103,11 +109,204 @@ public class TestReplicationStatus extends TestReplicationBase {
      ServerLoad sl = status.getLoad(server);
      List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
      // check SourceList still only has one entry
-      assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+      assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
      assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
    } finally {
      admin.enablePeer(PEER_ID);
      utility1.getHBaseCluster().getRegionServer(1).start();
    }
  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // We need to re-run the initialisations from TestReplicationBase.setUpBeforeClass()
+    // for each test here, so @BeforeClass only configures the clusters; they are
+    // started from the per-test setUpBase() method below.
+    TestReplicationBase.configureClusters();
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    TestReplicationBase.startClusters();
+    super.setUpBase();
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass(){
+    // Overridden as a no-op: each test already shuts the clusters down, so the
+    // super class teardown must not run against them again.
+  }
+
+  @Test
+  public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    Admin hbaseAdmin = utility1.getConnection().getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    ClusterStatus status = new ClusterStatus(hbaseAdmin
+      .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics()
+      .get(serverName).getReplicationLoadSourceList();
+    assertEquals(1, loadSources.size());
+    ReplicationLoadSource loadSource = loadSources.get(0);
+    assertFalse(loadSource.hasEditsSinceRestart());
+    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+    assertEquals(0, loadSource.getReplicationLag());
+    assertFalse(loadSource.isRecovered());
+  }
+
+  @Test
+  public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    Admin hbaseAdmin = utility1.getConnection().getAdmin();
+    // add some values to source cluster
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    ClusterStatus status = new ClusterStatus(hbaseAdmin
+      .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics()
+      .get(serverName).getReplicationLoadSourceList();
+    assertEquals(1, loadSources.size());
+    ReplicationLoadSource loadSource = loadSources.get(0);
+    assertTrue(loadSource.hasEditsSinceRestart());
+    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+    assertTrue(loadSource.getReplicationLag() > 0);
+    assertFalse(loadSource.isRecovered());
+  }
+
+  @Test
+  public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    // add some values to cluster 1
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    Admin hbaseAdmin = utility1.getConnection().getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    ClusterStatus status = new ClusterStatus(hbaseAdmin
+      .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics()
+      .get(serverName).getReplicationLoadSourceList();
+    assertEquals(2, loadSources.size());
+    boolean foundRecovery = false;
+    boolean foundNormal = false;
+    for (ReplicationLoadSource loadSource : loadSources) {
+      if (loadSource.isRecovered()) {
+        foundRecovery = true;
+        assertTrue(loadSource.hasEditsSinceRestart());
+        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+        assertTrue(loadSource.getReplicationLag() > 0);
+      } else {
+        foundNormal = true;
+        assertFalse(loadSource.hasEditsSinceRestart());
+        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+        assertEquals(0, loadSource.getReplicationLag());
+      }
+    }
+    assertTrue("No normal queue found.", foundNormal);
+    assertTrue("No recovery queue found.", foundRecovery);
+  }
+
+  @Test
+  public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    // add some values to cluster 1
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    Admin hbaseAdmin = utility1.getConnection().getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    // add more values to cluster 1, these should cause the normal queue to lag
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    ClusterStatus status = new ClusterStatus(hbaseAdmin
+      .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics()
+      .get(serverName).getReplicationLoadSourceList();
+    assertEquals(2, loadSources.size());
+    boolean foundRecovery = false;
+    boolean foundNormal = false;
+    for (ReplicationLoadSource loadSource : loadSources) {
+      if (loadSource.isRecovered()) {
+        foundRecovery = true;
+      } else {
+        foundNormal = true;
+      }
+      assertTrue(loadSource.hasEditsSinceRestart());
+      assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+      assertTrue(loadSource.getReplicationLag() > 0);
+    }
+    assertTrue("No normal queue found.", foundNormal);
+    assertTrue("No recovery queue found.", foundRecovery);
+  }
+
+  @Test
+  public void testReplicationStatusAfterLagging() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster();
+    // add some values to cluster 1
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    utility2.startMiniHBaseCluster();
+    Thread.sleep(10000);
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+      ClusterStatus status =
+        new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+      List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics()
+        .get(serverName).getReplicationLoadSourceList();
+      assertEquals(1, loadSources.size());
+      ReplicationLoadSource loadSource = loadSources.get(0);
+      assertTrue(loadSource.hasEditsSinceRestart());
+      assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
+      assertEquals(0, loadSource.getReplicationLag());
+    } finally {
+      utility2.shutdownMiniHBaseCluster();
+    }
+  }
}
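The tests above read the new per-peer metrics through the Admin API; a hedged sketch of the same check from ordinary client code, assuming an existing Admin instance named admin and illustrative output formatting:

import java.util.EnumSet;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ServerMetrics;

// Sketch: print per-peer replication lag for every live region server using
// the getReplicationLoadSourceMap() accessor added by this change.
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
for (ServerMetrics sm : metrics.getLiveServerMetrics().values()) {
  sm.getReplicationLoadSourceMap().forEach((peerId, sources) ->
    sources.forEach(src -> System.out.println(peerId
        + (src.isRecovered() ? " (recovered queue " + src.getQueueId() + ")" : "")
        + " lag=" + src.getReplicationLag())));
}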

@@ -785,37 +785,30 @@ module Hbase
          puts(format(' %s', server))
        end
      elsif format == 'replication'
-        puts(format('version %s', status.getHBaseVersion))
-        puts(format('%d live servers', status.getServersSize))
-        for server in status.getServers
-          sl = status.getLoad(server)
-          rSinkString = ' SINK :'
-          rSourceString = ' SOURCE:'
-          rLoadSink = sl.getReplicationLoadSink
-          next if rLoadSink.nil?
-          rSinkString << ' AgeOfLastAppliedOp=' + rLoadSink.getAgeOfLastAppliedOp.to_s
-          rSinkString << ', TimeStampsOfLastAppliedOp=' +
-            java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString
-          rLoadSourceList = sl.getReplicationLoadSourceList
-          index = 0
-          while index < rLoadSourceList.size
-            rLoadSource = rLoadSourceList.get(index)
-            rSourceString << ' PeerID=' + rLoadSource.getPeerID
-            rSourceString << ', AgeOfLastShippedOp=' + rLoadSource.getAgeOfLastShippedOp.to_s
-            rSourceString << ', SizeOfLogQueue=' + rLoadSource.getSizeOfLogQueue.to_s
-            rSourceString << ', TimeStampsOfLastShippedOp=' +
-              java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString
-            rSourceString << ', Replication Lag=' + rLoadSource.getReplicationLag.to_s
-            index += 1
-          end
-          puts(format(' %s:', server.getHostname))
-          if type.casecmp('SOURCE') == 0
-            puts(format('%s', rSourceString))
-          elsif type.casecmp('SINK') == 0
-            puts(format('%s', rSinkString))
+        puts(format('version %<version>s', version: status.getHBaseVersion))
+        puts(format('%<servers>d live servers', servers: status.getServersSize))
+        status.getServers.each do |server_status|
+          sl = status.getLoad(server_status)
+          r_sink_string = ' SINK:'
+          r_source_string = ' SOURCE:'
+          r_load_sink = sl.getReplicationLoadSink
+          next if r_load_sink.nil?
+
+          r_sink_string << ' AgeOfLastAppliedOp=' +
+                           r_load_sink.getAgeOfLastAppliedOp.to_s
+          r_sink_string << ', TimeStampsOfLastAppliedOp=' +
+                           java.util.Date.new(r_load_sink
+                             .getTimeStampsOfLastAppliedOp).toString
+          r_load_source_map = sl.getReplicationLoadSourceMap
+          build_source_string(r_load_source_map, r_source_string)
+          puts(format(' %<host>s:', host: server_status.getHostname))
+          if type.casecmp('SOURCE').zero?
+            puts(format('%<source>s', source: r_source_string))
+          elsif type.casecmp('SINK').zero?
+            puts(format('%<sink>s', sink: r_sink_string))
          else
-            puts(format('%s', rSourceString))
-            puts(format('%s', rSinkString))
+            puts(format('%<source>s', source: r_source_string))
+            puts(format('%<sink>s', sink: r_sink_string))
          end
        end
      elsif format == 'simple'
@@ -844,6 +837,71 @@ module Hbase
      end
    end
+
+    def build_source_string(r_load_source_map, r_source_string)
+      r_load_source_map.each do |peer, sources|
+        r_source_string << ' PeerID=' + peer
+        sources.each do |source_load|
+          build_queue_title(source_load, r_source_string)
+          build_running_source_stats(source_load, r_source_string)
+        end
+      end
+    end
+
+    def build_queue_title(source_load, r_source_string)
+      r_source_string << if source_load.isRecovered
+                           "\n Recovered Queue: "
+                         else
+                           "\n Normal Queue: "
+                         end
+      r_source_string << source_load.getQueueId
+    end
+
+    def build_running_source_stats(source_load, r_source_string)
+      if source_load.isRunning
+        build_shipped_stats(source_load, r_source_string)
+        build_load_general_stats(source_load, r_source_string)
+        r_source_string << ', Replication Lag=' +
+                           source_load.getReplicationLag.to_s
+      else
+        r_source_string << "\n "
+        r_source_string << 'No Reader/Shipper threads running yet.'
+      end
+    end
+
+    def build_shipped_stats(source_load, r_source_string)
+      r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
+                           "\n " \
+                           'No Ops shipped since last restart'
+                         else
+                           "\n AgeOfLastShippedOp=" +
+                           source_load.getAgeOfLastShippedOp.to_s +
+                           ', TimeStampOfLastShippedOp=' +
+                           java.util.Date.new(source_load
+                             .getTimeStampOfLastShippedOp).toString
+                         end
+    end
+
+    def build_load_general_stats(source_load, r_source_string)
+      r_source_string << ', SizeOfLogQueue=' +
+                         source_load.getSizeOfLogQueue.to_s
+      r_source_string << ', EditsReadFromLogQueue=' +
+                         source_load.getEditsRead.to_s
+      r_source_string << ', OpsShippedToTarget=' +
+                         source_load.getOPsShipped.to_s
+      build_edits_for_source(source_load, r_source_string)
+    end
+
+    def build_edits_for_source(source_load, r_source_string)
+      if source_load.hasEditsSinceRestart
+        r_source_string << ', TimeStampOfNextToReplicate=' +
+                           java.util.Date.new(source_load
+                             .getTimeStampOfNextToReplicate).toString
+      else
+        r_source_string << ', No edits for this source'
+        r_source_string << ' since it started'
+      end
+    end
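With these helpers in place, the shell's status 'replication' output takes roughly the following per-server shape; the hostname, peer id, queue id, and timestamps below are made up for illustration:

  host1.example.com:
     SOURCE: PeerID=2
       Normal Queue: 2
         AgeOfLastShippedOp=0, TimeStampOfLastShippedOp=Tue Feb 19 14:47:18 GMT 2019, SizeOfLogQueue=1, EditsReadFromLogQueue=5, OpsShippedToTarget=5, TimeStampOfNextToReplicate=Tue Feb 19 14:47:18 GMT 2019, Replication Lag=0
     SINK: AgeOfLastAppliedOp=0, TimeStampsOfLastAppliedOp=Tue Feb 19 14:47:18 GMT 2019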
    #----------------------------------------------------------------------------------------------
    #
    # Helper methods