From 4b3df0f926f6ec64bd6e071b1ae18ff635b4cc98 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 14 Dec 2016 16:34:55 +0800 Subject: [PATCH] HBASE-17296 Provide per peer throttling for replication Signed-off-by: Phil Yang --- .../hbase/replication/ReplicationPeer.java | 6 + .../replication/ReplicationPeerConfig.java | 13 +- .../replication/ReplicationPeerZKImpl.java | 6 + .../replication/ReplicationPeersZKImpl.java | 1 + .../replication/ReplicationSerDeHelper.java | 5 + .../protobuf/generated/ZooKeeperProtos.java | 120 +++++++++++++++--- .../src/main/protobuf/ZooKeeper.proto | 1 + .../regionserver/ReplicationSource.java | 32 ++++- .../regionserver/ReplicationThrottler.java | 9 +- .../replication/TestReplicationAdmin.java | 17 +++ .../src/main/ruby/hbase/replication_admin.rb | 9 ++ hbase-shell/src/main/ruby/shell.rb | 1 + .../main/ruby/shell/commands/list_peers.rb | 5 +- .../ruby/shell/commands/set_peer_bandwidth.rb | 42 ++++++ .../test/ruby/hbase/replication_admin_test.rb | 18 +++ 15 files changed, 263 insertions(+), 22 deletions(-) create mode 100644 hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 9a6af4f253e..200d81c47ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -70,6 +70,12 @@ public interface ReplicationPeer { */ public Map> getTableCFs(); + /** + * Get the per node bandwidth upper limit for this peer + * @return the bandwidth up limit + */ + public long getPeerBandwidth(); + /** * Setup a callback for chanages to the replication peer config * @param listener Listener for config changes, usually a replication endpoint diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index e2c7bc7357e..eee521c8b77 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -41,6 +41,7 @@ public class ReplicationPeerConfig { private final Map peerData; private final Map configuration; private Map> tableCFsMap = null; + private long bandwidth = 0; public ReplicationPeerConfig() { this.peerData = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -89,13 +90,23 @@ public class ReplicationPeerConfig { this.tableCFsMap = tableCFsMap; } + public long getBandwidth() { + return this.bandwidth; + } + + public ReplicationPeerConfig setBandwidth(long bandwidth) { + this.bandwidth = bandwidth; + return this; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl); if (tableCFsMap != null) { - builder.append(tableCFsMap.toString()); + builder.append(tableCFsMap.toString()).append(","); } + builder.append("bandwidth=").append(bandwidth); return builder.toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 049a142b456..b79a982520a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -163,6 +163,12 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep return this.tableCFs; } + + @Override + public long getPeerBandwidth() { + return this.peerConfig.getBandwidth(); + } + @Override public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { if (this.peerConfigTracker != null){ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 331e8bf31c4..9e3c92eb6b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -375,6 +375,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // or data that weren't explicitly changed existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); existingConfig.getPeerData().putAll(newConfig.getPeerData()); + existingConfig.setBandwidth(newConfig.getBandwidth()); try { ZKUtil.setData(this.zookeeper, getPeerNode(id), ReplicationSerDeHelper.toByteArray(existingConfig)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java index cdb95f7f89d..ae511e8b3cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java @@ -264,6 +264,10 @@ public final class ReplicationSerDeHelper { if (tableCFsMap != null) { peerConfig.setTableCFsMap(tableCFsMap); } + + if (peer.hasBandwidth()) { + peerConfig.setBandwidth(peer.getBandwidth()); + } return peerConfig; } @@ -308,6 +312,7 @@ public final class ReplicationSerDeHelper { } } + builder.setBandwidth(peerConfig.getBandwidth()); return builder.build(); } } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index 955995f57f2..fb06a78333e 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -5867,6 +5867,16 @@ public final class ZooKeeperProtos { */ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder( int index); + + // optional int64 bandwidth = 6; + /** + * optional int64 bandwidth = 6; + */ + boolean hasBandwidth(); + /** + * optional int64 bandwidth = 6; + */ + long getBandwidth(); } /** * Protobuf type {@code hbase.pb.ReplicationPeer} @@ -5958,6 +5968,11 @@ public final class ZooKeeperProtos { tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry)); break; } + case 48: { + bitField0_ |= 0x00000004; + bandwidth_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6216,12 +6231,29 @@ public final class ZooKeeperProtos { return tableCfs_.get(index); } + // optional int64 bandwidth = 6; + public static final int BANDWIDTH_FIELD_NUMBER = 6; + private long bandwidth_; + /** + * optional int64 bandwidth = 6; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 bandwidth = 6; + */ + public long getBandwidth() { + return bandwidth_; + } + private void initFields() { clusterkey_ = ""; replicationEndpointImpl_ = ""; data_ = java.util.Collections.emptyList(); configuration_ = java.util.Collections.emptyList(); tableCfs_ = java.util.Collections.emptyList(); + bandwidth_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6272,6 +6304,9 @@ public final class ZooKeeperProtos { for (int i = 0; i < tableCfs_.size(); i++) { output.writeMessage(5, tableCfs_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(6, bandwidth_); + } getUnknownFields().writeTo(output); } @@ -6301,6 +6336,10 @@ public final class ZooKeeperProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, tableCfs_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(6, bandwidth_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6340,6 +6379,11 @@ public final class ZooKeeperProtos { .equals(other.getConfigurationList()); result = result && getTableCfsList() .equals(other.getTableCfsList()); + result = result && (hasBandwidth() == other.hasBandwidth()); + if (hasBandwidth()) { + result = result && (getBandwidth() + == other.getBandwidth()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -6373,6 +6417,10 @@ public final class ZooKeeperProtos { hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER; hash = (53 * hash) + getTableCfsList().hashCode(); } + if (hasBandwidth()) { + hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getBandwidth()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -6512,6 +6560,8 @@ public final class ZooKeeperProtos { } else { tableCfsBuilder_.clear(); } + bandwidth_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -6575,6 +6625,10 @@ public final class ZooKeeperProtos { } else { result.tableCfs_ = tableCfsBuilder_.build(); } + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000004; + } + result.bandwidth_ = bandwidth_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6679,6 +6733,9 @@ public final class ZooKeeperProtos { } } } + if (other.hasBandwidth()) { + setBandwidth(other.getBandwidth()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -7626,6 +7683,39 @@ public final class ZooKeeperProtos { return tableCfsBuilder_; } + // optional int64 bandwidth = 6; + private long bandwidth_ ; + /** + * optional int64 bandwidth = 6; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int64 bandwidth = 6; + */ + public long getBandwidth() { + return bandwidth_; + } + /** + * optional int64 bandwidth = 6; + */ + public Builder setBandwidth(long value) { + bitField0_ |= 0x00000020; + bandwidth_ = value; + onChanged(); + return this; + } + /** + * optional int64 bandwidth = 6; + */ + public Builder clearBandwidth() { + bitField0_ = (bitField0_ & ~0x00000020); + bandwidth_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer) } @@ -10915,24 +11005,24 @@ public final class ZooKeeperProtos { "ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLE" + "D\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007Tabl" + "eCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Table" + - "Name\022\020\n\010families\030\002 \003(\014\"\305\001\n\017ReplicationPe" + + "Name\022\020\n\010families\030\002 \003(\014\"\330\001\n\017ReplicationPe" + "er\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replicationEnd" + "pointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase.pb" + ".BytesBytesPair\022/\n\rconfiguration\030\004 \003(\0132\030" + ".hbase.pb.NameStringPair\022$\n\ttable_cfs\030\005 " + - "\003(\0132\021.hbase.pb.TableCF\"g\n\020ReplicationSta" + - "te\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replication", - "State.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DIS" + - "ABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010po" + - "sition\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlock_" + - "owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_name\030" + - "\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_owner" + - "\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthread_i" + - "d\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 " + - "\001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchState\022" + - "\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\017ZooKeeperProtosH", - "\001\210\001\001\240\001\001" + "\003(\0132\021.hbase.pb.TableCF\022\021\n\tbandwidth\030\006 \001(" + + "\003\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\0162 .h", + "base.pb.ReplicationState.State\"\"\n\005State\022" + + "\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replicatio" + + "nHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Repli" + + "cationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tTable" + + "Lock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Tabl" + + "eName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb.Ser" + + "verName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030" + + "\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_time\030\006 " + + "\001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010BE\n*o" + + "rg.apache.hadoop.hbase.protobuf.generate", + "dB\017ZooKeeperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -10986,7 +11076,7 @@ public final class ZooKeeperProtos { internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ReplicationPeer_descriptor, - new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", }); + new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Bandwidth", }); internal_static_hbase_pb_ReplicationState_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto index 60ed2291787..a632552daf7 100644 --- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto +++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto @@ -135,6 +135,7 @@ message ReplicationPeer { repeated BytesBytesPair data = 3; repeated NameStringPair configuration = 4; repeated TableCF table_cfs = 5; + optional int64 bandwidth = 6; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7fd7d94438f..63549d0a190 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -144,6 +145,8 @@ public class ReplicationSource extends Thread private WALEntryFilter walEntryFilter; // throttler private ReplicationThrottler throttler; + private long defaultBandwidth; + private long currentBandwidth; private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); @@ -179,8 +182,6 @@ public class ReplicationSource extends Thread this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); - long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); - this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; @@ -196,6 +197,15 @@ public class ReplicationSource extends Thread this.actualPeerId = replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; + + defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); + currentBandwidth = getCurrentBandwidth(); + this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); + + LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity + + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" + + this.currentBandwidth); } private void decorateConf() { @@ -494,6 +504,13 @@ public class ReplicationSource extends Thread return this.metrics; } + private long getCurrentBandwidth() { + ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId); + long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0; + // user can set peer bandwidth to 0 to use default bandwidth + return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; + } + public class ReplicationSourceWorkerThread extends Thread { private ReplicationSource source; private String walGroupId; @@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread return distinctRowKeys + totalHFileEntries; } + private void checkBandwidthChangeAndResetThrottler() { + long peerBandwidth = getCurrentBandwidth(); + if (peerBandwidth != currentBandwidth) { + currentBandwidth = peerBandwidth; + throttler.setBandwidth((double) currentBandwidth / 10.0); + LOG.info("ReplicationSource : " + peerId + + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth); + } + } + /** * Do the shipping logic * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) @@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread } while (isWorkerActive()) { try { + checkBandwidthChangeAndResetThrottler(); if (throttler.isEnabled()) { long sleepTicks = throttler.getNextSleepInterval(currentSize); if (sleepTicks > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java index c756576529a..8da9352406f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java @@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; */ @InterfaceAudience.Private public class ReplicationThrottler { - private final boolean enabled; - private final double bandwidth; + private boolean enabled; + private double bandwidth; private long cyclePushSize; private long cycleStartTick; @@ -118,4 +118,9 @@ public class ReplicationThrottler { this.cycleStartTick = EnvironmentEdgeManager.currentTime(); } } + + public void setBandwidth(double bandwidth) { + this.bandwidth = bandwidth; + this.enabled = this.bandwidth > 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index b5627c05a8e..cf7b23615c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -363,4 +363,21 @@ public class TestReplicationAdmin { assertNull(admin.getPeerConfig(ID_ONE).getTableCFsMap()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerBandwidth() throws ReplicationException { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + admin.addPeer(ID_ONE, rpc); + admin.peerAdded(ID_ONE); + + rpc = admin.getPeerConfig(ID_ONE); + assertEquals(0, rpc.getBandwidth()); + + rpc.setBandwidth(2097152); + admin.updatePeerConfig(ID_ONE, rpc); + + assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); + admin.removePeer(ID_ONE); + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 2e240e1eead..f0da3aea2bc 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -182,6 +182,15 @@ module Hbase @replication_admin.removePeerTableCFs(id, map) end + # Set new bandwidth config for the specified peer + def set_peer_bandwidth(id, bandwidth) + rpc = get_peer_config(id) + unless rpc.nil? + rpc.setBandwidth(bandwidth) + @replication_admin.updatePeerConfig(id, rpc) + end + end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 963e3695883..f94e3340b02 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -359,6 +359,7 @@ Shell.load_command_group( disable_peer show_peer_tableCFs set_peer_tableCFs + set_peer_bandwidth list_replicated_tables append_peer_tableCFs remove_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 6444c7958fa..82b5237b43b 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -34,13 +34,14 @@ EOF peers = replication_admin.list_peers formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME", - "STATE", "TABLE_CFS"]) + "STATE", "TABLE_CFS", "BANDWIDTH"]) peers.entrySet().each do |e| state = replication_admin.get_peer_state(e.key) tableCFs = replication_admin.show_peer_tableCFs(e.key) formatter.row([ e.key, e.value.getClusterKey, - e.value.getReplicationEndpointImpl, state, tableCFs ]) + e.value.getReplicationEndpointImpl, state, tableCFs, + e.value.getBandwidth ]) end formatter.footer(now) diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb new file mode 100644 index 00000000000..d9495af7d4b --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb @@ -0,0 +1,42 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerBandwidth< Command + def help + return <<-EOF +Set the replication source per node bandwidth for the specified peer. +Examples: + + # set bandwidth=2MB per regionserver for a peer + hbase> set_peer_bandwidth '1', 2097152 + # unset bandwidth for a peer to use the default bandwidth configured in server-side + hbase> set_peer_bandwidth '1' + +EOF + end + + def command(id, bandwidth = 0) + replication_admin.set_peer_bandwidth(id, bandwidth) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 84bdf565aac..b73739b6274 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -198,6 +198,24 @@ module Hbase replication_admin.remove_peer(@peer_id) end + define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do + cluster_key = "localhost:2181:/hbase-test" + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args) + # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added + # but here we have to do it ourselves + replication_admin.peer_added(@peer_id) + + peer_config = replication_admin.get_peer_config(@peer_id) + assert_equal(0, peer_config.get_bandwidth) + replication_admin.set_peer_bandwidth(@peer_id, 2097152) + peer_config = replication_admin.get_peer_config(@peer_id) + assert_equal(2097152, peer_config.get_bandwidth) + + #cleanup + replication_admin.remove_peer(@peer_id) + end + define_test "get_peer_config: works with simple clusterKey peer" do cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key }