From 2333596279b63c045e5fd5be09b2fce8ce5c9980 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 13 Dec 2016 04:20:20 -0800
Subject: [PATCH] HBASE-17296 Provide per peer throttling for replication
 (Guanghao Zhang)

---
 .../replication/ReplicationSerDeHelper.java        |   4 +
 .../hbase/replication/ReplicationPeer.java         |   6 +
 .../replication/ReplicationPeerConfig.java         |  13 +-
 .../replication/ReplicationPeerZKImpl.java         |   5 +
 .../replication/ReplicationPeersZKImpl.java        |   1 +
 .../protobuf/generated/ClientProtos.java           |  30 ++---
 .../protobuf/generated/ZooKeeperProtos.java        | 115 +++++++++++++++--
 .../src/main/protobuf/ZooKeeper.proto              |   1 +
 .../protobuf/generated/ClientProtos.java           |  30 ++---
 .../protobuf/generated/ZooKeeperProtos.java        | 117 ++++++++++++++++--
 .../src/main/protobuf/ZooKeeper.proto              |   1 +
 .../regionserver/ReplicationSource.java            |  32 ++++-
 .../regionserver/ReplicationThrottler.java         |   9 +-
 .../replication/TestReplicationAdmin.java          |  17 +++
 .../src/main/ruby/hbase/replication_admin.rb       |   9 ++
 hbase-shell/src/main/ruby/shell.rb                 |   1 +
 .../main/ruby/shell/commands/list_peers.rb         |   5 +-
 .../ruby/shell/commands/set_peer_bandwidth.rb      |  42 +++++++
 .../test/ruby/hbase/replication_admin_test.rb      |  18 +++
 19 files changed, 393 insertions(+), 63 deletions(-)
 create mode 100644 hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 6ac441738f8..dd83fb15eee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -287,6 +287,9 @@ public final class ReplicationSerDeHelper {
       }
       peerConfig.setNamespaces(namespaces);
     }
+    if (peer.hasBandwidth()) {
+      peerConfig.setBandwidth(peer.getBandwidth());
+    }
     return peerConfig;
   }
 
@@ -326,6 +329,7 @@ public final class ReplicationSerDeHelper {
       }
     }
 
+    builder.setBandwidth(peerConfig.getBandwidth());
     return builder.build();
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index bd2b7009d12..4f18048aa6d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -78,6 +78,12 @@ public interface ReplicationPeer {
    */
   public Set getNamespaces();
 
+  /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth upper limit
+   */
+  public long getPeerBandwidth();
+
   void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
 
 }
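The client-facing half of this change is the getPeerBandwidth() accessor added to ReplicationPeer above and the new bandwidth member on ReplicationPeerConfig below. As a rough sketch of how the knob is meant to be used from application code, assuming the patched client jars are on the classpath (the HBaseConfiguration setup and the peer id "1" are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class SetPeerBandwidthExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (ReplicationAdmin admin = new ReplicationAdmin(conf)) {
          // Read the current config for peer "1", cap replication at 2 MB/sec per
          // region server, and write the updated config back to the peer znode.
          ReplicationPeerConfig peerConfig = admin.getPeerConfig("1");
          peerConfig.setBandwidth(2097152);
          admin.updatePeerConfig("1", peerConfig);

          // Setting 0 removes the per-peer limit; sources then fall back to the
          // server-side default, replication.source.per.peer.node.bandwidth.
          peerConfig.setBandwidth(0);
          admin.updatePeerConfig("1", peerConfig);
        }
      }
    }
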
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 64f6d1bc106..790f0216c04 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -43,6 +43,7 @@ public class ReplicationPeerConfig {
   private final Map configuration;
   private Map> tableCFsMap = null;
   private Set namespaces = null;
+  private long bandwidth = 0;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap(Bytes.BYTES_COMPARATOR);
@@ -102,6 +103,15 @@ public class ReplicationPeerConfig {
     return this;
   }
 
+  public long getBandwidth() {
+    return this.bandwidth;
+  }
+
+  public ReplicationPeerConfig setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
@@ -110,8 +120,9 @@ public class ReplicationPeerConfig {
       builder.append("namespaces=").append(namespaces.toString()).append(",");
     }
     if (tableCFsMap != null) {
-      builder.append("tableCFs=").append(tableCFsMap.toString());
+      builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
     }
+    builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 5302b1bd007..c58bd712049 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -173,6 +173,11 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
     return this.peerConfig.getNamespaces();
   }
 
+  @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
   @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index d12c4e90aa3..9a617a70b02 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -367,6 +367,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     existingConfig.getPeerData().putAll(newConfig.getPeerData());
     existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
     existingConfig.setNamespaces(newConfig.getNamespaces());
+    existingConfig.setBandwidth(newConfig.getBandwidth());
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index e9458df6dae..eab62ebbb23 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -19372,7 +19372,7 @@ public final class ClientProtos {
     /**
      *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -19382,7 +19382,7 @@ public final class ClientProtos { /** *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -19392,7 +19392,7 @@ public final class ClientProtos { /** *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -19902,7 +19902,7 @@ public final class ClientProtos { /** *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -19914,7 +19914,7 @@ public final class ClientProtos { /** *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -19926,7 +19926,7 @@ public final class ClientProtos { /** *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
* @@ -21436,7 +21436,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21448,7 +21448,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21464,7 +21464,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21486,7 +21486,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21506,7 +21506,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21532,7 +21532,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21551,7 +21551,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21565,7 +21565,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* @@ -21582,7 +21582,7 @@ public final class ClientProtos { /** *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
* diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java index 6baf8457a3b..90ec659c77b 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java @@ -4994,6 +4994,15 @@ public final class ZooKeeperProtos { * repeated bytes namespaces = 6; */ org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString getNamespaces(int index); + + /** + * optional int64 bandwidth = 7; + */ + boolean hasBandwidth(); + /** + * optional int64 bandwidth = 7; + */ + long getBandwidth(); } /** *
@@ -5018,6 +5027,7 @@ public final class ZooKeeperProtos {
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
       namespaces_ = java.util.Collections.emptyList();
+      bandwidth_ = 0L;
     }
 
     @java.lang.Override
@@ -5095,6 +5105,11 @@ public final class ZooKeeperProtos {
               namespaces_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000004;
+              bandwidth_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5358,6 +5373,21 @@ public final class ZooKeeperProtos {
       return namespaces_.get(index);
     }
 
+    public static final int BANDWIDTH_FIELD_NUMBER = 7;
+    private long bandwidth_;
+    /**
+     * optional int64 bandwidth = 7;
+     */
+    public boolean hasBandwidth() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * optional int64 bandwidth = 7;
+     */
+    public long getBandwidth() {
+      return bandwidth_;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -5410,6 +5440,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < namespaces_.size(); i++) {
         output.writeBytes(6, namespaces_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(7, bandwidth_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -5445,6 +5478,10 @@ public final class ZooKeeperProtos {
         size += dataSize;
         size += 1 * getNamespacesList().size();
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, bandwidth_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -5480,6 +5517,11 @@ public final class ZooKeeperProtos {
           .equals(other.getTableCfsList());
       result = result && getNamespacesList()
           .equals(other.getNamespacesList());
+      result = result && (hasBandwidth() == other.hasBandwidth());
+      if (hasBandwidth()) {
+        result = result && (getBandwidth()
+            == other.getBandwidth());
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -5515,6 +5557,11 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
         hash = (53 * hash) + getNamespacesList().hashCode();
       }
+      if (hasBandwidth()) {
+        hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+        hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+            getBandwidth());
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5665,6 +5712,8 @@ public final class ZooKeeperProtos {
         }
         namespaces_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000020);
+        bandwidth_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -5729,6 +5778,10 @@ public final class ZooKeeperProtos {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.namespaces_ = namespaces_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.bandwidth_ = bandwidth_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5869,6 +5922,9 @@ public final class ZooKeeperProtos {
           }
           onChanged();
         }
+        if (other.hasBandwidth()) {
+          setBandwidth(other.getBandwidth());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -6888,6 +6944,38 @@ public final class ZooKeeperProtos {
         onChanged();
         return this;
       }
+
+      private long bandwidth_ ;
+      /**
+       * optional int64 bandwidth = 7;
+       */
+      public boolean hasBandwidth() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * optional int64 bandwidth = 7;
+       */
+      public long getBandwidth() {
+        return bandwidth_;
+      }
+      /**
+       * optional int64 bandwidth = 7;
+       */
+      public Builder setBandwidth(long value) {
+        bitField0_ |= 0x00000040;
+        bandwidth_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional int64 bandwidth = 7;
+       */
+      public Builder clearBandwidth() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        bandwidth_ = 0L;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -9803,23 +9891,24 @@ public final class ZooKeeperProtos {
       "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
       "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
       "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
-      "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
+      "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
       "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
       "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
       ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
       "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
       "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
-      "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
-      "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
-      "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
-      "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
-      "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
-      ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
-      "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
-      "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
-      "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
-      "BL\n1org.apache.hadoop.hbase.shaded.proto" +
-      "buf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+      "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
+      "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
+      "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
+      "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
+      "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
+      "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
+      "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
+      "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
+      "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
+      "te\022\017\n\007enabled\030\001 \001(\010BL\n1org.apache.hadoop" +
+      ".hbase.shaded.protobuf.generatedB\017ZooKee" +
+      "perProtosH\001\210\001\001\240\001\001"
     };
     org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
        new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.
            InternalDescriptorAssigner() {
@@ -9876,7 +9965,7 @@ public final class ZooKeeperProtos {
     internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
       org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
         internal_static_hbase_pb_ReplicationPeer_descriptor,
-        new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
+        new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
     internal_static_hbase_pb_ReplicationState_descriptor =
       getDescriptor().getMessageTypes().get(7);
     internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
diff --git a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
index c66639ba5b7..323862c98df 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
@@ -122,6 +122,7 @@ message ReplicationPeer {
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
   repeated bytes namespaces = 6;
+  optional int64 bandwidth = 7;
 }
 
 /**
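Both the shaded descriptor above and the plain protobuf definitions that follow gain the same optional int64 bandwidth = 7 field, so a peer znode written by an older server simply has no field 7. A small sketch of that wire-level behaviour using the generated shaded classes added above (values are arbitrary; clusterkey is set only because the message declares it required):

    import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;

    public class BandwidthFieldRoundTrip {
      public static void main(String[] args) throws Exception {
        // Peer znode payload that carries the new optional field.
        ZooKeeperProtos.ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
            .setClusterkey("zk1:2181:/hbase")
            .setBandwidth(2097152L)
            .build();

        // Re-parse the bytes as a reader of the znode would.
        ZooKeeperProtos.ReplicationPeer parsed =
            ZooKeeperProtos.ReplicationPeer.parseFrom(peer.toByteArray());
        System.out.println(parsed.hasBandwidth());   // true
        System.out.println(parsed.getBandwidth());   // 2097152

        // A payload written before this patch carries no field 7 at all, so
        // ReplicationSerDeHelper leaves ReplicationPeerConfig at its default of 0.
        ZooKeeperProtos.ReplicationPeer legacy = ZooKeeperProtos.ReplicationPeer.newBuilder()
            .setClusterkey("zk1:2181:/hbase")
            .build();
        System.out.println(legacy.hasBandwidth());   // false
      }
    }
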
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index c35617b9dcc..d7e2b6f26b1 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -19037,7 +19037,7 @@ public final class ClientProtos {
      *
      * 
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -19047,7 +19047,7 @@ public final class ClientProtos { * *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -19057,7 +19057,7 @@ public final class ClientProtos { * *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -19591,7 +19591,7 @@ public final class ClientProtos { * *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -19603,7 +19603,7 @@ public final class ClientProtos { * *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -19615,7 +19615,7 @@ public final class ClientProtos { * *
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * 
*/ @@ -21117,7 +21117,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21129,7 +21129,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21145,7 +21145,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21167,7 +21167,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21187,7 +21187,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21212,7 +21212,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21231,7 +21231,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21245,7 +21245,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ @@ -21261,7 +21261,7 @@ public final class ClientProtos { * *
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * 
*/ diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index 36cd8b997ac..0095043585c 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -4796,6 +4796,16 @@ public final class ZooKeeperProtos { * repeated bytes namespaces = 6; */ com.google.protobuf.ByteString getNamespaces(int index); + + // optional int64 bandwidth = 7; + /** + * optional int64 bandwidth = 7; + */ + boolean hasBandwidth(); + /** + * optional int64 bandwidth = 7; + */ + long getBandwidth(); } /** * Protobuf type {@code hbase.pb.ReplicationPeer} @@ -4895,6 +4905,11 @@ public final class ZooKeeperProtos { namespaces_.add(input.readBytes()); break; } + case 56: { + bitField0_ |= 0x00000004; + bandwidth_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -5179,6 +5194,22 @@ public final class ZooKeeperProtos { return namespaces_.get(index); } + // optional int64 bandwidth = 7; + public static final int BANDWIDTH_FIELD_NUMBER = 7; + private long bandwidth_; + /** + * optional int64 bandwidth = 7; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 bandwidth = 7; + */ + public long getBandwidth() { + return bandwidth_; + } + private void initFields() { clusterkey_ = ""; replicationEndpointImpl_ = ""; @@ -5186,6 +5217,7 @@ public final class ZooKeeperProtos { configuration_ = java.util.Collections.emptyList(); tableCfs_ = java.util.Collections.emptyList(); namespaces_ = java.util.Collections.emptyList(); + bandwidth_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5239,6 +5271,9 @@ public final class ZooKeeperProtos { for (int i = 0; i < namespaces_.size(); i++) { output.writeBytes(6, namespaces_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(7, bandwidth_); + } getUnknownFields().writeTo(output); } @@ -5277,6 +5312,10 @@ public final class ZooKeeperProtos { size += dataSize; size += 1 * getNamespacesList().size(); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(7, bandwidth_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5318,6 +5357,11 @@ public final class ZooKeeperProtos { .equals(other.getTableCfsList()); result = result && getNamespacesList() .equals(other.getNamespacesList()); + result = result && (hasBandwidth() == other.hasBandwidth()); + if (hasBandwidth()) { + result = result && (getBandwidth() + == other.getBandwidth()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5355,6 +5399,10 @@ public final class ZooKeeperProtos { hash = (37 * hash) + NAMESPACES_FIELD_NUMBER; hash = (53 * hash) + getNamespacesList().hashCode(); } + if (hasBandwidth()) { + hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getBandwidth()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -5496,6 +5544,8 @@ public final class ZooKeeperProtos { } namespaces_ = java.util.Collections.emptyList(); bitField0_ = (bitField0_ & ~0x00000020); + bandwidth_ = 0L; + bitField0_ = (bitField0_ & ~0x00000040); return 
this; } @@ -5564,6 +5614,10 @@ public final class ZooKeeperProtos { bitField0_ = (bitField0_ & ~0x00000020); } result.namespaces_ = namespaces_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000004; + } + result.bandwidth_ = bandwidth_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5678,6 +5732,9 @@ public final class ZooKeeperProtos { } onChanged(); } + if (other.hasBandwidth()) { + setBandwidth(other.getBandwidth()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6697,6 +6754,39 @@ public final class ZooKeeperProtos { return this; } + // optional int64 bandwidth = 7; + private long bandwidth_ ; + /** + * optional int64 bandwidth = 7; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int64 bandwidth = 7; + */ + public long getBandwidth() { + return bandwidth_; + } + /** + * optional int64 bandwidth = 7; + */ + public Builder setBandwidth(long value) { + bitField0_ |= 0x00000040; + bandwidth_ = value; + onChanged(); + return this; + } + /** + * optional int64 bandwidth = 7; + */ + public Builder clearBandwidth() { + bitField0_ = (bitField0_ & ~0x00000040); + bandwidth_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer) } @@ -9446,23 +9536,24 @@ public final class ZooKeeperProtos { "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" + "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" + "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta", - "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" + + "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" + "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" + "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" + ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" + "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" + "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" + - "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" + - "2 .hbase.pb.ReplicationState.State\"\"\n\005St" + - "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" + - "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t", - "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" + - ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" + - "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" + - "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" + - "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" + - "BE\n*org.apache.hadoop.hbase.protobuf.gen" + - "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" + "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" + + "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" + + "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" + + "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010", + "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" + + "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" + + "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" + + "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" + + "\030\005 
\001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" + + "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\017ZooKeeperProt" + + "osH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9510,7 +9601,7 @@ public final class ZooKeeperProtos { internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ReplicationPeer_descriptor, - new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", }); + new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", }); internal_static_hbase_pb_ReplicationState_descriptor = getDescriptor().getMessageTypes().get(7); internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto index a0c9d01c68c..6f13e4a5807 100644 --- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto +++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto @@ -122,6 +122,7 @@ message ReplicationPeer { repeated NameStringPair configuration = 4; repeated TableCF table_cfs = 5; repeated bytes namespaces = 6; + optional int64 bandwidth = 7; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 388efbf9d30..a6fe0fb54fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -144,6 +145,8 @@ public class ReplicationSource extends Thread private WALEntryFilter walEntryFilter; // throttler private ReplicationThrottler throttler; + private long defaultBandwidth; + private long currentBandwidth; private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); @@ -179,8 +182,6 @@ public class ReplicationSource extends Thread this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); - long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); - this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; @@ -196,6 +197,15 @@ public class ReplicationSource extends Thread this.actualPeerId = replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = 
replicationEndpoint;
+
+    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+    currentBandwidth = getCurrentBandwidth();
+    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
+
+    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+        + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+        + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", currentBandwidth="
+        + this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
     return this.metrics;
   }
 
+  private long getCurrentBandwidth() {
+    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
+    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
+    // user can set peer bandwidth to 0 to use default bandwidth
+    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
+  }
+
   public class ReplicationSourceWorkerThread extends Thread {
     ReplicationSource source;
     String walGroupId;
@@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
       return distinctRowKeys + totalHFileEntries;
     }
 
+    private void checkBandwidthChangeAndResetThrottler() {
+      long peerBandwidth = getCurrentBandwidth();
+      if (peerBandwidth != currentBandwidth) {
+        currentBandwidth = peerBandwidth;
+        throttler.setBandwidth((double) currentBandwidth / 10.0);
+        LOG.info("ReplicationSource : " + peerId
+            + " bandwidth throttling changed, currentBandwidth=" + currentBandwidth);
+      }
+    }
+
     /**
      * Do the shipping logic
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
@@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
       }
       while (isWorkerActive()) {
         try {
+          checkBandwidthChangeAndResetThrottler();
           if (throttler.isEnabled()) {
             long sleepTicks = throttler.getNextSleepInterval(currentSize);
             if (sleepTicks > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
index c756576529a..8da9352406f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class ReplicationThrottler {
-  private final boolean enabled;
-  private final double bandwidth;
+  private boolean enabled;
+  private double bandwidth;
 
   private long cyclePushSize;
   private long cycleStartTick;
@@ -118,4 +118,9 @@ public class ReplicationThrottler {
       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
     }
   }
+
+  public void setBandwidth(double bandwidth) {
+    this.bandwidth = bandwidth;
+    this.enabled = this.bandwidth > 0;
+  }
 }
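The ReplicationSource constructor and checkBandwidthChangeAndResetThrottler() both divide the configured figure by 10.0 because ReplicationThrottler appears to meter shipped bytes against 100 ms cycles, so a peer limit of B bytes/sec becomes a budget of roughly B/10 bytes per cycle. A standalone sketch of that arithmetic follows; it imitates the throttling idea and is not the actual ReplicationThrottler implementation (the batch size and the fixed 100 ms wait are illustrative):

    public class ThrottleCycleSketch {
      public static void main(String[] args) {
        long peerBandwidth = 2097152L;                 // bytes/sec, as set via set_peer_bandwidth
        double cycleBandwidth = peerBandwidth / 10.0;  // budget per 100 ms cycle
        boolean enabled = cycleBandwidth > 0;          // bandwidth 0 disables throttling

        long cyclePushSize = 0;                        // bytes already shipped this cycle
        long batchSize = 64 * 1024;                    // size of the edit batch about to ship

        if (enabled && cyclePushSize + batchSize > cycleBandwidth) {
          // Over budget for this cycle: wait roughly until the next 100 ms window.
          System.out.println("sleep ~100 ms before shipping " + batchSize + " bytes");
        } else {
          cyclePushSize += batchSize;
          System.out.println("ship now, cycle usage " + cyclePushSize
              + " of " + (long) cycleBandwidth + " bytes");
        }
      }
    }
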
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c0d18ddc438..7363fb97785 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -469,4 +469,21 @@ public class TestReplicationAdmin {
 
     admin.removePeer(ID_ONE);
   }
+
+  @Test
+  public void testPeerBandwidth() throws ReplicationException {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    assertEquals(0, rpc.getBandwidth());
+
+    rpc.setBandwidth(2097152);
+    admin.updatePeerConfig(ID_ONE, rpc);
+
+    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
+    admin.removePeer(ID_ONE);
+  }
 }
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 8aa158b5e0d..5fd23d396c4 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -252,6 +252,15 @@ module Hbase
       end
     end
 
+    # Set new bandwidth config for the specified peer
+    def set_peer_bandwidth(id, bandwidth)
+      rpc = get_peer_config(id)
+      unless rpc.nil?
+        rpc.setBandwidth(bandwidth)
+        @replication_admin.updatePeerConfig(id, rpc)
+      end
+    end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 02f81916682..4b111f17de4 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -375,6 +375,7 @@ Shell.load_command_group(
     remove_peer_namespaces
     show_peer_tableCFs
     set_peer_tableCFs
+    set_peer_bandwidth
     list_replicated_tables
     append_peer_tableCFs
     remove_peer_tableCFs
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index ed6b57525f6..7d531589223 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,14 +33,15 @@ EOF
       peers = replication_admin.list_peers
 
       formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
-        "STATE", "NAMESPACES", "TABLE_CFS"])
+        "STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
 
       peers.entrySet().each do |e|
         state = replication_admin.get_peer_state(e.key)
         namespaces = replication_admin.show_peer_namespaces(e.value)
         tableCFs = replication_admin.show_peer_tableCFs(e.key)
         formatter.row([ e.key, e.value.getClusterKey,
-          e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
+          e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
+          e.value.getBandwidth ])
       end
 
       formatter.footer()
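The shell pieces above are thin wrappers over the same client API: replication_admin.rb reads the peer's config, sets the bandwidth, and calls updatePeerConfig, while list_peers now prints the value in a BANDWIDTH column. Invoking the new command below without a bandwidth argument writes 0, which every ReplicationSource treats as "use the default". A minimal sketch of that fallback rule, mirroring the getCurrentBandwidth() logic added to ReplicationSource above (class and variable names here are illustrative):

    public class BandwidthFallbackSketch {
      // A non-zero per-peer value wins; otherwise use the cluster-wide default
      // taken from replication.source.per.peer.node.bandwidth.
      static long effectiveBandwidth(long peerBandwidth, long defaultBandwidth) {
        return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
      }

      public static void main(String[] args) {
        long defaultBandwidth = 1048576L;  // e.g. 1 MB/sec from hbase-site.xml
        System.out.println(effectiveBandwidth(2097152L, defaultBandwidth));  // 2097152, peer override
        System.out.println(effectiveBandwidth(0L, defaultBandwidth));        // 1048576, default
      }
    }
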
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
new file mode 100644
index 00000000000..d9495af7d4b
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
@@ -0,0 +1,42 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerBandwidth < Command
+      def help
+        return <<-EOF
+Set the replication source per node bandwidth for the specified peer.
+Examples:
+
+  # set bandwidth=2MB per regionserver for a peer
+  hbase> set_peer_bandwidth '1', 2097152
+  # unset bandwidth for a peer to use the default bandwidth configured on the server side
+  hbase> set_peer_bandwidth '1'
+
+EOF
+      end
+
+      def command(id, bandwidth = 0)
+        replication_admin.set_peer_bandwidth(id, bandwidth)
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 11ff6031e5b..cd1fe35a942 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -371,6 +371,24 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
+    define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+      cluster_key = "localhost:2181:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+      # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+      # but here we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      peer_config = command(:get_peer_config, @peer_id)
+      assert_equal(0, peer_config.get_bandwidth)
+      command(:set_peer_bandwidth, @peer_id, 2097152)
+      peer_config = command(:get_peer_config, @peer_id)
+      assert_equal(2097152, peer_config.get_bandwidth)
+
+      # cleanup
+      command(:remove_peer, @peer_id)
+    end
+
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }