HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
aa3e9dedf0
commit
7df1b92528
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -59,6 +60,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
public final class ReplicationPeerConfigUtil {
|
public final class ReplicationPeerConfigUtil {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
|
||||||
|
public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
|
||||||
|
"hbase.replication.peer.base.config";
|
||||||
|
|
||||||
private ReplicationPeerConfigUtil() {}
|
private ReplicationPeerConfigUtil() {}
|
||||||
|
|
||||||
|
@ -424,6 +427,41 @@ public final class ReplicationPeerConfigUtil {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to add base peer configs from Configuration to ReplicationPeerConfig
|
||||||
|
* if not present in latter.
|
||||||
|
*
|
||||||
|
* This merges the user supplied peer configuration
|
||||||
|
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
|
||||||
|
* provided as property hbase.replication.peer.base.configs in hbase configuration.
|
||||||
|
* Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
|
||||||
|
* of conf is retained if already present in ReplicationPeerConfig.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return ReplicationPeerConfig containing updated configs.
|
||||||
|
*/
|
||||||
|
public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
|
||||||
|
ReplicationPeerConfig receivedPeerConfig) {
|
||||||
|
String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
|
||||||
|
ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
|
||||||
|
newBuilder(receivedPeerConfig);
|
||||||
|
Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
|
||||||
|
|
||||||
|
if (basePeerConfigs.length() != 0) {
|
||||||
|
Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
|
||||||
|
.withKeyValueSeparator("=").split(basePeerConfigs);
|
||||||
|
for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
|
||||||
|
String configName = entry.getKey();
|
||||||
|
String configValue = entry.getValue();
|
||||||
|
// Only override if base config does not exist in existing peer configs
|
||||||
|
if (!receivedPeerConfigMap.containsKey(configName)) {
|
||||||
|
copiedPeerConfigBuilder.putConfiguration(configName, configValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return copiedPeerConfigBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
|
public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
|
||||||
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
|
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
|
||||||
throws ReplicationException {
|
throws ReplicationException {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toSet;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -33,9 +34,12 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -175,4 +179,47 @@ public class TestZKReplicationPeerStorage {
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBaseReplicationPeerConfig() {
|
||||||
|
String customPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
String customPeerConfigValue = "test";
|
||||||
|
String customPeerConfigUpdatedValue = "testUpdated";
|
||||||
|
|
||||||
|
String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
|
||||||
|
String customPeerConfigSecondValue = "testSecond";
|
||||||
|
String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
|
||||||
|
|
||||||
|
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
|
||||||
|
|
||||||
|
// custom config not present
|
||||||
|
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
|
||||||
|
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
|
||||||
|
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
|
||||||
|
|
||||||
|
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
|
||||||
|
addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
|
||||||
|
|
||||||
|
// validates base configs are present in replicationPeerConfig
|
||||||
|
assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
|
||||||
|
get(customPeerConfigKey));
|
||||||
|
assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
|
||||||
|
get(customPeerConfigSecondKey));
|
||||||
|
|
||||||
|
// validates base configs does not override value if config already present
|
||||||
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
|
||||||
|
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
|
||||||
|
|
||||||
|
ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
|
||||||
|
addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig);
|
||||||
|
|
||||||
|
assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
|
||||||
|
getConfiguration().get(customPeerConfigKey));
|
||||||
|
assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
|
||||||
|
getConfiguration().get(customPeerConfigSecondKey));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
|
@ -169,6 +170,7 @@ public class ReplicationPeerManager {
|
||||||
// this should be a retry, just return
|
// this should be a retry, just return
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
|
||||||
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
|
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
|
||||||
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
|
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
|
||||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
|
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
|
||||||
|
@ -402,6 +404,9 @@ public class ReplicationPeerManager {
|
||||||
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
||||||
for (String peerId : peerStorage.listPeerIds()) {
|
for (String peerId : peerStorage.listPeerIds()) {
|
||||||
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
||||||
|
|
||||||
|
peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
|
||||||
|
peerStorage.updatePeerConfig(peerId, peerConfig);
|
||||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
|
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -441,6 +443,84 @@ public class TestMasterReplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that base replication peer configs are applied on peer creation
|
||||||
|
* and the configs are overriden if updated as part of updateReplicationPeerConfig()
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBasePeerConfigsForPeerMutations()
|
||||||
|
throws Exception {
|
||||||
|
LOG.info("testBasePeerConfigsForPeerMutations");
|
||||||
|
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
String firstCustomPeerConfigValue = "test";
|
||||||
|
String firstCustomPeerConfigUpdatedValue = "test_updated";
|
||||||
|
|
||||||
|
String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
|
||||||
|
String secondCustomPeerConfigValue = "testSecond";
|
||||||
|
String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
|
||||||
|
try {
|
||||||
|
baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
|
||||||
|
startMiniClusters(2);
|
||||||
|
addPeer("1", 0, 1);
|
||||||
|
addPeer("2", 0, 1);
|
||||||
|
Admin admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
|
// Validates base configs 1 is present for both peer.
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
|
||||||
|
// override value of configuration 1 for peer "1".
|
||||||
|
ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
|
||||||
|
newBuilder(admin.getReplicationPeerConfig("1")).
|
||||||
|
putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
|
||||||
|
|
||||||
|
// add configuration 2 for peer "2".
|
||||||
|
ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
|
||||||
|
newBuilder(admin.getReplicationPeerConfig("2")).
|
||||||
|
putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
|
||||||
|
|
||||||
|
admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
|
||||||
|
admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
|
||||||
|
|
||||||
|
// validates configuration is overridden by updateReplicationPeerConfig
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
|
||||||
|
getConfiguration().get(secondCustomPeerConfigKey));
|
||||||
|
|
||||||
|
// Add second config to base config and perform restart.
|
||||||
|
utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
|
||||||
|
HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
|
||||||
|
concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
|
||||||
|
.concat("=").concat(secondCustomPeerConfigValue));
|
||||||
|
|
||||||
|
utilities[0].shutdownMiniHBaseCluster();
|
||||||
|
utilities[0].restartHBaseCluster(1);
|
||||||
|
admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
|
// Both retains the value of base configuration 1 value as before restart.
|
||||||
|
// Peer 1 (Update value), Peer 2 (Base Value)
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
|
||||||
|
// Peer 1 gets new base config as part of restart.
|
||||||
|
Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(secondCustomPeerConfigKey));
|
||||||
|
// Peer 2 retains the updated value as before restart.
|
||||||
|
Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
|
||||||
|
getConfiguration().get(secondCustomPeerConfigKey));
|
||||||
|
} finally {
|
||||||
|
shutDownMiniClusters();
|
||||||
|
baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws IOException {
|
public void tearDown() throws IOException {
|
||||||
configurations = null;
|
configurations = null;
|
||||||
|
|
Loading…
Reference in New Issue