HBASE-24764 : Add support of adding base peer configs via hbase-site.xml for all replication peers (#2327)

Note: branch-1 has design difference compared to other branches in the replication sub-system. HMaster does not coordinate replication actions in branch-1 and hence each RS is responsible for initing peers and updating ZK states. As part of this change we are updating zk state of peers after reading from configuration, so if there is a divergence in configuration across RS the result can be can be non-deterministic and the last RS RPC will win.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
ankitjain64 2020-09-17 09:39:55 -07:00 committed by GitHub
parent 7801b45f47
commit f0acafc58e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 0 deletions

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.replication;
import com.google.common.base.Splitter;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -43,6 +46,9 @@ public class ReplicationPeerConfig {
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private long bandwidth = 0;
public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
"hbase.replication.peer.default.config";
public ReplicationPeerConfig() {
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
this.configuration = new HashMap<String, String>(0);
@ -99,6 +105,35 @@ public class ReplicationPeerConfig {
return this;
}
/**
* Helper method to add base peer configs from Configuration to ReplicationPeerConfig
* if not present in latter.
*
* This merges the user supplied peer configuration
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
* provided as property hbase.replication.peer.base.configs in hbase configuration.
* Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
* of conf is retained if already present in ReplicationPeerConfig.
*
* @param conf Configuration
*/
public void addBasePeerConfigsIfNotPresent(Configuration conf) {
String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
if (basePeerConfigs.length() != 0) {
Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
.withKeyValueSeparator("=").split(basePeerConfigs);
for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
String configName = entry.getKey();
String configValue = entry.getValue();
// Only override if base config does not exist in existing replication peer configs
if (!this.getConfiguration().containsKey(configName)) {
this.getConfiguration().put(configName, configValue);
}
}
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");

View File

@ -123,6 +123,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
checkQueuesDeleted(id);
peerConfig.addBasePeerConfigsIfNotPresent(this.conf);
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
@ -451,6 +452,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
ReplicationPeerZKImpl previous =
((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
ReplicationPeerConfig peerConfig = peerClusters.get(peerId).getPeerConfig();
peerConfig.addBasePeerConfigsIfNotPresent(this.conf);
updatePeerConfig(peerId, peerConfig);
if (previous == null) {
LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
} else {

View File

@ -493,6 +493,86 @@ public class TestMasterReplication {
}
}
/**
* Tests that base replication peer configs are applied on peer creation
* and the configs are overridden if updated as part of updatePeerConfig()
*
*/
@Test
public void testBasePeerConfigsForPeerMutations()
throws Exception {
LOG.info("testBasePeerConfigsForPeerMutations");
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
String firstCustomPeerConfigValue = "test";
String firstCustomPeerConfigUpdatedValue = "test_updated";
String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
String secondCustomPeerConfigValue = "testSecond";
String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
try {
baseConfiguration.set(ReplicationPeerConfig.HBASE_REPLICATION_PEER_BASE_CONFIG,
firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
startMiniClusters(2);
addPeer("1", 0, 1);
addPeer("2", 0, 1);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(configurations[0]);
ReplicationPeerConfig replicationPeerConfig1 = replicationAdmin.getPeerConfig("1");
ReplicationPeerConfig replicationPeerConfig2 = replicationAdmin.getPeerConfig("2");
// Validates base configs 1 is present for both peer.
assertEquals(firstCustomPeerConfigValue, replicationPeerConfig1.
getConfiguration().get(firstCustomPeerConfigKey));
assertEquals(firstCustomPeerConfigValue, replicationPeerConfig2.
getConfiguration().get(firstCustomPeerConfigKey));
// override value of configuration 1 for peer "1".
replicationPeerConfig1.getConfiguration().put(firstCustomPeerConfigKey,
firstCustomPeerConfigUpdatedValue);
// add configuration 2 for peer "2".
replicationPeerConfig2.getConfiguration().put(secondCustomPeerConfigKey,
secondCustomPeerConfigUpdatedValue);
replicationAdmin.updatePeerConfig("1", replicationPeerConfig1);
replicationAdmin.updatePeerConfig("2", replicationPeerConfig2);
// validates configuration is overridden by updateReplicationPeerConfig
assertEquals(firstCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("1").
getConfiguration().get(firstCustomPeerConfigKey));
assertEquals(secondCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("2").
getConfiguration().get(secondCustomPeerConfigKey));
// Add second config to base config and perform restart.
utilities[0].getConfiguration().set(ReplicationPeerConfig.
HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
.concat("=").concat(secondCustomPeerConfigValue));
utilities[0].shutdownMiniHBaseCluster();
utilities[0].restartHBaseCluster(1);
replicationAdmin = new ReplicationAdmin(configurations[0]);
// Both retains the value of base configuration 1 value as before restart.
// Peer 1 (Update value), Peer 2 (Base Value)
assertEquals(firstCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("1").
getConfiguration().get(firstCustomPeerConfigKey));
assertEquals(firstCustomPeerConfigValue, replicationAdmin.getPeerConfig("2").
getConfiguration().get(firstCustomPeerConfigKey));
// Peer 1 gets new base config as part of restart.
assertEquals(secondCustomPeerConfigValue, replicationAdmin.getPeerConfig("1").
getConfiguration().get(secondCustomPeerConfigKey));
// Peer 2 retains the updated value as before restart.
assertEquals(secondCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("2").
getConfiguration().get(secondCustomPeerConfigKey));
} finally {
shutDownMiniClusters();
baseConfiguration.unset(ReplicationPeerConfig.HBASE_REPLICATION_PEER_BASE_CONFIG);
}
}
@After
public void tearDown() throws IOException {
configurations = null;