HBASE-17296 Provide per peer throttling for replication
Signed-off-by: Phil Yang <yangzhe1991@apache.org>
commit 4b3df0f926
parent e029c554bb
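
At the client level this change comes down to a new bandwidth field on ReplicationPeerConfig, persisted with the rest of the peer configuration and read back by each region server's ReplicationSource. For orientation, a minimal usage sketch based on the APIs exercised in the test changes below; the peer id "1" is a placeholder and the ReplicationAdmin/HBaseConfiguration plumbing is assumed, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class PeerBandwidthExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (ReplicationAdmin admin = new ReplicationAdmin(conf)) {
          // Fetch the current config for peer "1" (placeholder id), cap it at 2 MB/s per
          // region server node, and push the updated config back.
          ReplicationPeerConfig rpc = admin.getPeerConfig("1");
          rpc.setBandwidth(2 * 1024 * 1024); // bytes per second per RS; 0 falls back to the default
          admin.updatePeerConfig("1", rpc);
        }
      }
    }
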
@@ -70,6 +70,12 @@ public interface ReplicationPeer {
    */
   public Map<TableName, List<String>> getTableCFs();
 
+  /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth up limit
+   */
+  public long getPeerBandwidth();
+
   /**
    * Setup a callback for chanages to the replication peer config
    * @param listener Listener for config changes, usually a replication endpoint
@@ -41,6 +41,7 @@ public class ReplicationPeerConfig {
   private final Map<byte[], byte[]> peerData;
   private final Map<String, String> configuration;
   private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
+  private long bandwidth = 0;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

@@ -89,13 +90,23 @@ public class ReplicationPeerConfig {
     this.tableCFsMap = tableCFsMap;
   }
 
+  public long getBandwidth() {
+    return this.bandwidth;
+  }
+
+  public ReplicationPeerConfig setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
     if (tableCFsMap != null) {
-      builder.append(tableCFsMap.toString());
+      builder.append(tableCFsMap.toString()).append(",");
     }
+    builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
   }
 }
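
The new accessor pair keeps the builder flavour of ReplicationPeerConfig: setBandwidth returns this, and the default of 0 means no per-peer limit is configured. A small sketch of assembling a config this way; the cluster key value and the helper class are illustrative, not part of the patch:

    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    class PeerConfigSketch {
      // Builds a peer config capped at the given rate; 0 keeps the server-side default.
      static ReplicationPeerConfig newPeerConfig(String clusterKey, long bytesPerSecond) {
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
        peerConfig.setClusterKey(clusterKey);           // e.g. "zk1:2181:/hbase" (placeholder)
        return peerConfig.setBandwidth(bytesPerSecond); // setter returns this for chaining
      }
    }
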
@@ -163,6 +163,12 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep
     return this.tableCFs;
   }
 
+
+  @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
   @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){
@@ -375,6 +375,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     // or data that weren't explicitly changed
     existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
     existingConfig.getPeerData().putAll(newConfig.getPeerData());
+    existingConfig.setBandwidth(newConfig.getBandwidth());
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),
           ReplicationSerDeHelper.toByteArray(existingConfig));
@@ -264,6 +264,10 @@ public final class ReplicationSerDeHelper {
     if (tableCFsMap != null) {
       peerConfig.setTableCFsMap(tableCFsMap);
     }
+
+    if (peer.hasBandwidth()) {
+      peerConfig.setBandwidth(peer.getBandwidth());
+    }
     return peerConfig;
   }

@@ -308,6 +312,7 @@ public final class ReplicationSerDeHelper {
       }
     }
 
+    builder.setBandwidth(peerConfig.getBandwidth());
     return builder.build();
   }
 }
@@ -5867,6 +5867,16 @@ public final class ZooKeeperProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
         int index);
+
+    // optional int64 bandwidth = 6;
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    boolean hasBandwidth();
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    long getBandwidth();
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -5958,6 +5968,11 @@ public final class ZooKeeperProtos {
               tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry));
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000004;
+              bandwidth_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6216,12 +6231,29 @@ public final class ZooKeeperProtos {
       return tableCfs_.get(index);
     }
 
+    // optional int64 bandwidth = 6;
+    public static final int BANDWIDTH_FIELD_NUMBER = 6;
+    private long bandwidth_;
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    public boolean hasBandwidth() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    public long getBandwidth() {
+      return bandwidth_;
+    }
+
     private void initFields() {
       clusterkey_ = "";
       replicationEndpointImpl_ = "";
       data_ = java.util.Collections.emptyList();
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
+      bandwidth_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6272,6 +6304,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < tableCfs_.size(); i++) {
         output.writeMessage(5, tableCfs_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(6, bandwidth_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6301,6 +6336,10 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, tableCfs_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, bandwidth_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6340,6 +6379,11 @@ public final class ZooKeeperProtos {
           .equals(other.getConfigurationList());
       result = result && getTableCfsList()
           .equals(other.getTableCfsList());
+      result = result && (hasBandwidth() == other.hasBandwidth());
+      if (hasBandwidth()) {
+        result = result && (getBandwidth()
+            == other.getBandwidth());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -6373,6 +6417,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
         hash = (53 * hash) + getTableCfsList().hashCode();
       }
+      if (hasBandwidth()) {
+        hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getBandwidth());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6512,6 +6560,8 @@ public final class ZooKeeperProtos {
         } else {
           tableCfsBuilder_.clear();
         }
+        bandwidth_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -6575,6 +6625,10 @@ public final class ZooKeeperProtos {
         } else {
           result.tableCfs_ = tableCfsBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.bandwidth_ = bandwidth_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6679,6 +6733,9 @@ public final class ZooKeeperProtos {
             }
           }
         }
+        if (other.hasBandwidth()) {
+          setBandwidth(other.getBandwidth());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7626,6 +7683,39 @@ public final class ZooKeeperProtos {
         return tableCfsBuilder_;
       }
 
+      // optional int64 bandwidth = 6;
+      private long bandwidth_ ;
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public boolean hasBandwidth() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public long getBandwidth() {
+        return bandwidth_;
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public Builder setBandwidth(long value) {
+        bitField0_ |= 0x00000020;
+        bandwidth_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public Builder clearBandwidth() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        bandwidth_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
     }
 
@@ -10915,24 +11005,24 @@ public final class ZooKeeperProtos {
       "ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLE" +
       "D\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007Tabl" +
       "eCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Table" +
-      "Name\022\020\n\010families\030\002 \003(\014\"\305\001\n\017ReplicationPe" +
+      "Name\022\020\n\010families\030\002 \003(\014\"\330\001\n\017ReplicationPe" +
       "er\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replicationEnd" +
       "pointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase.pb" +
       ".BytesBytesPair\022/\n\rconfiguration\030\004 \003(\0132\030" +
       ".hbase.pb.NameStringPair\022$\n\ttable_cfs\030\005 " +
-      "\003(\0132\021.hbase.pb.TableCF\"g\n\020ReplicationSta" +
-      "te\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replication",
-      "State.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DIS" +
-      "ABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010po" +
-      "sition\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlock_" +
-      "owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_name\030" +
-      "\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_owner" +
-      "\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthread_i" +
-      "d\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 " +
-      "\001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchState\022" +
-      "\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop.hb" +
-      "ase.protobuf.generatedB\017ZooKeeperProtosH",
-      "\001\210\001\001\240\001\001"
+      "\003(\0132\021.hbase.pb.TableCF\022\021\n\tbandwidth\030\006 \001(" +
+      "\003\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\0162 .h",
+      "base.pb.ReplicationState.State\"\"\n\005State\022" +
+      "\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replicatio" +
+      "nHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Repli" +
+      "cationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tTable" +
+      "Lock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Tabl" +
+      "eName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb.Ser" +
+      "verName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030" +
+      "\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_time\030\006 " +
+      "\001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010BE\n*o" +
+      "rg.apache.hadoop.hbase.protobuf.generate",
+      "dB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -10986,7 +11076,7 @@ public final class ZooKeeperProtos {
           internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
+              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Bandwidth", });
           internal_static_hbase_pb_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(8);
           internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
@@ -135,6 +135,7 @@ message ReplicationPeer {
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
+  optional int64 bandwidth = 6;
 }
 
 /**
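
On the wire the limit travels as a new optional field of the ReplicationPeer message, so peers written without it parse the same as before and readers can distinguish "unset" from an explicit value. A hedged round-trip sketch against the regenerated ZooKeeperProtos class, using only the field and accessor names introduced above; the cluster key value is a placeholder:

    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

    class PeerProtoSketch {
      static long roundTrip() throws Exception {
        ZooKeeperProtos.ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
            .setClusterkey("zk1:2181:/hbase") // required field 1; placeholder value
            .setBandwidth(2097152L)           // new optional int64, field 6
            .build();

        byte[] bytes = peer.toByteArray();
        ZooKeeperProtos.ReplicationPeer parsed = ZooKeeperProtos.ReplicationPeer.parseFrom(bytes);
        // hasBandwidth() distinguishes "never set" from an explicit 0; both mean "use the default".
        return parsed.hasBandwidth() ? parsed.getBandwidth() : 0L;
      }
    }
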
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -144,6 +145,8 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
+  private long defaultBandwidth;
+  private long currentBandwidth;
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
 
@@ -179,8 +182,6 @@ public class ReplicationSource extends Thread
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
-    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
-    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -196,6 +197,15 @@ public class ReplicationSource extends Thread
     this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
+
+    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+    currentBandwidth = getCurrentBandwidth();
+    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
+
+    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+        + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+        + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
+        + this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
     return this.metrics;
   }
 
+  private long getCurrentBandwidth() {
+    ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
+    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
+    // user can set peer bandwidth to 0 to use default bandwidth
+    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
+  }
+
   public class ReplicationSourceWorkerThread extends Thread {
     private ReplicationSource source;
     private String walGroupId;
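
The resolution rule above is worth spelling out: a non-zero per-peer value wins, otherwise the source falls back to the server-wide replication.source.per.peer.node.bandwidth setting, and a result of 0 leaves throttling disabled. A standalone restatement of that rule; the class and method names here are illustrative, not part of the patch:

    class BandwidthPolicySketch {
      // Mirrors ReplicationSource#getCurrentBandwidth: a non-zero per-peer value overrides
      // the configured default; 0 means "no explicit limit on the peer".
      static long effectiveBandwidth(long peerBandwidth, long defaultBandwidth) {
        return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
      }

      public static void main(String[] args) {
        System.out.println(effectiveBandwidth(2097152L, 0L)); // 2097152: peer limit applies
        System.out.println(effectiveBandwidth(0L, 1048576L)); // 1048576: default applies
        System.out.println(effectiveBandwidth(0L, 0L));       // 0: throttling stays disabled
      }
    }
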
@@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
       return distinctRowKeys + totalHFileEntries;
     }
 
+    private void checkBandwidthChangeAndResetThrottler() {
+      long peerBandwidth = getCurrentBandwidth();
+      if (peerBandwidth != currentBandwidth) {
+        currentBandwidth = peerBandwidth;
+        throttler.setBandwidth((double) currentBandwidth / 10.0);
+        LOG.info("ReplicationSource : " + peerId
+            + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+      }
+    }
+
     /**
      * Do the shipping logic
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
@@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
       }
       while (isWorkerActive()) {
         try {
+          checkBandwidthChangeAndResetThrottler();
           if (throttler.isEnabled()) {
             long sleepTicks = throttler.getNextSleepInterval(currentSize);
             if (sleepTicks > 0) {
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class ReplicationThrottler {
-  private final boolean enabled;
-  private final double bandwidth;
+  private boolean enabled;
+  private double bandwidth;
   private long cyclePushSize;
   private long cycleStartTick;
 
@@ -118,4 +118,9 @@ public class ReplicationThrottler {
       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
     }
   }
+
+  public void setBandwidth(double bandwidth) {
+    this.bandwidth = bandwidth;
+    this.enabled = this.bandwidth > 0;
+  }
 }
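
Making enabled and bandwidth non-final is what lets a running source re-tune the throttler in place rather than recreating it: a bandwidth of 0 switches throttling off and a later non-zero value switches it back on. A hedged sketch of that lifecycle; ReplicationThrottler is an internal (@InterfaceAudience.Private) class, and the /10.0 split simply mirrors how ReplicationSource sizes the throttler from a per-second budget in the hunks above:

    import org.apache.hadoop.hbase.replication.regionserver.ReplicationThrottler;

    class ThrottlerSketch {
      public static void main(String[] args) {
        // Start from a 2 MB/s per-second budget, handed over the same way ReplicationSource does.
        ReplicationThrottler throttler = new ReplicationThrottler(2097152 / 10.0);
        System.out.println(throttler.isEnabled()); // true

        throttler.setBandwidth(0);                 // peer and default both 0 -> throttling off
        System.out.println(throttler.isEnabled()); // false

        throttler.setBandwidth(1048576 / 10.0);    // peer limit changed to 1 MB/s at runtime
        System.out.println(throttler.isEnabled()); // true
      }
    }
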
@@ -363,4 +363,21 @@ public class TestReplicationAdmin {
     assertNull(admin.getPeerConfig(ID_ONE).getTableCFsMap());
     admin.removePeer(ID_ONE);
   }
+
+  @Test
+  public void testPeerBandwidth() throws ReplicationException {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    assertEquals(0, rpc.getBandwidth());
+
+    rpc.setBandwidth(2097152);
+    admin.updatePeerConfig(ID_ONE, rpc);
+
+    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
+    admin.removePeer(ID_ONE);
+  }
 }
@@ -182,6 +182,15 @@ module Hbase
       @replication_admin.removePeerTableCFs(id, map)
     end
 
+    # Set new bandwidth config for the specified peer
+    def set_peer_bandwidth(id, bandwidth)
+      rpc = get_peer_config(id)
+      unless rpc.nil?
+        rpc.setBandwidth(bandwidth)
+        @replication_admin.updatePeerConfig(id, rpc)
+      end
+    end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)
@@ -359,6 +359,7 @@ Shell.load_command_group(
     disable_peer
     show_peer_tableCFs
     set_peer_tableCFs
+    set_peer_bandwidth
     list_replicated_tables
     append_peer_tableCFs
     remove_peer_tableCFs
@@ -34,13 +34,14 @@ EOF
       peers = replication_admin.list_peers
 
       formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
-        "STATE", "TABLE_CFS"])
+        "STATE", "TABLE_CFS", "BANDWIDTH"])
 
       peers.entrySet().each do |e|
         state = replication_admin.get_peer_state(e.key)
         tableCFs = replication_admin.show_peer_tableCFs(e.key)
         formatter.row([ e.key, e.value.getClusterKey,
-          e.value.getReplicationEndpointImpl, state, tableCFs ])
+          e.value.getReplicationEndpointImpl, state, tableCFs,
+          e.value.getBandwidth ])
       end
 
       formatter.footer(now)
@@ -0,0 +1,42 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerBandwidth < Command
+      def help
+        return <<-EOF
+Set the replication source per node bandwidth for the specified peer.
+Examples:
+
+  # set bandwidth=2MB per regionserver for a peer
+  hbase> set_peer_bandwidth '1', 2097152
+  # unset bandwidth for a peer to use the default bandwidth configured in server-side
+  hbase> set_peer_bandwidth '1'
+
+EOF
+      end
+
+      def command(id, bandwidth = 0)
+        replication_admin.set_peer_bandwidth(id, bandwidth)
+      end
+    end
+  end
+end
@@ -198,6 +198,24 @@ module Hbase
       replication_admin.remove_peer(@peer_id)
     end
 
+    define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+      cluster_key = "localhost:2181:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
+      # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+      # but here we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      assert_equal(0, peer_config.get_bandwidth)
+      replication_admin.set_peer_bandwidth(@peer_id, 2097152)
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      assert_equal(2097152, peer_config.get_bandwidth)
+
+      #cleanup
+      replication_admin.remove_peer(@peer_id)
+    end
+
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }