HBASE-25383: Ability to update and remove peer base config
Closes #2778 Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
15d229eb35
commit
f600856a3b
|
@ -27,7 +27,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||||
|
@ -40,12 +39,12 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
@ -246,7 +245,7 @@ public final class ReplicationPeerConfigUtil {
|
||||||
/**
|
/**
|
||||||
* @param bytes Content of a peer znode.
|
* @param bytes Content of a peer znode.
|
||||||
* @return ClusterKey parsed from the passed bytes.
|
* @return ClusterKey parsed from the passed bytes.
|
||||||
* @throws DeserializationException
|
* @throws DeserializationException deserialization exception
|
||||||
*/
|
*/
|
||||||
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
|
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
|
||||||
throws DeserializationException {
|
throws DeserializationException {
|
||||||
|
@ -390,7 +389,7 @@ public final class ReplicationPeerConfigUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param peerConfig
|
* @param peerConfig peer config of replication peer
|
||||||
* @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
|
* @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
|
||||||
* for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
|
* for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
|
||||||
* /hbase/replication/peers/PEER_ID
|
* /hbase/replication/peers/PEER_ID
|
||||||
|
@ -454,37 +453,42 @@ public final class ReplicationPeerConfigUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to add base peer configs from Configuration to ReplicationPeerConfig
|
* Helper method to add/removev base peer configs from Configuration to ReplicationPeerConfig
|
||||||
* if not present in latter.
|
|
||||||
*
|
*
|
||||||
* This merges the user supplied peer configuration
|
* This merges the user supplied peer configuration
|
||||||
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
|
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
|
||||||
* provided as property hbase.replication.peer.base.configs in hbase configuration.
|
* provided as property hbase.replication.peer.base.configs in hbase configuration.
|
||||||
* Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
|
* Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1;k3=""".
|
||||||
* of conf is retained if already present in ReplicationPeerConfig.
|
* If value is empty, it will remove the existing key-value from peer config.
|
||||||
*
|
*
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
* @return ReplicationPeerConfig containing updated configs.
|
* @return ReplicationPeerConfig containing updated configs.
|
||||||
*/
|
*/
|
||||||
public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
|
public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
|
||||||
ReplicationPeerConfig receivedPeerConfig) {
|
ReplicationPeerConfig receivedPeerConfig) {
|
||||||
String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
|
|
||||||
ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
|
ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
|
||||||
newBuilder(receivedPeerConfig);
|
newBuilder(receivedPeerConfig);
|
||||||
Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
|
|
||||||
|
|
||||||
|
Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
|
||||||
|
String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
|
||||||
if (basePeerConfigs.length() != 0) {
|
if (basePeerConfigs.length() != 0) {
|
||||||
Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
|
Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
|
||||||
.withKeyValueSeparator("=").split(basePeerConfigs);
|
.withKeyValueSeparator("=").split(basePeerConfigs);
|
||||||
for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
|
for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
|
||||||
String configName = entry.getKey();
|
String configName = entry.getKey();
|
||||||
String configValue = entry.getValue();
|
String configValue = entry.getValue();
|
||||||
// Only override if base config does not exist in existing peer configs
|
// If the config is provided with empty value, for eg. k1="",
|
||||||
if (!receivedPeerConfigMap.containsKey(configName)) {
|
// we remove it from peer config. Providing config with empty value
|
||||||
|
// is required so that it doesn't remove any other config unknowingly.
|
||||||
|
if (Strings.isNullOrEmpty(configValue)) {
|
||||||
|
copiedPeerConfigBuilder.removeConfiguration(configName);
|
||||||
|
} else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
|
||||||
|
// update the configuration if exact config and value doesn't exists
|
||||||
copiedPeerConfigBuilder.putConfiguration(configName, configValue);
|
copiedPeerConfigBuilder.putConfiguration(configName, configValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return copiedPeerConfigBuilder.build();
|
return copiedPeerConfigBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -294,6 +294,12 @@ public class ReplicationPeerConfig {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplicationPeerConfigBuilder removeConfiguration(String key) {
|
||||||
|
this.configuration.remove(key);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
|
public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
|
||||||
this.peerData.put(key, value);
|
this.peerData.put(key, value);
|
||||||
|
|
|
@ -52,6 +52,15 @@ public interface ReplicationPeerConfigBuilder {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
|
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a "raw" configuration property for this replication peer. For experts only.
|
||||||
|
* @param key Configuration property key to ve removed
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
ReplicationPeerConfigBuilder removeConfiguration(String key);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds all of the provided "raw" configuration entries to {@code this}.
|
* Adds all of the provided "raw" configuration entries to {@code this}.
|
||||||
* @param configuration A collection of raw configuration entries
|
* @param configuration A collection of raw configuration entries
|
||||||
|
|
|
@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -35,7 +34,6 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||||
|
@ -45,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -73,6 +72,11 @@ public class TestZKReplicationPeerStorage {
|
||||||
UTIL.shutdownMiniZKCluster();
|
UTIL.shutdownMiniZKCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanCustomConfigurations() {
|
||||||
|
UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
}
|
||||||
|
|
||||||
private Set<String> randNamespaces(Random rand) {
|
private Set<String> randNamespaces(Random rand) {
|
||||||
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
|
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
|
||||||
.collect(toSet());
|
.collect(toSet());
|
||||||
|
@ -220,8 +224,7 @@ public class TestZKReplicationPeerStorage {
|
||||||
STORAGE.getNewSyncReplicationStateNode(peerId)));
|
STORAGE.getNewSyncReplicationStateNode(peerId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test public void testBaseReplicationPeerConfig() throws ReplicationException{
|
||||||
public void testBaseReplicationPeerConfig() {
|
|
||||||
String customPeerConfigKey = "hbase.xxx.custom_config";
|
String customPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
String customPeerConfigValue = "test";
|
String customPeerConfigValue = "test";
|
||||||
String customPeerConfigUpdatedValue = "testUpdated";
|
String customPeerConfigUpdatedValue = "testUpdated";
|
||||||
|
@ -241,7 +244,7 @@ public class TestZKReplicationPeerStorage {
|
||||||
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
|
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
|
||||||
|
|
||||||
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
|
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
|
||||||
addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
|
updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
|
||||||
|
|
||||||
// validates base configs are present in replicationPeerConfig
|
// validates base configs are present in replicationPeerConfig
|
||||||
assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
|
assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
|
||||||
|
@ -249,17 +252,63 @@ public class TestZKReplicationPeerStorage {
|
||||||
assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
|
assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
|
||||||
get(customPeerConfigSecondKey));
|
get(customPeerConfigSecondKey));
|
||||||
|
|
||||||
// validates base configs does not override value if config already present
|
// validates base configs get updated values even if config already present
|
||||||
|
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
|
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
|
||||||
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
|
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
|
||||||
|
|
||||||
ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
|
ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
|
||||||
addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig);
|
updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
|
||||||
|
|
||||||
assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
|
assertEquals(customPeerConfigUpdatedValue, replicationPeerConfigAfterValueUpdate.
|
||||||
getConfiguration().get(customPeerConfigKey));
|
getConfiguration().get(customPeerConfigKey));
|
||||||
assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
|
assertEquals(customPeerConfigSecondUpdatedValue, replicationPeerConfigAfterValueUpdate.
|
||||||
getConfiguration().get(customPeerConfigSecondKey));
|
getConfiguration().get(customPeerConfigSecondKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
|
||||||
|
String customPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
String customPeerConfigValue = "test";
|
||||||
|
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
|
||||||
|
|
||||||
|
// custom config not present
|
||||||
|
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
|
||||||
|
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
customPeerConfigKey.concat("=").concat(customPeerConfigValue));
|
||||||
|
|
||||||
|
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
|
||||||
|
updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
|
||||||
|
|
||||||
|
// validates base configs are present in replicationPeerConfig
|
||||||
|
assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
|
||||||
|
get(customPeerConfigKey));
|
||||||
|
|
||||||
|
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
customPeerConfigKey.concat("=").concat(""));
|
||||||
|
|
||||||
|
ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil.
|
||||||
|
updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
|
||||||
|
|
||||||
|
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
|
||||||
|
throws ReplicationException {
|
||||||
|
String customPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
|
||||||
|
|
||||||
|
// custom config not present
|
||||||
|
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
customPeerConfigKey.concat("=").concat(""));
|
||||||
|
|
||||||
|
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
|
||||||
|
updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
|
||||||
|
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,7 @@ public class ReplicationPeerManager {
|
||||||
// this should be a retry, just return
|
// this should be a retry, just return
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
|
peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
|
||||||
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
|
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
|
||||||
SyncReplicationState syncReplicationState =
|
SyncReplicationState syncReplicationState =
|
||||||
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
|
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
|
||||||
|
@ -547,7 +547,7 @@ public class ReplicationPeerManager {
|
||||||
for (String peerId : peerStorage.listPeerIds()) {
|
for (String peerId : peerStorage.listPeerIds()) {
|
||||||
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
||||||
|
|
||||||
peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
|
peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
|
||||||
peerStorage.updatePeerConfig(peerId, peerConfig);
|
peerStorage.updatePeerConfig(peerId, peerConfig);
|
||||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||||
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
|
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
|
||||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -29,7 +28,6 @@ import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -200,8 +198,8 @@ public class TestMasterReplication {
|
||||||
// Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
|
// Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
|
||||||
// to cluster '1'.
|
// to cluster '1'.
|
||||||
byte[][][] hfileRanges =
|
byte[][][] hfileRanges =
|
||||||
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
||||||
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
|
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
|
||||||
int numOfRows = 100;
|
int numOfRows = 100;
|
||||||
int[] expectedCounts =
|
int[] expectedCounts =
|
||||||
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
||||||
|
@ -212,10 +210,10 @@ public class TestMasterReplication {
|
||||||
// Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
|
// Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
|
||||||
// to cluster '0'.
|
// to cluster '0'.
|
||||||
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
|
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
|
||||||
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
|
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
|
||||||
numOfRows = 200;
|
numOfRows = 200;
|
||||||
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
|
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
|
||||||
hfileRanges.length * numOfRows + expectedCounts[1] };
|
hfileRanges.length * numOfRows + expectedCounts[1] };
|
||||||
|
|
||||||
loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
|
loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
|
||||||
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
|
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
|
||||||
|
@ -314,12 +312,12 @@ public class TestMasterReplication {
|
||||||
// Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
|
// Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
|
||||||
// to cluster '1'.
|
// to cluster '1'.
|
||||||
byte[][][] hfileRanges =
|
byte[][][] hfileRanges =
|
||||||
new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
|
new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
|
||||||
new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
|
new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
|
||||||
int numOfRows = 100;
|
int numOfRows = 100;
|
||||||
|
|
||||||
int[] expectedCounts =
|
int[] expectedCounts =
|
||||||
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
||||||
|
|
||||||
loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
|
loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
|
||||||
famName, htables, hfileRanges, numOfRows, expectedCounts, true);
|
famName, htables, hfileRanges, numOfRows, expectedCounts, true);
|
||||||
|
@ -335,11 +333,11 @@ public class TestMasterReplication {
|
||||||
// Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
|
// Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
|
||||||
// to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
|
// to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
|
||||||
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
|
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
|
||||||
new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
|
new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
|
||||||
numOfRows = 200;
|
numOfRows = 200;
|
||||||
|
|
||||||
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
|
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
|
||||||
hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
|
hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
|
||||||
|
|
||||||
loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
|
loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
|
||||||
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
|
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
|
||||||
|
@ -370,8 +368,8 @@ public class TestMasterReplication {
|
||||||
|
|
||||||
// Load 100 rows for each hfile range in cluster '0' for table CF 'f'
|
// Load 100 rows for each hfile range in cluster '0' for table CF 'f'
|
||||||
byte[][][] hfileRanges =
|
byte[][][] hfileRanges =
|
||||||
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
||||||
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
|
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
|
||||||
int numOfRows = 100;
|
int numOfRows = 100;
|
||||||
int[] expectedCounts =
|
int[] expectedCounts =
|
||||||
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
|
||||||
|
@ -381,11 +379,11 @@ public class TestMasterReplication {
|
||||||
|
|
||||||
// Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
|
// Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
|
||||||
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
|
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
|
||||||
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
|
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
|
||||||
numOfRows = 100;
|
numOfRows = 100;
|
||||||
|
|
||||||
int[] newExpectedCounts =
|
int[] newExpectedCounts =
|
||||||
new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
|
new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
|
||||||
|
|
||||||
loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
|
loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
|
||||||
hfileRanges, numOfRows, newExpectedCounts, false);
|
hfileRanges, numOfRows, newExpectedCounts, false);
|
||||||
|
@ -449,7 +447,7 @@ public class TestMasterReplication {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBasePeerConfigsForPeerMutations()
|
public void testBasePeerConfigsForReplicationPeer()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
LOG.info("testBasePeerConfigsForPeerMutations");
|
LOG.info("testBasePeerConfigsForPeerMutations");
|
||||||
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
|
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
@ -502,18 +500,15 @@ public class TestMasterReplication {
|
||||||
utilities[0].restartHBaseCluster(1);
|
utilities[0].restartHBaseCluster(1);
|
||||||
admin = utilities[0].getAdmin();
|
admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
// Both retains the value of base configuration 1 value as before restart.
|
// Configurations should be updated after restart again
|
||||||
// Peer 1 (Update value), Peer 2 (Base Value)
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
||||||
Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
|
|
||||||
getConfiguration().get(firstCustomPeerConfigKey));
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
|
||||||
getConfiguration().get(firstCustomPeerConfigKey));
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
|
||||||
// Peer 1 gets new base config as part of restart.
|
|
||||||
Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
||||||
getConfiguration().get(secondCustomPeerConfigKey));
|
getConfiguration().get(secondCustomPeerConfigKey));
|
||||||
// Peer 2 retains the updated value as before restart.
|
Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
|
||||||
Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
|
|
||||||
getConfiguration().get(secondCustomPeerConfigKey));
|
getConfiguration().get(secondCustomPeerConfigKey));
|
||||||
} finally {
|
} finally {
|
||||||
shutDownMiniClusters();
|
shutDownMiniClusters();
|
||||||
|
@ -521,6 +516,64 @@ public class TestMasterReplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasePeerConfigsRemovalForReplicationPeer()
|
||||||
|
throws Exception {
|
||||||
|
LOG.info("testBasePeerConfigsForPeerMutations");
|
||||||
|
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
String firstCustomPeerConfigValue = "test";
|
||||||
|
|
||||||
|
try {
|
||||||
|
baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
|
||||||
|
startMiniClusters(2);
|
||||||
|
addPeer("1", 0, 1);
|
||||||
|
Admin admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
|
// Validates base configs 1 is present for both peer.
|
||||||
|
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
|
||||||
|
utilities[0].getConfiguration().unset(ReplicationPeerConfigUtil.
|
||||||
|
HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
|
||||||
|
HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").concat(""));
|
||||||
|
|
||||||
|
|
||||||
|
utilities[0].shutdownMiniHBaseCluster();
|
||||||
|
utilities[0].restartHBaseCluster(1);
|
||||||
|
admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
|
// Configurations should be removed after restart again
|
||||||
|
Assert.assertNull(admin.getReplicationPeerConfig("1")
|
||||||
|
.getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
} finally {
|
||||||
|
shutDownMiniClusters();
|
||||||
|
baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer()
|
||||||
|
throws Exception {
|
||||||
|
LOG.info("testBasePeerConfigsForPeerMutations");
|
||||||
|
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
|
||||||
|
|
||||||
|
try {
|
||||||
|
baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
|
||||||
|
firstCustomPeerConfigKey.concat("=").concat(""));
|
||||||
|
startMiniClusters(2);
|
||||||
|
addPeer("1", 0, 1);
|
||||||
|
Admin admin = utilities[0].getAdmin();
|
||||||
|
|
||||||
|
Assert.assertNull("Config should not be there", admin.getReplicationPeerConfig("1").
|
||||||
|
getConfiguration().get(firstCustomPeerConfigKey));
|
||||||
|
} finally {
|
||||||
|
shutDownMiniClusters();
|
||||||
|
baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws IOException {
|
public void tearDown() throws IOException {
|
||||||
configurations = null;
|
configurations = null;
|
||||||
|
@ -743,11 +796,11 @@ public class TestMasterReplication {
|
||||||
|
|
||||||
// listen for successful log rolls
|
// listen for successful log rolls
|
||||||
final WALActionsListener listener = new WALActionsListener() {
|
final WALActionsListener listener = new WALActionsListener() {
|
||||||
@Override
|
@Override
|
||||||
public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
|
public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
region.getWAL().registerWALActionsListener(listener);
|
region.getWAL().registerWALActionsListener(listener);
|
||||||
|
|
||||||
// request a roll
|
// request a roll
|
||||||
|
|
Loading…
Reference in New Issue