HBASE-9531 a command line (hbase shell) interface to retreive the replication metrics and show replication lag

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Ashish Singhi 2015-02-12 14:50:01 -08:00 committed by Andrew Purtell
parent e0160d6937
commit 16ed345191
24 changed files with 2870 additions and 32 deletions

View File

@ -28,8 +28,11 @@ import java.util.TreeSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Strings;
@ -52,7 +55,7 @@ public class ServerLoad {
private int totalStaticBloomSizeKB = 0; private int totalStaticBloomSizeKB = 0;
private long totalCompactingKVs = 0; private long totalCompactingKVs = 0;
private long currentCompactedKVs = 0; private long currentCompactedKVs = 0;
public ServerLoad(ClusterStatusProtos.ServerLoad serverLoad) { public ServerLoad(ClusterStatusProtos.ServerLoad serverLoad) {
this.serverLoad = serverLoad; this.serverLoad = serverLoad;
for (ClusterStatusProtos.RegionLoad rl: serverLoad.getRegionLoadsList()) { for (ClusterStatusProtos.RegionLoad rl: serverLoad.getRegionLoadsList()) {
@ -70,7 +73,7 @@ public class ServerLoad {
totalCompactingKVs += rl.getTotalCompactingKVs(); totalCompactingKVs += rl.getTotalCompactingKVs();
currentCompactedKVs += rl.getCurrentCompactedKVs(); currentCompactedKVs += rl.getCurrentCompactedKVs();
} }
} }
// NOTE: Function name cannot start with "get" because then an OpenDataException is thrown because // NOTE: Function name cannot start with "get" because then an OpenDataException is thrown because
@ -177,6 +180,26 @@ public class ServerLoad {
return serverLoad.getInfoServerPort(); return serverLoad.getInfoServerPort();
} }
/**
* Call directly from client such as hbase shell
* @return the list of ReplicationLoadSource
*/
public List<ReplicationLoadSource> getReplicationLoadSourceList() {
return ProtobufUtil.toReplicationLoadSourceList(serverLoad.getReplLoadSourceList());
}
/**
* Call directly from client such as hbase shell
* @return ReplicationLoadSink
*/
public ReplicationLoadSink getReplicationLoadSink() {
if (serverLoad.hasReplLoadSink()) {
return ProtobufUtil.toReplicationLoadSink(serverLoad.getReplLoadSink());
} else {
return null;
}
}
/** /**
* Originally, this method factored in the effect of requests going to the * Originally, this method factored in the effect of requests going to the
* server as well. However, this does not interact very well with the current * server as well. However, this does not interact very well with the current

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Col
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.DeleteType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.DeleteType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
@ -130,6 +131,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.quotas.QuotaScope; import org.apache.hadoop.hbase.quotas.QuotaScope;
import org.apache.hadoop.hbase.quotas.QuotaType; import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.quotas.ThrottleType; import org.apache.hadoop.hbase.quotas.ThrottleType;
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission; import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission; import org.apache.hadoop.hbase.security.access.UserPermission;
@ -2994,4 +2997,25 @@ public final class ProtobufUtil {
return desc.build(); return desc.build();
} }
public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink cls) {
return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
}
public static ReplicationLoadSource toReplicationLoadSource(
ClusterStatusProtos.ReplicationLoadSource cls) {
return new ReplicationLoadSource(cls.getPeerID(), cls.getAgeOfLastShippedOp(),
cls.getSizeOfLogQueue(), cls.getTimeStampOfLastShippedOp(), cls.getReplicationLag());
}
public static List<ReplicationLoadSource> toReplicationLoadSourceList(
List<ClusterStatusProtos.ReplicationLoadSource> clsList) {
ArrayList<ReplicationLoadSource> rlsList = new ArrayList<ReplicationLoadSource>();
for (ClusterStatusProtos.ReplicationLoadSource cls : clsList) {
rlsList.add(toReplicationLoadSource(cls));
}
return rlsList;
}
} }

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A HBase ReplicationLoad to present MetricsSink information
*/
@InterfaceAudience.Private
public class ReplicationLoadSink {
private long ageOfLastAppliedOp;
private long timeStampsOfLastAppliedOp;
public ReplicationLoadSink(long age, long timeStamp) {
this.ageOfLastAppliedOp = age;
this.timeStampsOfLastAppliedOp = timeStamp;
}
public long getAgeOfLastAppliedOp() {
return this.ageOfLastAppliedOp;
}
public long getTimeStampsOfLastAppliedOp() {
return this.timeStampsOfLastAppliedOp;
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A HBase ReplicationLoad to present MetricsSource information
*/
@InterfaceAudience.Private
public class ReplicationLoadSource {
private String peerID;
private long ageOfLastShippedOp;
private int sizeOfLogQueue;
private long timeStampOfLastShippedOp;
private long replicationLag;
public ReplicationLoadSource(String id, long age, int size, long timeStamp, long lag) {
this.peerID = id;
this.ageOfLastShippedOp = age;
this.sizeOfLogQueue = size;
this.timeStampOfLastShippedOp = timeStamp;
this.replicationLag = lag;
}
public String getPeerID() {
return this.peerID;
}
public long getAgeOfLastShippedOp() {
return this.ageOfLastShippedOp;
}
public long getSizeOfLogQueue() {
return this.sizeOfLogQueue;
}
public long getTimeStampOfLastShippedOp() {
return this.timeStampOfLastShippedOp;
}
public long getReplicationLag() {
return this.replicationLag;
}
}

View File

@ -26,4 +26,5 @@ public interface MetricsReplicationSinkSource {
void setLastAppliedOpAge(long age); void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches); void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize); void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
} }

View File

@ -43,4 +43,5 @@ public interface MetricsReplicationSourceSource {
void incrLogReadInBytes(long size); void incrLogReadInBytes(long size);
void incrLogReadInEdits(long size); void incrLogReadInEdits(long size);
void clear(); void clear();
long getLastShippedAge();
} }

View File

@ -95,4 +95,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
@Override public void clear() { @Override public void clear() {
} }
@Override
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
} }

View File

@ -44,4 +44,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
@Override public void incrAppliedOps(long batchsize) { @Override public void incrAppliedOps(long batchsize) {
opsCounter.incr(batchsize); opsCounter.incr(batchsize);
} }
@Override
public long getLastAppliedOpAge() {
return ageGauge.value();
}
} }

View File

@ -125,4 +125,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(logEditsFilteredKey); rms.removeMetric(logEditsFilteredKey);
} }
@Override
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
} }

View File

@ -119,6 +119,19 @@ message RegionLoad {
/* Server-level protobufs */ /* Server-level protobufs */
message ReplicationLoadSink {
required uint64 ageOfLastAppliedOp = 1;
required uint64 timeStampsOfLastAppliedOp = 2;
}
message ReplicationLoadSource {
required string peerID = 1;
required uint64 ageOfLastShippedOp = 2;
required uint32 sizeOfLogQueue = 3;
required uint64 timeStampOfLastShippedOp = 4;
required uint64 replicationLag = 5;
}
message ServerLoad { message ServerLoad {
/** Number of requests since last report. */ /** Number of requests since last report. */
optional uint32 number_of_requests = 1; optional uint32 number_of_requests = 1;
@ -160,6 +173,16 @@ message ServerLoad {
* The port number that this region server is hosing an info server on. * The port number that this region server is hosing an info server on.
*/ */
optional uint32 info_server_port = 9; optional uint32 info_server_port = 9;
/**
* The replicationLoadSource for the replication Source status of this region server.
*/
repeated ReplicationLoadSource replLoadSource = 10;
/**
* The replicationLoadSink for the replication Sink status of this region server.
*/
optional ReplicationLoadSink replLoadSink = 11;
} }
message LiveServerInfo { message LiveServerInfo {

View File

@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Addressing;
@ -1147,6 +1148,22 @@ public class HRegionServer extends HasThread implements
} else { } else {
serverLoad.setInfoServerPort(-1); serverLoad.setInfoServerPort(-1);
} }
// for the replicationLoad purpose. Only need to get from one service
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();
if (rsources != null) {
// always refresh first to get the latest value
ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
serverLoad.addReplLoadSource(rLS);
}
}
}
return serverLoad.build(); return serverLoad.build();
} }

View File

@ -22,11 +22,12 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
/** /**
* Gateway to Cluster Replication. * Gateway to Cluster Replication.
* Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* One such application is a cross-datacenter * One such application is a cross-datacenter
* replication service that can keep two hbase clusters in sync. * replication service that can keep two hbase clusters in sync.
@ -52,4 +53,9 @@ public interface ReplicationService {
* Stops replication service. * Stops replication service.
*/ */
void stopReplicationService(); void stopReplicationService();
/**
* Refresh and Get ReplicationLoad
*/
public ReplicationLoad refreshAndGetReplicationLoad();
} }

View File

@ -71,4 +71,21 @@ public class MetricsSink {
mss.incrAppliedOps(batchSize); mss.incrAppliedOps(batchSize);
} }
/**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
*/
public long getAgeOfLastAppliedOp() {
return mss.getLastAppliedOpAge();
}
/**
* Get the TimeStampOfLastAppliedOp. If no replication Op applied yet, the value is the timestamp
* at which hbase instance starts
* @return timeStampsOfLastAppliedOp;
*/
public long getTimeStampOfLastAppliedOp() {
return this.lastTimestampForAge;
}
} }

View File

@ -36,6 +36,7 @@ public class MetricsSource {
private long lastTimestamp = 0; private long lastTimestamp = 0;
private int lastQueueSize = 0; private int lastQueueSize = 0;
private String id;
private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource; private final MetricsReplicationSourceSource globalSourceSource;
@ -46,6 +47,7 @@ public class MetricsSource {
* @param id Name of the source this class is monitoring * @param id Name of the source this class is monitoring
*/ */
public MetricsSource(String id) { public MetricsSource(String id) {
this.id = id;
singleSourceSource = singleSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getSource(id); .getSource(id);
@ -143,4 +145,36 @@ public class MetricsSource {
globalSourceSource.decrSizeOfLogQueue(lastQueueSize); globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
lastQueueSize = 0; lastQueueSize = 0;
} }
/**
* Get AgeOfLastShippedOp
* @return AgeOfLastShippedOp
*/
public Long getAgeOfLastShippedOp() {
return singleSourceSource.getLastShippedAge();
}
/**
* Get the sizeOfLogQueue
* @return sizeOfLogQueue
*/
public int getSizeOfLogQueue() {
return this.lastQueueSize;
}
/**
* Get the timeStampsOfLastShippedOp
* @return lastTimestampForAge
*/
public long getTimeStampOfLastShippedOp() {
return lastTimestamp;
}
/**
* Get the slave peer ID
* @return peerID
*/
public String getPeerID() {
return id;
}
} }

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -65,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class Replication extends WALActionsListener.Base implements public class Replication extends WALActionsListener.Base implements
ReplicationSourceService, ReplicationSinkService { ReplicationSourceService, ReplicationSinkService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(Replication.class); LogFactory.getLog(Replication.class);
@ -81,6 +82,8 @@ public class Replication extends WALActionsListener.Base implements
/** Statistics thread schedule pool */ /** Statistics thread schedule pool */
private ScheduledExecutorService scheduleThreadPool; private ScheduledExecutorService scheduleThreadPool;
private int statsThreadPeriod; private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
/** /**
* Instantiate the replication management (if rep is enabled). * Instantiate the replication management (if rep is enabled).
@ -137,11 +140,13 @@ public class Replication extends WALActionsListener.Base implements
this.statsThreadPeriod = this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
} else { } else {
this.replicationManager = null; this.replicationManager = null;
this.replicationQueues = null; this.replicationQueues = null;
this.replicationPeers = null; this.replicationPeers = null;
this.replicationTracker = null; this.replicationTracker = null;
this.replicationLoad = null;
} }
} }
@ -309,4 +314,29 @@ public class Replication extends WALActionsListener.Base implements
} }
} }
} }
@Override
public ReplicationLoad refreshAndGetReplicationLoad() {
if (this.replicationLoad == null) {
return null;
}
// always build for latest data
buildReplicationLoad();
return this.replicationLoad;
}
private void buildReplicationLoad() {
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
}
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
}
} }

View File

@ -0,0 +1,151 @@
/**
* Copyright 2014 The Apache Software Foundation Licensed to the Apache Software Foundation (ASF)
* under one or more contributor license agreements. See the NOTICE file distributed with this work
* for additional information regarding copyright ownership. The ASF licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in
* writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Strings;
/**
* This class is used for exporting some of the info from replication metrics
*/
@InterfaceAudience.Private
public class ReplicationLoad {
// Empty load instance.
public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
private List<MetricsSource> sourceMetricsList;
private MetricsSink sinkMetrics;
private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
/** default constructor */
public ReplicationLoad() {
super();
}
/**
* buildReplicationLoad
* @param srMetricsList
* @param skMetrics
*/
public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
final MetricsSink skMetrics) {
this.sourceMetricsList = srMetricsList;
this.sinkMetrics = skMetrics;
// build the SinkLoad
ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimeStampOfLastAppliedOp());
this.replicationLoadSink = rLoadSinkBuild.build();
// build the SourceLoad List
this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
for (MetricsSource sm : this.sourceMetricsList) {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
long replicationLag;
long timePassedAfterLastShippedOp =
EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
if (sizeOfLogQueue != 0) {
// err on the large side
replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
} else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
replicationLag = ageOfLastShippedOp; // last shipped happen recently
} else {
// last shipped may happen last night,
// so NO real lag although ageOfLastShippedOp is non-zero
replicationLag = 0;
}
ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
ClusterStatusProtos.ReplicationLoadSource.newBuilder();
rLoadSourceBuild.setPeerID(sm.getPeerID());
rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
rLoadSourceBuild.setReplicationLag(replicationLag);
this.replicationLoadSourceList.add(rLoadSourceBuild.build());
}
}
/**
* sourceToString
* @return a string contains sourceReplicationLoad information
*/
public String sourceToString() {
if (this.sourceMetricsList == null) return null;
StringBuilder sb = new StringBuilder();
for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {
sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID());
sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
sb =
Strings.appendKeyValue(sb, "TimeStampsOfLastShippedOp",
(new Date(rls.getTimeStampOfLastShippedOp()).toString()));
sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
}
return sb.toString();
}
/**
* sinkToString
* @return a string contains sinkReplicationLoad information
*/
public String sinkToString() {
if (this.replicationLoadSink == null) return null;
StringBuilder sb = new StringBuilder();
sb =
Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
this.replicationLoadSink.getAgeOfLastAppliedOp());
sb =
Strings.appendKeyValue(sb, "TimeStampsOfLastAppliedOp",
(new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
return sb.toString();
}
public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
return this.replicationLoadSink;
}
public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
return this.replicationLoadSourceList;
}
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
}
}

View File

@ -254,4 +254,12 @@ public class ReplicationSink {
"age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
", total replicated edits: " + this.totalReplicatedEdits; ", total replicated edits: " + this.totalReplicatedEdits;
} }
/**
* Get replication Sink Metrics
* @return MetricsSink
*/
public MetricsSink getSinkMetrics() {
return this.metrics;
}
} }

View File

@ -869,4 +869,12 @@ public class ReplicationSource extends Thread
", currently replicating from: " + this.currentPath + ", currently replicating from: " + this.currentPath +
" at position: " + position; " at position: " + position;
} }
/**
* Get Replication Source Metrics
* @return sourceMetrics
*/
public MetricsSource getSourceMetrics() {
return this.metrics;
}
} }

View File

@ -31,11 +31,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -556,4 +560,45 @@ public class TestReplicationSmallTests extends TestReplicationBase {
hadmin.close(); hadmin.close();
} }
/**
* Test for HBASE-9531
* put a few rows into htable1, which should be replicated to htable2
* create a ClusterStatus instance 'status' from HBaseAdmin
* test : status.getLoad(server).getReplicationLoadSourceList()
* test : status.getLoad(server).getReplicationLoadSink()
* * @throws Exception
*/
@Test(timeout = 300000)
public void testReplicationStatus() throws Exception {
LOG.info("testReplicationStatus");
try (Admin admin = utility1.getConnection().getAdmin()) {
final byte[] qualName = Bytes.toBytes("q");
Put p;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
p.add(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
ClusterStatus status = admin.getClusterStatus();
for (ServerName server : status.getServers()) {
ServerLoad sl = status.getLoad(server);
List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
// check SourceList has at least one entry
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
// check Sink exist only as it is difficult to verify the value on the fly
assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
(rLoadSink.getAgeOfLastAppliedOp() >= 0));
assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
(rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
}
}
}
} }

View File

@ -608,7 +608,7 @@ module Hbase
end end
end end
def status(format) def status(format, type)
status = @admin.getClusterStatus() status = @admin.getClusterStatus()
if format == "detailed" if format == "detailed"
puts("version %s" % [ status.getHBaseVersion() ]) puts("version %s" % [ status.getHBaseVersion() ])
@ -635,6 +635,46 @@ module Hbase
for server in status.getDeadServerNames() for server in status.getDeadServerNames()
puts(" %s" % [ server ]) puts(" %s" % [ server ])
end end
elsif format == "replication"
#check whether replication is enabled or not
if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_KEY,
org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_DEFAULT))
puts("Please enable replication first.")
else
puts("version %s" % [ status.getHBaseVersion() ])
puts("%d live servers" % [ status.getServersSize() ])
for server in status.getServers()
sl = status.getLoad(server)
rSinkString = " SINK :"
rSourceString = " SOURCE:"
rLoadSink = sl.getReplicationLoadSink()
rSinkString << " AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s
rSinkString << ", TimeStampsOfLastAppliedOp=" +
(java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString()
rLoadSourceList = sl.getReplicationLoadSourceList()
index = 0
while index < rLoadSourceList.size()
rLoadSource = rLoadSourceList.get(index)
rSourceString << " PeerID=" + rLoadSource.getPeerID()
rSourceString << ", AgeOfLastShippedOp=" + rLoadSource.getAgeOfLastShippedOp().to_s
rSourceString << ", SizeOfLogQueue=" + rLoadSource.getSizeOfLogQueue().to_s
rSourceString << ", TimeStampsOfLastShippedOp=" +
(java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString()
rSourceString << ", Replication Lag=" + rLoadSource.getReplicationLag().to_s
index = index + 1
end
puts(" %s:" %
[ server.getHostname() ])
if type.casecmp("SOURCE") == 0
puts("%s" % rSourceString)
elsif type.casecmp("SINK") == 0
puts("%s" % rSinkString)
else
puts("%s" % rSourceString)
puts("%s" % rSinkString)
end
end
end
elsif format == "simple" elsif format == "simple"
load = 0 load = 0
regions = 0 regions = 0

View File

@ -22,18 +22,21 @@ module Shell
class Status < Command class Status < Command
def help def help
return <<-EOF return <<-EOF
Show cluster status. Can be 'summary', 'simple', or 'detailed'. The Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. The
default is 'summary'. Examples: default is 'summary'. Examples:
hbase> status hbase> status
hbase> status 'simple' hbase> status 'simple'
hbase> status 'summary' hbase> status 'summary'
hbase> status 'detailed' hbase> status 'detailed'
hbase> status 'replication'
hbase> status 'replication', 'source'
hbase> status 'replication', 'sink'
EOF EOF
end end
def command(format = 'summary') def command(format = 'summary',type = 'both')
admin.status(format) admin.status(format, type)
end end
end end
end end

View File

@ -356,5 +356,17 @@ module Hbase
assert_not_equal(nil, table) assert_not_equal(nil, table)
table.close table.close
end end
define_test "Get replication status" do
replication_status("replication", "both")
end
define_test "Get replication source metrics information" do
replication_status("replication", "source")
end
define_test "Get replication sink metrics information" do
replication_status("replication", "sink")
end
end end
end end

View File

@ -94,6 +94,10 @@ module Hbase
puts "IGNORING DROP TABLE ERROR: #{e}" puts "IGNORING DROP TABLE ERROR: #{e}"
end end
end end
def replication_status(format,type)
return admin.status(format,type)
end
end end
end end