diff --git a/bin/hbase b/bin/hbase index 31547b1ab51..e786571d623 100755 --- a/bin/hbase +++ b/bin/hbase @@ -109,6 +109,7 @@ show_usage() { echo " pre-upgrade Run Pre-Upgrade validator tool" echo " hbtop Run HBTop tool" echo " credential Run the Hadoop Credential Shell" + echo " copyreppeers Run CopyReplicationPeers tool" echo " CLASSNAME Run the class named CLASSNAME" } @@ -769,6 +770,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}" elif [ "$COMMAND" = "credential" ] ; then CLASS='org.apache.hadoop.security.alias.CredentialShell' +elif [ "$COMMAND" = "copyreppeers" ] ; then + CLASS='org.apache.hadoop.hbase.replication.CopyReplicationPeers' else CLASS=$COMMAND if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java new file mode 100644 index 00000000000..634f4626da5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.hadoop.hbase.TableName; + +/** + * A helper tool for generating random {@link ReplicationPeerConfig} and do assertion. + */ +public final class ReplicationPeerConfigTestUtil { + + // Seed may be set with Random#setSeed + private static final Random RNG = new Random(); + + private ReplicationPeerConfigTestUtil() { + } + + private static Set randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private static Map> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + public static ReplicationPeerConfig getConfig(int seed) { + RNG.setSeed(seed); + return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) + .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) + .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) + .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) + .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); + } + + private static void assertSetEquals(Set expected, Set actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || 
actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + + private static void assertMapEquals(Map> expected, + Map> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator expectedIt = expectedCFs.iterator(), + actualIt = actualCFs.iterator(); expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + + public static void assertConfigEquals(ReplicationPeerConfig expected, + ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java index 30d6c1d41d9..bdac27067b0 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java +++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java @@ -17,19 +17,14 @@ */ package org.apache.hadoop.hbase.replication; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -260,34 +255,6 @@ public class TestReplicationPeerConfig { assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); } - private static final Random RNG = new Random(); // Seed may be set with Random#setSeed - - private Set randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - RNG.setSeed(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) - .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) - .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) - 
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); - } - @Test public void testBaseReplicationPeerConfig() throws ReplicationException { String customPeerConfigKey = "hbase.xxx.custom_config"; diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index 4b1a5e61093..3c0d2174931 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -74,6 +74,12 @@ test-jar test + + org.apache.hbase + hbase-client + test-jar + test + org.apache.hbase hbase-zookeeper diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java new file mode 100644 index 00000000000..13276ab492c --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A tool for copying replication peer data across different replication peer storages. + *

+ * Notice that we will not delete the replication peer data from the source storage, as this tool + * can also be used by online migration. See HBASE-27110 for the whole design. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class CopyReplicationPeers extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class); + + public static final String NAME = "copyreppeers"; + + public CopyReplicationPeers(Configuration conf) { + super(conf); + } + + private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) { + Configuration conf = new Configuration(getConf()); + conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type); + return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); + } + + private ZKWatcher createZKWatcher() throws IOException { + return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() { + + private volatile boolean aborted; + + @Override + public boolean isAborted() { + return aborted; + } + + @Override + public void abort(String why, Throwable e) { + aborted = true; + LOG.error(why, e); + System.exit(1); + } + }); + } + + private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst) + throws ReplicationException { + LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(), + dst.getClass().getSimpleName()); + for (String peerId : src.listPeerIds()) { + LOG.info("Going to migrate {}", peerId); + ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId); + boolean enabled = src.isPeerEnabled(peerId); + dst.addPeer(peerId, peerConfig, enabled); + LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}", peerId, peerConfig, enabled); + } + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: bin/hbase " + NAME + + " "); + System.err.println("The possible values for replication storage 
type:"); + for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) { + System.err.println(" " + type.name().toLowerCase()); + } + return -1; + } + FileSystem fs = FileSystem.get(getConf()); + try (ZKWatcher zk = createZKWatcher()) { + ReplicationPeerStorage src = create(args[0], fs, zk); + ReplicationPeerStorage dst = create(args[1], fs, zk); + migrate(src, dst); + } + return 0; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args); + System.exit(ret); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java index bd31b2958c4..e8191d66344 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java @@ -17,101 +17,20 @@ */ package org.apache.hadoop.hbase.replication; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; -import org.apache.hadoop.hbase.TableName; import org.junit.Test; public abstract class 
ReplicationPeerStorageTestBase { - // Seed may be set with Random#setSeed - private static final Random RNG = new Random(); - protected static ReplicationPeerStorage STORAGE; - private Set randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - RNG.setSeed(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) - .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) - .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) - .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); - } - - private void assertSetEquals(Set expected, Set actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach(s -> assertTrue(actual.contains(s))); - } - - private void assertMapEquals(Map> expected, - Map> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach((expectedTn, expectedCFs) -> { - List actualCFs = actual.get(expectedTn); - if (expectedCFs == null || expectedCFs.size() == 0) { - assertTrue(actual.containsKey(expectedTn)); - assertTrue(actualCFs == null || actualCFs.size() == 0); - } else { - assertNotNull(actualCFs); - 
assertEquals(expectedCFs.size(), actualCFs.size()); - for (Iterator expectedIt = expectedCFs.iterator(), - actualIt = actualCFs.iterator(); expectedIt.hasNext();) { - assertEquals(expectedIt.next(), actualIt.next()); - } - } - }); - } - - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { - assertEquals(expected.getClusterKey(), actual.getClusterKey()); - assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); - assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); - assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); - assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); - assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); - assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); - assertEquals(expected.getBandwidth(), actual.getBandwidth()); - } - @Test public void test() throws ReplicationException { int peerCount = 10; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java new file mode 100644 index 00000000000..fdde24852fa --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; +import static org.junit.Assert.assertEquals; + +import java.util.List; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestCopyReplicationPeers { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyReplicationPeers.class); + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static FileSystem FS; + + private static Path DIR; + + private static ReplicationPeerStorage SRC; + + private static ReplicationPeerStorage DST; + + @BeforeClass + public static void setUp() throws Exception { + DIR = UTIL.getDataTestDir("test_peer_migration"); + CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR); + FS = 
FileSystem.get(UTIL.getConfiguration()); + UTIL.startMiniZKCluster(); + SRC = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + DST = new FSReplicationPeerStorage(FS, UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniZKCluster(); + UTIL.cleanupTestDir(); + } + + @Test + public void testMigrate() throws Exception { + // invalid args + assertEquals(-1, + ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()), new String[] {})); + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + SRC.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + // migrate + assertEquals(0, ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()), + new String[] { SRC.getClass().getName(), DST.getClass().getName() })); + // verify the replication peer data in dst storage + List peerIds = DST.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), DST.getPeerConfig(peerId)); + } + } +}