HBASE-27728 Implement a tool to migrate replication peer data between different storage implementation (#5179)

Signed-off-by: Liangjun He <heliangjun@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
(cherry picked from commit 31c4aea48c)

Conflicts:
	hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
	hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
This commit is contained in:
Duo Zhang 2023-04-17 22:13:19 +08:00
parent 1ae057b89f
commit 04a4ac5ae9
7 changed files with 333 additions and 117 deletions

View File

@ -109,6 +109,7 @@ show_usage() {
echo " pre-upgrade Run Pre-Upgrade validator tool"
echo " hbtop Run HBTop tool"
echo " credential Run the Hadoop Credential Shell"
echo " copyreppeers Run CopyReplicationPeers tool"
echo " CLASSNAME Run the class named CLASSNAME"
}
@ -769,6 +770,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then
HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}"
elif [ "$COMMAND" = "credential" ] ; then
CLASS='org.apache.hadoop.security.alias.CredentialShell'
elif [ "$COMMAND" = "copyreppeers" ] ; then
CLASS='org.apache.hadoop.hbase.replication.ReplicationPeerMigrationTool'
else
CLASS=$COMMAND
if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
/**
* A helper tool for generating random {@link ReplicationPeerConfig} and do assertion.
*/
public final class ReplicationPeerConfigTestUtil {
// Seed may be set with Random#setSeed
private static final Random RNG = new Random();
private ReplicationPeerConfigTestUtil() {
}
private static Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private static Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
public static ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
.setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG))
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build();
}
private static void assertSetEquals(Set<String> expected, Set<String> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach(s -> assertTrue(actual.contains(s)));
}
private static void assertMapEquals(Map<TableName, List<String>> expected,
Map<TableName, List<String>> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach((expectedTn, expectedCFs) -> {
List<String> actualCFs = actual.get(expectedTn);
if (expectedCFs == null || expectedCFs.size() == 0) {
assertTrue(actual.containsKey(expectedTn));
assertTrue(actualCFs == null || actualCFs.size() == 0);
} else {
assertNotNull(actualCFs);
assertEquals(expectedCFs.size(), actualCFs.size());
for (Iterator<String> expectedIt = expectedCFs.iterator(),
actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
assertEquals(expectedIt.next(), actualIt.next());
}
}
});
}
public static void assertConfigEquals(ReplicationPeerConfig expected,
ReplicationPeerConfig actual) {
assertEquals(expected.getClusterKey(), actual.getClusterKey());
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
}
}

View File

@ -17,19 +17,14 @@
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -260,34 +255,6 @@ public class TestReplicationPeerConfig {
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
.setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG))
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build();
}
@Test
public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";

View File

@ -74,6 +74,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A tool for copying replication peer data across different replication peer storages.
* <p/>
* Notice that we will not delete the replication peer data from the source storage, as this tool
* can also be used by online migration. See HBASE-27110 for the whole design.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CopyReplicationPeers extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class);
public static final String NAME = "copyreppeers";
public CopyReplicationPeers(Configuration conf) {
super(conf);
}
private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) {
Configuration conf = new Configuration(getConf());
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type);
return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
}
private ZKWatcher createZKWatcher() throws IOException {
return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() {
private volatile boolean aborted;
@Override
public boolean isAborted() {
return aborted;
}
@Override
public void abort(String why, Throwable e) {
aborted = true;
LOG.error(why, e);
System.exit(1);
}
});
}
private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst)
throws ReplicationException {
LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(),
dst.getClass().getSimpleName());
for (String peerId : src.listPeerIds()) {
LOG.info("Going to migrate {}", peerId);
ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId);
boolean enabled = src.isPeerEnabled(peerId);
dst.addPeer(peerId, peerConfig, enabled);
LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}", peerId, peerConfig, enabled);
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: bin/hbase " + NAME
+ " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>");
System.err.println("The possible values for replication storage type:");
for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) {
System.err.println(" " + type.name().toLowerCase());
}
return -1;
}
FileSystem fs = FileSystem.get(getConf());
try (ZKWatcher zk = createZKWatcher()) {
ReplicationPeerStorage src = create(args[0], fs, zk);
ReplicationPeerStorage dst = create(args[1], fs, zk);
migrate(src, dst);
}
return 0;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args);
System.exit(ret);
}
}

View File

@ -17,101 +17,20 @@
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.junit.Test;
public abstract class ReplicationPeerStorageTestBase {
// Seed may be set with Random#setSeed
private static final Random RNG = new Random();
protected static ReplicationPeerStorage STORAGE;
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
.setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG))
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build();
}
private void assertSetEquals(Set<String> expected, Set<String> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach(s -> assertTrue(actual.contains(s)));
}
private void assertMapEquals(Map<TableName, List<String>> expected,
Map<TableName, List<String>> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach((expectedTn, expectedCFs) -> {
List<String> actualCFs = actual.get(expectedTn);
if (expectedCFs == null || expectedCFs.size() == 0) {
assertTrue(actual.containsKey(expectedTn));
assertTrue(actualCFs == null || actualCFs.size() == 0);
} else {
assertNotNull(actualCFs);
assertEquals(expectedCFs.size(), actualCFs.size());
for (Iterator<String> expectedIt = expectedCFs.iterator(),
actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
assertEquals(expectedIt.next(), actualIt.next());
}
}
});
}
private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
assertEquals(expected.getClusterKey(), actual.getClusterKey());
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
}
@Test
public void test() throws ReplicationException {
int peerCount = 10;

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestCopyReplicationPeers {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCopyReplicationPeers.class);
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
private static FileSystem FS;
private static Path DIR;
private static ReplicationPeerStorage SRC;
private static ReplicationPeerStorage DST;
@BeforeClass
public static void setUp() throws Exception {
DIR = UTIL.getDataTestDir("test_peer_migration");
CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR);
FS = FileSystem.get(UTIL.getConfiguration());
UTIL.startMiniZKCluster();
SRC = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
DST = new FSReplicationPeerStorage(FS, UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniZKCluster();
UTIL.cleanupTestDir();
}
@Test
public void testMigrate() throws Exception {
// invalid args
assertEquals(-1,
ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()), new String[] {}));
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
SRC.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
}
// migrate
assertEquals(0, ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()),
new String[] { SRC.getClass().getName(), DST.getClass().getName() }));
// verify the replication peer data in dst storage
List<String> peerIds = DST.listPeerIds();
assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed), DST.getPeerConfig(peerId));
}
}
}