HBASE-17296 Provide per peer throttling for replication (Guanghao Zhang)
This commit is contained in:
parent
adb319f5c2
commit
2333596279
|
@ -287,6 +287,9 @@ public final class ReplicationSerDeHelper {
|
|||
}
|
||||
peerConfig.setNamespaces(namespaces);
|
||||
}
|
||||
if (peer.hasBandwidth()) {
|
||||
peerConfig.setBandwidth(peer.getBandwidth());
|
||||
}
|
||||
return peerConfig;
|
||||
}
|
||||
|
||||
|
@ -326,6 +329,7 @@ public final class ReplicationSerDeHelper {
|
|||
}
|
||||
}
|
||||
|
||||
builder.setBandwidth(peerConfig.getBandwidth());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,12 @@ public interface ReplicationPeer {
|
|||
*/
|
||||
public Set<String> getNamespaces();
|
||||
|
||||
/**
|
||||
* Get the per node bandwidth upper limit for this peer
|
||||
* @return the bandwidth upper limit
|
||||
*/
|
||||
public long getPeerBandwidth();
|
||||
|
||||
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
|
||||
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ public class ReplicationPeerConfig {
|
|||
private final Map<String, String> configuration;
|
||||
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
|
||||
private Set<String> namespaces = null;
|
||||
private long bandwidth = 0;
|
||||
|
||||
public ReplicationPeerConfig() {
|
||||
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
|
@ -102,6 +103,15 @@ public class ReplicationPeerConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public long getBandwidth() {
|
||||
return this.bandwidth;
|
||||
}
|
||||
|
||||
public ReplicationPeerConfig setBandwidth(long bandwidth) {
|
||||
this.bandwidth = bandwidth;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
|
||||
|
@ -110,8 +120,9 @@ public class ReplicationPeerConfig {
|
|||
builder.append("namespaces=").append(namespaces.toString()).append(",");
|
||||
}
|
||||
if (tableCFsMap != null) {
|
||||
builder.append("tableCFs=").append(tableCFsMap.toString());
|
||||
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
|
||||
}
|
||||
builder.append("bandwidth=").append(bandwidth);
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -173,6 +173,11 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
|
|||
return this.peerConfig.getNamespaces();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPeerBandwidth() {
|
||||
return this.peerConfig.getBandwidth();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
|
||||
if (this.peerConfigTracker != null){
|
||||
|
|
|
@ -367,6 +367,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
existingConfig.getPeerData().putAll(newConfig.getPeerData());
|
||||
existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
|
||||
existingConfig.setNamespaces(newConfig.getNamespaces());
|
||||
existingConfig.setBandwidth(newConfig.getBandwidth());
|
||||
|
||||
try {
|
||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||
|
|
|
@ -4994,6 +4994,15 @@ public final class ZooKeeperProtos {
|
|||
* <code>repeated bytes namespaces = 6;</code>
|
||||
*/
|
||||
org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString getNamespaces(int index);
|
||||
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
boolean hasBandwidth();
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
long getBandwidth();
|
||||
}
|
||||
/**
|
||||
* <pre>
|
||||
|
@ -5018,6 +5027,7 @@ public final class ZooKeeperProtos {
|
|||
configuration_ = java.util.Collections.emptyList();
|
||||
tableCfs_ = java.util.Collections.emptyList();
|
||||
namespaces_ = java.util.Collections.emptyList();
|
||||
bandwidth_ = 0L;
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
|
@ -5095,6 +5105,11 @@ public final class ZooKeeperProtos {
|
|||
namespaces_.add(input.readBytes());
|
||||
break;
|
||||
}
|
||||
case 56: {
|
||||
bitField0_ |= 0x00000004;
|
||||
bandwidth_ = input.readInt64();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -5358,6 +5373,21 @@ public final class ZooKeeperProtos {
|
|||
return namespaces_.get(index);
|
||||
}
|
||||
|
||||
public static final int BANDWIDTH_FIELD_NUMBER = 7;
|
||||
private long bandwidth_;
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public boolean hasBandwidth() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public long getBandwidth() {
|
||||
return bandwidth_;
|
||||
}
|
||||
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
byte isInitialized = memoizedIsInitialized;
|
||||
|
@ -5410,6 +5440,9 @@ public final class ZooKeeperProtos {
|
|||
for (int i = 0; i < namespaces_.size(); i++) {
|
||||
output.writeBytes(6, namespaces_.get(i));
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeInt64(7, bandwidth_);
|
||||
}
|
||||
unknownFields.writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -5445,6 +5478,10 @@ public final class ZooKeeperProtos {
|
|||
size += dataSize;
|
||||
size += 1 * getNamespacesList().size();
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
|
||||
.computeInt64Size(7, bandwidth_);
|
||||
}
|
||||
size += unknownFields.getSerializedSize();
|
||||
memoizedSize = size;
|
||||
return size;
|
||||
|
@ -5480,6 +5517,11 @@ public final class ZooKeeperProtos {
|
|||
.equals(other.getTableCfsList());
|
||||
result = result && getNamespacesList()
|
||||
.equals(other.getNamespacesList());
|
||||
result = result && (hasBandwidth() == other.hasBandwidth());
|
||||
if (hasBandwidth()) {
|
||||
result = result && (getBandwidth()
|
||||
== other.getBandwidth());
|
||||
}
|
||||
result = result && unknownFields.equals(other.unknownFields);
|
||||
return result;
|
||||
}
|
||||
|
@ -5515,6 +5557,11 @@ public final class ZooKeeperProtos {
|
|||
hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getNamespacesList().hashCode();
|
||||
}
|
||||
if (hasBandwidth()) {
|
||||
hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
|
||||
hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
|
||||
getBandwidth());
|
||||
}
|
||||
hash = (29 * hash) + unknownFields.hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -5665,6 +5712,8 @@ public final class ZooKeeperProtos {
|
|||
}
|
||||
namespaces_ = java.util.Collections.emptyList();
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
bandwidth_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -5729,6 +5778,10 @@ public final class ZooKeeperProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
}
|
||||
result.namespaces_ = namespaces_;
|
||||
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.bandwidth_ = bandwidth_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -5869,6 +5922,9 @@ public final class ZooKeeperProtos {
|
|||
}
|
||||
onChanged();
|
||||
}
|
||||
if (other.hasBandwidth()) {
|
||||
setBandwidth(other.getBandwidth());
|
||||
}
|
||||
this.mergeUnknownFields(other.unknownFields);
|
||||
onChanged();
|
||||
return this;
|
||||
|
@ -6888,6 +6944,38 @@ public final class ZooKeeperProtos {
|
|||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
private long bandwidth_ ;
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public boolean hasBandwidth() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public long getBandwidth() {
|
||||
return bandwidth_;
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public Builder setBandwidth(long value) {
|
||||
bitField0_ |= 0x00000040;
|
||||
bandwidth_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public Builder clearBandwidth() {
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
bandwidth_ = 0L;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public final Builder setUnknownFields(
|
||||
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
|
||||
return super.setUnknownFields(unknownFields);
|
||||
|
@ -9803,23 +9891,24 @@ public final class ZooKeeperProtos {
|
|||
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
|
||||
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
|
||||
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
|
||||
"bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
|
||||
"bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
|
||||
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
|
||||
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
|
||||
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
|
||||
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
|
||||
"\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
|
||||
"\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
|
||||
"2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
|
||||
"ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
|
||||
"ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
|
||||
"TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
|
||||
".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
|
||||
"b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
|
||||
"ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
|
||||
"me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
|
||||
"BL\n1org.apache.hadoop.hbase.shaded.proto" +
|
||||
"buf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
|
||||
"\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
|
||||
"tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
|
||||
"onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
|
||||
"ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
|
||||
"position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
|
||||
"me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
|
||||
"ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
|
||||
"d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
|
||||
"\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
|
||||
"te\022\017\n\007enabled\030\001 \001(\010BL\n1org.apache.hadoop" +
|
||||
".hbase.shaded.protobuf.generatedB\017ZooKee" +
|
||||
"perProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
|
||||
|
@ -9876,7 +9965,7 @@ public final class ZooKeeperProtos {
|
|||
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
|
||||
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
|
||||
internal_static_hbase_pb_ReplicationPeer_descriptor,
|
||||
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
|
||||
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
|
||||
internal_static_hbase_pb_ReplicationState_descriptor =
|
||||
getDescriptor().getMessageTypes().get(7);
|
||||
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
|
||||
|
|
|
@ -122,6 +122,7 @@ message ReplicationPeer {
|
|||
repeated NameStringPair configuration = 4;
|
||||
repeated TableCF table_cfs = 5;
|
||||
repeated bytes namespaces = 6;
|
||||
optional int64 bandwidth = 7;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -4796,6 +4796,16 @@ public final class ZooKeeperProtos {
|
|||
* <code>repeated bytes namespaces = 6;</code>
|
||||
*/
|
||||
com.google.protobuf.ByteString getNamespaces(int index);
|
||||
|
||||
// optional int64 bandwidth = 7;
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
boolean hasBandwidth();
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
long getBandwidth();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.ReplicationPeer}
|
||||
|
@ -4895,6 +4905,11 @@ public final class ZooKeeperProtos {
|
|||
namespaces_.add(input.readBytes());
|
||||
break;
|
||||
}
|
||||
case 56: {
|
||||
bitField0_ |= 0x00000004;
|
||||
bandwidth_ = input.readInt64();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -5179,6 +5194,22 @@ public final class ZooKeeperProtos {
|
|||
return namespaces_.get(index);
|
||||
}
|
||||
|
||||
// optional int64 bandwidth = 7;
|
||||
public static final int BANDWIDTH_FIELD_NUMBER = 7;
|
||||
private long bandwidth_;
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public boolean hasBandwidth() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public long getBandwidth() {
|
||||
return bandwidth_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
clusterkey_ = "";
|
||||
replicationEndpointImpl_ = "";
|
||||
|
@ -5186,6 +5217,7 @@ public final class ZooKeeperProtos {
|
|||
configuration_ = java.util.Collections.emptyList();
|
||||
tableCfs_ = java.util.Collections.emptyList();
|
||||
namespaces_ = java.util.Collections.emptyList();
|
||||
bandwidth_ = 0L;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -5239,6 +5271,9 @@ public final class ZooKeeperProtos {
|
|||
for (int i = 0; i < namespaces_.size(); i++) {
|
||||
output.writeBytes(6, namespaces_.get(i));
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeInt64(7, bandwidth_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -5277,6 +5312,10 @@ public final class ZooKeeperProtos {
|
|||
size += dataSize;
|
||||
size += 1 * getNamespacesList().size();
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeInt64Size(7, bandwidth_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -5318,6 +5357,11 @@ public final class ZooKeeperProtos {
|
|||
.equals(other.getTableCfsList());
|
||||
result = result && getNamespacesList()
|
||||
.equals(other.getNamespacesList());
|
||||
result = result && (hasBandwidth() == other.hasBandwidth());
|
||||
if (hasBandwidth()) {
|
||||
result = result && (getBandwidth()
|
||||
== other.getBandwidth());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -5355,6 +5399,10 @@ public final class ZooKeeperProtos {
|
|||
hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getNamespacesList().hashCode();
|
||||
}
|
||||
if (hasBandwidth()) {
|
||||
hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getBandwidth());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -5496,6 +5544,8 @@ public final class ZooKeeperProtos {
|
|||
}
|
||||
namespaces_ = java.util.Collections.emptyList();
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
bandwidth_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -5564,6 +5614,10 @@ public final class ZooKeeperProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
}
|
||||
result.namespaces_ = namespaces_;
|
||||
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.bandwidth_ = bandwidth_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -5678,6 +5732,9 @@ public final class ZooKeeperProtos {
|
|||
}
|
||||
onChanged();
|
||||
}
|
||||
if (other.hasBandwidth()) {
|
||||
setBandwidth(other.getBandwidth());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -6697,6 +6754,39 @@ public final class ZooKeeperProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional int64 bandwidth = 7;
|
||||
private long bandwidth_ ;
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public boolean hasBandwidth() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public long getBandwidth() {
|
||||
return bandwidth_;
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public Builder setBandwidth(long value) {
|
||||
bitField0_ |= 0x00000040;
|
||||
bandwidth_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional int64 bandwidth = 7;</code>
|
||||
*/
|
||||
public Builder clearBandwidth() {
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
bandwidth_ = 0L;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
|
||||
}
|
||||
|
||||
|
@ -9446,23 +9536,24 @@ public final class ZooKeeperProtos {
|
|||
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
|
||||
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
|
||||
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
|
||||
"bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
|
||||
"bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
|
||||
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
|
||||
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
|
||||
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
|
||||
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
|
||||
"\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
|
||||
"\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
|
||||
"2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
|
||||
"ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
|
||||
"ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
|
||||
"TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
|
||||
".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
|
||||
"b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
|
||||
"ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
|
||||
"me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
|
||||
"BE\n*org.apache.hadoop.hbase.protobuf.gen" +
|
||||
"eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
|
||||
"\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
|
||||
"tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
|
||||
"onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
|
||||
"ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
|
||||
"position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
|
||||
"me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
|
||||
"ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
|
||||
"d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
|
||||
"\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
|
||||
"te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
|
||||
".hbase.protobuf.generatedB\017ZooKeeperProt" +
|
||||
"osH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -9510,7 +9601,7 @@ public final class ZooKeeperProtos {
|
|||
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_ReplicationPeer_descriptor,
|
||||
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
|
||||
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
|
||||
internal_static_hbase_pb_ReplicationState_descriptor =
|
||||
getDescriptor().getMessageTypes().get(7);
|
||||
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
|
||||
|
|
|
@ -122,6 +122,7 @@ message ReplicationPeer {
|
|||
repeated NameStringPair configuration = 4;
|
||||
repeated TableCF table_cfs = 5;
|
||||
repeated bytes namespaces = 6;
|
||||
optional int64 bandwidth = 7;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
|||
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
|
@ -144,6 +145,8 @@ public class ReplicationSource extends Thread
|
|||
private WALEntryFilter walEntryFilter;
|
||||
// throttler
|
||||
private ReplicationThrottler throttler;
|
||||
private long defaultBandwidth;
|
||||
private long currentBandwidth;
|
||||
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
|
||||
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
|
||||
|
||||
|
@ -179,8 +182,6 @@ public class ReplicationSource extends Thread
|
|||
this.maxRetriesMultiplier =
|
||||
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
|
||||
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
|
||||
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
|
||||
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
|
||||
this.replicationQueues = replicationQueues;
|
||||
this.replicationPeers = replicationPeers;
|
||||
this.manager = manager;
|
||||
|
@ -196,6 +197,15 @@ public class ReplicationSource extends Thread
|
|||
this.actualPeerId = replicationQueueInfo.getPeerId();
|
||||
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
|
||||
this.replicationEndpoint = replicationEndpoint;
|
||||
|
||||
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
|
||||
currentBandwidth = getCurrentBandwidth();
|
||||
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
|
||||
|
||||
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
|
||||
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
|
||||
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
|
||||
+ this.currentBandwidth);
|
||||
}
|
||||
|
||||
private void decorateConf() {
|
||||
|
@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
|
|||
return this.metrics;
|
||||
}
|
||||
|
||||
private long getCurrentBandwidth() {
|
||||
ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
|
||||
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
|
||||
// user can set peer bandwidth to 0 to use default bandwidth
|
||||
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
|
||||
}
|
||||
|
||||
public class ReplicationSourceWorkerThread extends Thread {
|
||||
ReplicationSource source;
|
||||
String walGroupId;
|
||||
|
@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
|
|||
return distinctRowKeys + totalHFileEntries;
|
||||
}
|
||||
|
||||
private void checkBandwidthChangeAndResetThrottler() {
|
||||
long peerBandwidth = getCurrentBandwidth();
|
||||
if (peerBandwidth != currentBandwidth) {
|
||||
currentBandwidth = peerBandwidth;
|
||||
throttler.setBandwidth((double) currentBandwidth / 10.0);
|
||||
LOG.info("ReplicationSource : " + peerId
|
||||
+ " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Do the shipping logic
|
||||
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
|
||||
|
@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
|
|||
}
|
||||
while (isWorkerActive()) {
|
||||
try {
|
||||
checkBandwidthChangeAndResetThrottler();
|
||||
if (throttler.isEnabled()) {
|
||||
long sleepTicks = throttler.getNextSleepInterval(currentSize);
|
||||
if (sleepTicks > 0) {
|
||||
|
|
|
@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationThrottler {
|
||||
private final boolean enabled;
|
||||
private final double bandwidth;
|
||||
private boolean enabled;
|
||||
private double bandwidth;
|
||||
private long cyclePushSize;
|
||||
private long cycleStartTick;
|
||||
|
||||
|
@ -118,4 +118,9 @@ public class ReplicationThrottler {
|
|||
this.cycleStartTick = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
}
|
||||
|
||||
public void setBandwidth(double bandwidth) {
|
||||
this.bandwidth = bandwidth;
|
||||
this.enabled = this.bandwidth > 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -469,4 +469,21 @@ public class TestReplicationAdmin {
|
|||
|
||||
admin.removePeer(ID_ONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeerBandwidth() throws ReplicationException {
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(KEY_ONE);
|
||||
admin.addPeer(ID_ONE, rpc);
|
||||
admin.peerAdded(ID_ONE);
|
||||
|
||||
rpc = admin.getPeerConfig(ID_ONE);
|
||||
assertEquals(0, rpc.getBandwidth());
|
||||
|
||||
rpc.setBandwidth(2097152);
|
||||
admin.updatePeerConfig(ID_ONE, rpc);
|
||||
|
||||
assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
|
||||
admin.removePeer(ID_ONE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,6 +252,15 @@ module Hbase
|
|||
end
|
||||
end
|
||||
|
||||
# Set new bandwidth config for the specified peer
|
||||
def set_peer_bandwidth(id, bandwidth)
|
||||
rpc = get_peer_config(id)
|
||||
unless rpc.nil?
|
||||
rpc.setBandwidth(bandwidth)
|
||||
@replication_admin.updatePeerConfig(id, rpc)
|
||||
end
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Enables a table's replication switch
|
||||
def enable_tablerep(table_name)
|
||||
|
|
|
@ -375,6 +375,7 @@ Shell.load_command_group(
|
|||
remove_peer_namespaces
|
||||
show_peer_tableCFs
|
||||
set_peer_tableCFs
|
||||
set_peer_bandwidth
|
||||
list_replicated_tables
|
||||
append_peer_tableCFs
|
||||
remove_peer_tableCFs
|
||||
|
|
|
@ -33,14 +33,15 @@ EOF
|
|||
peers = replication_admin.list_peers
|
||||
|
||||
formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
|
||||
"STATE", "NAMESPACES", "TABLE_CFS"])
|
||||
"STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
|
||||
|
||||
peers.entrySet().each do |e|
|
||||
state = replication_admin.get_peer_state(e.key)
|
||||
namespaces = replication_admin.show_peer_namespaces(e.value)
|
||||
tableCFs = replication_admin.show_peer_tableCFs(e.key)
|
||||
formatter.row([ e.key, e.value.getClusterKey,
|
||||
e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
|
||||
e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
|
||||
e.value.getBandwidth ])
|
||||
end
|
||||
|
||||
formatter.footer()
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
#
|
||||
# Copyright The Apache Software Foundation
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class SetPeerBandwidth< Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Set the replication source per node bandwidth for the specified peer.
|
||||
Examples:
|
||||
|
||||
# set bandwidth=2MB per regionserver for a peer
|
||||
hbase> set_peer_bandwidth '1', 2097152
|
||||
# unset bandwidth for a peer to use the default bandwidth configured in server-side
|
||||
hbase> set_peer_bandwidth '1'
|
||||
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(id, bandwidth = 0)
|
||||
replication_admin.set_peer_bandwidth(id, bandwidth)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -371,6 +371,24 @@ module Hbase
|
|||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
|
||||
cluster_key = "localhost:2181:/hbase-test"
|
||||
args = { CLUSTER_KEY => cluster_key }
|
||||
command(:add_peer, @peer_id, args)
|
||||
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
|
||||
# but here we have to do it ourselves
|
||||
replication_admin.peer_added(@peer_id)
|
||||
|
||||
peer_config = command(:get_peer_config, @peer_id)
|
||||
assert_equal(0, peer_config.get_bandwidth)
|
||||
command(:set_peer_bandwidth, @peer_id, 2097152)
|
||||
peer_config = command(:get_peer_config, @peer_id)
|
||||
assert_equal(2097152, peer_config.get_bandwidth)
|
||||
|
||||
#cleanup
|
||||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test "get_peer_config: works with simple clusterKey peer" do
|
||||
cluster_key = "localhost:2181:/hbase-test"
|
||||
args = { CLUSTER_KEY => cluster_key }
|
||||
|
|
Loading…
Reference in New Issue