From ed30909d27fd89bbc90fc83eadbb1e39db735eba Mon Sep 17 00:00:00 2001 From: Jan Hentschel Date: Sun, 16 Jun 2019 16:20:04 +0200 Subject: [PATCH] HBASE-19303 Removed ReplicationAdmin and all its usages Signed-off-by: stack --- .../client/replication/ReplicationAdmin.java | 396 ------ .../test/IntegrationTestReplication.java | 3 +- .../hbase/client/TestReplicaWithCluster.java | 5 +- .../replication/TestReplicationAdmin.java | 1185 ----------------- .../TestReplicationAdminWithClusters.java | 336 ----- ...cationAdminWithTwoDifferentZKClusters.java | 108 -- .../TestMultiSlaveReplication.java | 8 +- .../TestPerTableCFReplication.java | 27 +- .../replication/TestReplicationBase.java | 4 - .../TestReplicationDisableInactivePeer.java | 4 +- .../replication/TestReplicationEndpoint.java | 31 +- .../TestReplicationSmallTests.java | 5 +- .../replication/TestReplicationWithTags.java | 7 +- .../TestGlobalReplicationThrottler.java | 13 +- .../TestRegionReplicaReplicationEndpoint.java | 28 +- .../regionserver/TestReplicator.java | 14 +- 16 files changed, 71 insertions(+), 2103 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java deleted file mode 100644 index 722dc2aadb6..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ /dev/null @@ -1,396 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client.replication; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.regex.Pattern; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -/** - *

- * This class provides the administrative interface to HBase cluster - * replication. - *

- *

- * Adding a new peer results in creating new outbound connections from every - * region server to a subset of region servers on the slave cluster. Each - * new stream of replication will start replicating from the beginning of the - * current WAL, meaning that edits from that past will be replicated. - *

- *

- * Removing a peer is a destructive and irreversible operation that stops - * all the replication streams for the given cluster and deletes the metadata - * used to keep track of the replication state. - *

- *

- * To see which commands are available in the shell, type - * replication. - *

- * - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin} instead. - */ -@InterfaceAudience.Public -@Deprecated -public class ReplicationAdmin implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationAdmin.class); - - public static final String TNAME = "tableName"; - public static final String CFNAME = "columnFamilyName"; - - // only Global for now, can add other type - // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. - public static final String REPLICATIONTYPE = "replicationType"; - public static final String REPLICATIONGLOBAL = Integer - .toString(HConstants.REPLICATION_SCOPE_GLOBAL); - - private final Connection connection; - private Admin admin; - - /** - * Constructor that creates a connection to the local ZooKeeper ensemble. - * @param conf Configuration to use - * @throws IOException if an internal replication error occurs - * @throws RuntimeException if replication isn't enabled. - */ - public ReplicationAdmin(Configuration conf) throws IOException { - this.connection = ConnectionFactory.createConnection(conf); - admin = connection.getAdmin(); - } - - /** - * Add a new remote slave cluster for replication. - * @param id a short name that identifies the cluster - * @param peerConfig configuration for the replication slave cluster - * @param tableCfs the table and column-family list which will be replicated for this peer. - * A map from tableName to column family names. An empty collection can be passed - * to indicate replicating all column families. Pass null for replicating all table and column - * families - * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, - * use {@link #addPeer(String, ReplicationPeerConfig)} instead. - */ - @Deprecated - public void addPeer(String id, ReplicationPeerConfig peerConfig, - Map> tableCfs) throws ReplicationException, - IOException { - if (tableCfs != null) { - peerConfig.setTableCFsMap(tableCfs); - } - this.admin.addReplicationPeer(id, peerConfig); - } - - /** - * Add a new remote slave cluster for replication. - * @param id a short name that identifies the cluster - * @param peerConfig configuration for the replication slave cluster - * @deprecated use - * {@link org.apache.hadoop.hbase.client.Admin#addReplicationPeer(String, ReplicationPeerConfig)} - * instead - */ - @Deprecated - public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException, - IOException { - this.admin.addReplicationPeer(id, peerConfig); - } - - /** - * @deprecated as release of 2.0.0, and it will be removed in 3.0.0 - * */ - @Deprecated - public static Map> parseTableCFsFromConfig(String tableCFsConfig) { - return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig); - } - - /** - * @deprecated use - * {@link org.apache.hadoop.hbase.client.Admin#updateReplicationPeerConfig(String, ReplicationPeerConfig)} - * instead - */ - @Deprecated - public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws IOException { - this.admin.updateReplicationPeerConfig(id, peerConfig); - } - - /** - * Removes a peer cluster and stops the replication to it. - * @param id a short name that identifies the cluster - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#removeReplicationPeer(String)} instead - */ - @Deprecated - public void removePeer(String id) throws IOException { - this.admin.removeReplicationPeer(id); - } - - /** - * Restart the replication stream to the specified peer. - * @param id a short name that identifies the cluster - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableReplicationPeer(String)} - * instead - */ - @Deprecated - public void enablePeer(String id) throws IOException { - this.admin.enableReplicationPeer(id); - } - - /** - * Stop the replication stream to the specified peer. - * @param id a short name that identifies the cluster - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableReplicationPeer(String)} - * instead - */ - @Deprecated - public void disablePeer(String id) throws IOException { - this.admin.disableReplicationPeer(id); - } - - /** - * Get the number of slave clusters the local cluster has. - * @return number of slave clusters - * @throws IOException - * @deprecated - */ - @Deprecated - public int getPeersCount() throws IOException { - return this.admin.listReplicationPeers().size(); - } - - /** - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead - */ - @Deprecated - public Map listPeerConfigs() throws IOException { - List peers = this.admin.listReplicationPeers(); - Map result = new TreeMap<>(); - for (ReplicationPeerDescription peer : peers) { - result.put(peer.getPeerId(), peer.getPeerConfig()); - } - return result; - } - - /** - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#getReplicationPeerConfig(String)} - * instead - */ - @Deprecated - public ReplicationPeerConfig getPeerConfig(String id) throws IOException { - return admin.getReplicationPeerConfig(id); - } - - /** - * Get the replicable table-cf config of the specified peer. - * @param id a short name that identifies the cluster - * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, - * use {@link #getPeerConfig(String)} instead. - * */ - @Deprecated - public String getPeerTableCFs(String id) throws IOException { - ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id); - return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap()); - } - - /** - * Append the replicable table-cf config of the specified peer - * @param id a short that identifies the cluster - * @param tableCfs table-cfs config str - * @throws ReplicationException - * @throws IOException - * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, - * use {@link #appendPeerTableCFs(String, Map)} instead. - */ - @Deprecated - public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException, - IOException { - appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)); - } - - /** - * Append the replicable table-cf config of the specified peer - * @param id a short that identifies the cluster - * @param tableCfs A map from tableName to column family names - * @throws ReplicationException - * @throws IOException - */ - @Deprecated - public void appendPeerTableCFs(String id, Map> tableCfs) - throws ReplicationException, IOException { - this.admin.appendReplicationPeerTableCFs(id, copyTableCFs(tableCfs)); - } - - /** - * Remove some table-cfs from table-cfs config of the specified peer - * @param id a short name that identifies the cluster - * @param tableCf table-cfs config str - * @throws ReplicationException - * @throws IOException - * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, - * use {@link #removePeerTableCFs(String, Map)} instead. - */ - @Deprecated - public void removePeerTableCFs(String id, String tableCf) throws ReplicationException, - IOException { - removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf)); - } - - /** - * Remove some table-cfs from config of the specified peer - * @param id a short name that identifies the cluster - * @param tableCfs A map from tableName to column family names - * @throws ReplicationException - * @throws IOException - */ - @Deprecated - public void removePeerTableCFs(String id, Map> tableCfs) - throws ReplicationException, IOException { - this.admin.removeReplicationPeerTableCFs(id, copyTableCFs(tableCfs)); - } - - private Map> - copyTableCFs(Map> tableCfs) { - Map> newTableCfs = new HashMap<>(); - if (tableCfs != null) { - tableCfs.forEach( - (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null)); - } - return newTableCfs; - } - - /** - * Set the replicable table-cf config of the specified peer - * @param id a short name that identifies the cluster - * @param tableCfs the table and column-family list which will be replicated for this peer. - * A map from tableName to column family names. An empty collection can be passed - * to indicate replicating all column families. Pass null for replicating all table and column - * families - */ - @Deprecated - public void setPeerTableCFs(String id, Map> tableCfs) - throws IOException { - ReplicationPeerConfig peerConfig = getPeerConfig(id); - peerConfig.setTableCFsMap(tableCfs); - updatePeerConfig(id, peerConfig); - } - - /** - * Get the state of the specified peer cluster - * @param id String format of the Short name that identifies the peer, - * an IllegalArgumentException is thrown if it doesn't exist - * @return true if replication is enabled to that peer, false if it isn't - */ - @Deprecated - public boolean getPeerState(String id) throws ReplicationException, IOException { - List peers = admin.listReplicationPeers(Pattern.compile(id)); - if (peers.isEmpty() || !id.equals(peers.get(0).getPeerId())) { - throw new ReplicationPeerNotFoundException(id); - } - return peers.get(0).isEnabled(); - } - - @Override - public void close() throws IOException { - if (this.connection != null) { - this.connection.close(); - } - admin.close(); - } - - /** - * Find all column families that are replicated from this cluster - * @return the full list of the replicated column families of this cluster as: - * tableName, family name, replicationType - * - * Currently replicationType is Global. In the future, more replication - * types may be extended here. For example - * 1) the replication may only apply to selected peers instead of all peers - * 2) the replicationType may indicate the host Cluster servers as Slave - * for the table:columnFam. - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicatedTableCFs()} instead - */ - @Deprecated - public List> listReplicated() throws IOException { - List> replicationColFams = new ArrayList<>(); - admin.listReplicatedTableCFs().forEach( - (tableCFs) -> { - String table = tableCFs.getTable().getNameAsString(); - tableCFs.getColumnFamilyMap() - .forEach( - (cf, scope) -> { - HashMap replicationEntry = new HashMap<>(); - replicationEntry.put(TNAME, table); - replicationEntry.put(CFNAME, cf); - replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); - replicationColFams.add(replicationEntry); - }); - }); - return replicationColFams; - } - - /** - * Enable a table's replication switch. - * @param tableName name of the table - * @throws IOException if a remote or network exception occurs - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableTableReplication(TableName)} - * instead - */ - @Deprecated - public void enableTableRep(final TableName tableName) throws IOException { - admin.enableTableReplication(tableName); - } - - /** - * Disable a table's replication switch. - * @param tableName name of the table - * @throws IOException if a remote or network exception occurs - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableTableReplication(TableName)} - * instead - */ - @Deprecated - public void disableTableRep(final TableName tableName) throws IOException { - admin.disableTableReplication(tableName); - } - - /** - * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead - */ - @VisibleForTesting - @Deprecated - List listReplicationPeers() throws IOException { - return admin.listReplicationPeers(); - } -} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java index 2a09da7a505..6c0db1043ec 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java @@ -176,8 +176,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList { /** * This tears down any tables that existed from before and rebuilds the tables and schemas on * the source cluster. It then sets up replication from the source to the sink cluster by using - * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin} - * connection. + * the {@link org.apache.hadoop.hbase.client.Admin} connection. * * @throws Exception */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 566b1070234..cd59e924808 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -396,11 +395,11 @@ public class TestReplicaWithCluster { LOG.info("Setup second Zk"); HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(HTU2.getClusterKey()); - admin.addPeer("2", rpc, null); + admin.addReplicationPeer("2", rpc); admin.close(); Put p = new Put(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java deleted file mode 100644 index d337906a3ad..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ /dev/null @@ -1,1185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client.replication; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationUtils; -import org.apache.hadoop.hbase.replication.SyncReplicationState; -import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Unit testing of ReplicationAdmin - */ -@Category({MediumTests.class, ClientTests.class}) -public class TestReplicationAdmin { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationAdmin.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); - - private final static HBaseTestingUtility TEST_UTIL = - new HBaseTestingUtility(); - - private final String ID_ONE = "1"; - private final String KEY_ONE = "127.0.0.1:2181:/hbase"; - private final String ID_SECOND = "2"; - private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; - - private static ReplicationAdmin admin; - private static Admin hbaseAdmin; - - @Rule - public TestName name = new TestName(); - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - TEST_UTIL.startMiniCluster(); - admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); - hbaseAdmin = TEST_UTIL.getAdmin(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - if (admin != null) { - admin.close(); - } - TEST_UTIL.shutdownMiniCluster(); - } - - @After - public void tearDown() throws Exception { - for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { - hbaseAdmin.removeReplicationPeer(desc.getPeerId()); - } - ReplicationQueueStorage queueStorage = ReplicationStorageFactory - .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); - for (ServerName serverName : queueStorage.getListOfReplicators()) { - for (String queue : queueStorage.getAllQueues(serverName)) { - queueStorage.removeQueue(serverName, queue); - } - queueStorage.removeReplicatorIfQueueIsEmpty(serverName); - } - } - - @Test - public void testConcurrentPeerOperations() throws Exception { - int threadNum = 5; - AtomicLong successCount = new AtomicLong(0); - - // Test concurrent add peer operation - Thread[] addPeers = new Thread[threadNum]; - for (int i = 0; i < threadNum; i++) { - addPeers[i] = new Thread(() -> { - try { - hbaseAdmin.addReplicationPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); - successCount.incrementAndGet(); - } catch (Exception e) { - LOG.debug("Got exception when add replication peer", e); - } - }); - addPeers[i].start(); - } - for (Thread addPeer : addPeers) { - addPeer.join(); - } - assertEquals(1, successCount.get()); - - // Test concurrent remove peer operation - successCount.set(0); - Thread[] removePeers = new Thread[threadNum]; - for (int i = 0; i < threadNum; i++) { - removePeers[i] = new Thread(() -> { - try { - hbaseAdmin.removeReplicationPeer(ID_ONE); - successCount.incrementAndGet(); - } catch (Exception e) { - LOG.debug("Got exception when remove replication peer", e); - } - }); - removePeers[i].start(); - } - for (Thread removePeer : removePeers) { - removePeer.join(); - } - assertEquals(1, successCount.get()); - - // Test concurrent add peer operation again - successCount.set(0); - addPeers = new Thread[threadNum]; - for (int i = 0; i < threadNum; i++) { - addPeers[i] = new Thread(() -> { - try { - hbaseAdmin.addReplicationPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); - successCount.incrementAndGet(); - } catch (Exception e) { - LOG.debug("Got exception when add replication peer", e); - } - }); - addPeers[i].start(); - } - for (Thread addPeer : addPeers) { - addPeer.join(); - } - assertEquals(1, successCount.get()); - } - - @Test - public void testAddInvalidPeer() { - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - try { - String invalidPeerId = "1-2"; - hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); - fail("Should fail as the peer id: " + invalidPeerId + " is invalid"); - } catch (Exception e) { - // OK - } - - try { - String invalidClusterKey = "2181:/hbase"; - builder.setClusterKey(invalidClusterKey); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); - } catch (Exception e) { - // OK - } - } - - /** - * Simple testing of adding and removing peers, basically shows that - * all interactions with ZK work - * @throws Exception - */ - @Test - public void testAddRemovePeer() throws Exception { - ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder(); - rpc1.setClusterKey(KEY_ONE); - ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder(); - rpc2.setClusterKey(KEY_SECOND); - // Add a valid peer - hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); - // try adding the same (fails) - try { - hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); - } catch (Exception e) { - // OK! - } - assertEquals(1, hbaseAdmin.listReplicationPeers().size()); - // Try to remove an inexisting peer - try { - hbaseAdmin.removeReplicationPeer(ID_SECOND); - fail(); - } catch (Exception e) { - // OK! - } - assertEquals(1, hbaseAdmin.listReplicationPeers().size()); - // Add a second since multi-slave is supported - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build()); - } catch (Exception e) { - fail(); - } - assertEquals(2, hbaseAdmin.listReplicationPeers().size()); - // Remove the first peer we added - hbaseAdmin.removeReplicationPeer(ID_ONE); - assertEquals(1, hbaseAdmin.listReplicationPeers().size()); - hbaseAdmin.removeReplicationPeer(ID_SECOND); - assertEquals(0, hbaseAdmin.listReplicationPeers().size()); - } - - @Test - public void testRemovePeerWithNonDAState() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); - TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE)); - builder.setClusterKey(KEY_ONE); - builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), - TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); - builder.setReplicateAllUserTables(false); - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName, new ArrayList<>()); - builder.setTableCFsMap(tableCfs); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - - // Transit sync replication state to ACTIVE. - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.ACTIVE); - assertEquals(SyncReplicationState.ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - - try { - hbaseAdmin.removeReplicationPeer(ID_ONE); - fail("Can't remove a synchronous replication peer with state=ACTIVE"); - } catch (IOException e) { - // OK - } - - // Transit sync replication state to DA - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, - SyncReplicationState.DOWNGRADE_ACTIVE); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - // Transit sync replication state to STANDBY - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.STANDBY); - assertEquals(SyncReplicationState.STANDBY, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - - try { - hbaseAdmin.removeReplicationPeer(ID_ONE); - fail("Can't remove a synchronous replication peer with state=STANDBY"); - } catch (IOException e) { - // OK - } - - // Transit sync replication state to DA - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, - SyncReplicationState.DOWNGRADE_ACTIVE); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - assertEquals(0, hbaseAdmin.listReplicationPeers().size()); - } - - @Test - public void testAddPeerWithState() throws Exception { - ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); - rpc1.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); - assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); - hbaseAdmin.removeReplicationPeer(ID_ONE); - - ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); - hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); - assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); - hbaseAdmin.removeReplicationPeer(ID_SECOND); - } - - /** - * Tests that the peer configuration used by ReplicationAdmin contains all - * the peer's properties. - */ - @Test - public void testPeerConfig() throws Exception { - ReplicationPeerConfig config = new ReplicationPeerConfig(); - config.setClusterKey(KEY_ONE); - config.getConfiguration().put("key1", "value1"); - config.getConfiguration().put("key2", "value2"); - hbaseAdmin.addReplicationPeer(ID_ONE, config); - - List peers = hbaseAdmin.listReplicationPeers(); - assertEquals(1, peers.size()); - ReplicationPeerDescription peerOne = peers.get(0); - assertNotNull(peerOne); - assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); - assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - } - - @Test - public void testAddPeerWithUnDeletedQueues() throws Exception { - ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); - rpc1.setClusterKey(KEY_ONE); - ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); - Configuration conf = TEST_UTIL.getConfiguration(); - ReplicationQueueStorage queueStorage = - ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); - - ServerName serverName = ServerName.valueOf("server1", 8000, 1234); - // add queue for ID_ONE - queueStorage.addWAL(serverName, ID_ONE, "file1"); - try { - admin.addPeer(ID_ONE, rpc1, null); - fail(); - } catch (Exception e) { - // OK! - } - queueStorage.removeQueue(serverName, ID_ONE); - assertEquals(0, queueStorage.getAllQueues(serverName).size()); - - // add recovered queue for ID_ONE - queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1"); - try { - admin.addPeer(ID_ONE, rpc2, null); - fail(); - } catch (Exception e) { - // OK! - } - } - - /** - * basic checks that when we add a peer that it is enabled, and that we can disable - * @throws Exception - */ - @Test - public void testEnableDisable() throws Exception { - ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); - rpc1.setClusterKey(KEY_ONE); - admin.addPeer(ID_ONE, rpc1, null); - assertEquals(1, admin.getPeersCount()); - assertTrue(admin.getPeerState(ID_ONE)); - admin.disablePeer(ID_ONE); - - assertFalse(admin.getPeerState(ID_ONE)); - try { - admin.getPeerState(ID_SECOND); - } catch (ReplicationPeerNotFoundException e) { - // OK! - } - admin.removePeer(ID_ONE); - } - - @Test - public void testAppendPeerTableCFs() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); - final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); - final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); - final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); - final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); - - // Add a valid peer - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - // Update peer config, not replicate all user tables - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - rpc.setReplicateAllUserTables(false); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - - Map> tableCFs = new HashMap<>(); - tableCFs.put(tableName1, null); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - Map> result = - ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(1, result.size()); - assertEquals(true, result.containsKey(tableName1)); - assertNull(result.get(tableName1)); - - // append table t2 to replication - tableCFs.clear(); - tableCFs.put(tableName2, null); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(2, result.size()); - assertTrue("Should contain t1", result.containsKey(tableName1)); - assertTrue("Should contain t2", result.containsKey(tableName2)); - assertNull(result.get(tableName1)); - assertNull(result.get(tableName2)); - - // append table column family: f1 of t3 to replication - tableCFs.clear(); - tableCFs.put(tableName3, new ArrayList<>()); - tableCFs.get(tableName3).add("f1"); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(3, result.size()); - assertTrue("Should contain t1", result.containsKey(tableName1)); - assertTrue("Should contain t2", result.containsKey(tableName2)); - assertTrue("Should contain t3", result.containsKey(tableName3)); - assertNull(result.get(tableName1)); - assertNull(result.get(tableName2)); - assertEquals(1, result.get(tableName3).size()); - assertEquals("f1", result.get(tableName3).get(0)); - - tableCFs.clear(); - tableCFs.put(tableName4, new ArrayList<>()); - tableCFs.get(tableName4).add("f1"); - tableCFs.get(tableName4).add("f2"); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(4, result.size()); - assertTrue("Should contain t1", result.containsKey(tableName1)); - assertTrue("Should contain t2", result.containsKey(tableName2)); - assertTrue("Should contain t3", result.containsKey(tableName3)); - assertTrue("Should contain t4", result.containsKey(tableName4)); - assertNull(result.get(tableName1)); - assertNull(result.get(tableName2)); - assertEquals(1, result.get(tableName3).size()); - assertEquals("f1", result.get(tableName3).get(0)); - assertEquals(2, result.get(tableName4).size()); - assertEquals("f1", result.get(tableName4).get(0)); - assertEquals("f2", result.get(tableName4).get(1)); - - // append "table5" => [], then append "table5" => ["f1"] - tableCFs.clear(); - tableCFs.put(tableName5, new ArrayList<>()); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - tableCFs.clear(); - tableCFs.put(tableName5, new ArrayList<>()); - tableCFs.get(tableName5).add("f1"); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(5, result.size()); - assertTrue("Should contain t5", result.containsKey(tableName5)); - // null means replication all cfs of tab5 - assertNull(result.get(tableName5)); - - // append "table6" => ["f1"], then append "table6" => [] - tableCFs.clear(); - tableCFs.put(tableName6, new ArrayList<>()); - tableCFs.get(tableName6).add("f1"); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - tableCFs.clear(); - tableCFs.put(tableName6, new ArrayList<>()); - admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(6, result.size()); - assertTrue("Should contain t6", result.containsKey(tableName6)); - // null means replication all cfs of tab6 - assertNull(result.get(tableName6)); - - admin.removePeer(ID_ONE); - } - - @Test - public void testRemovePeerTableCFs() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); - final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); - final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); - - // Add a valid peer - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - // Update peer config, not replicate all user tables - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - rpc.setReplicateAllUserTables(false); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - - Map> tableCFs = new HashMap<>(); - try { - tableCFs.put(tableName3, null); - admin.removePeerTableCFs(ID_ONE, tableCFs); - assertTrue(false); - } catch (ReplicationException e) { - } - assertNull(admin.getPeerTableCFs(ID_ONE)); - - tableCFs.clear(); - tableCFs.put(tableName1, null); - tableCFs.put(tableName2, new ArrayList<>()); - tableCFs.get(tableName2).add("cf1"); - admin.setPeerTableCFs(ID_ONE, tableCFs); - try { - tableCFs.clear(); - tableCFs.put(tableName3, null); - admin.removePeerTableCFs(ID_ONE, tableCFs); - assertTrue(false); - } catch (ReplicationException e) { - } - Map> result = - ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(2, result.size()); - assertTrue("Should contain t1", result.containsKey(tableName1)); - assertTrue("Should contain t2", result.containsKey(tableName2)); - assertNull(result.get(tableName1)); - assertEquals(1, result.get(tableName2).size()); - assertEquals("cf1", result.get(tableName2).get(0)); - - try { - tableCFs.clear(); - tableCFs.put(tableName1, new ArrayList<>()); - tableCFs.get(tableName1).add("f1"); - admin.removePeerTableCFs(ID_ONE, tableCFs); - assertTrue(false); - } catch (ReplicationException e) { - } - tableCFs.clear(); - tableCFs.put(tableName1, null); - admin.removePeerTableCFs(ID_ONE, tableCFs); - result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); - assertEquals(1, result.size()); - assertEquals(1, result.get(tableName2).size()); - assertEquals("cf1", result.get(tableName2).get(0)); - - try { - tableCFs.clear(); - tableCFs.put(tableName2, null); - admin.removePeerTableCFs(ID_ONE, tableCFs); - fail(); - } catch (ReplicationException e) { - } - tableCFs.clear(); - tableCFs.put(tableName2, new ArrayList<>()); - tableCFs.get(tableName2).add("cf1"); - admin.removePeerTableCFs(ID_ONE, tableCFs); - assertNull(admin.getPeerTableCFs(ID_ONE)); - - tableCFs.clear(); - tableCFs.put(tableName4, new ArrayList<>()); - admin.setPeerTableCFs(ID_ONE, tableCFs); - admin.removePeerTableCFs(ID_ONE, tableCFs); - assertNull(admin.getPeerTableCFs(ID_ONE)); - - admin.removePeer(ID_ONE); - } - - @Test - public void testSetPeerNamespaces() throws Exception { - String ns1 = "ns1"; - String ns2 = "ns2"; - - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - rpc.setReplicateAllUserTables(false); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - Set namespaces = new HashSet<>(); - namespaces.add(ns1); - namespaces.add(ns2); - rpc.setNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); - assertEquals(2, namespaces.size()); - assertTrue(namespaces.contains(ns1)); - assertTrue(namespaces.contains(ns2)); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - namespaces = new HashSet<>(); - namespaces.add(ns1); - rpc.setNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); - assertEquals(1, namespaces.size()); - assertTrue(namespaces.contains(ns1)); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - } - - @Test - public void testSetReplicateAllUserTables() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertTrue(rpc.replicateAllUserTables()); - - rpc.setReplicateAllUserTables(false); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertFalse(rpc.replicateAllUserTables()); - - rpc.setReplicateAllUserTables(true); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertTrue(rpc.replicateAllUserTables()); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - } - - @Test - public void testPeerExcludeNamespaces() throws Exception { - String ns1 = "ns1"; - String ns2 = "ns2"; - - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertTrue(rpc.replicateAllUserTables()); - - Set namespaces = new HashSet(); - namespaces.add(ns1); - namespaces.add(ns2); - rpc.setExcludeNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); - assertEquals(2, namespaces.size()); - assertTrue(namespaces.contains(ns1)); - assertTrue(namespaces.contains(ns2)); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - namespaces = new HashSet(); - namespaces.add(ns1); - rpc.setExcludeNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); - assertEquals(1, namespaces.size()); - assertTrue(namespaces.contains(ns1)); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - } - - @Test - public void testPeerExcludeTableCFs() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - TableName tab1 = TableName.valueOf("t1"); - TableName tab2 = TableName.valueOf("t2"); - TableName tab3 = TableName.valueOf("t3"); - TableName tab4 = TableName.valueOf("t4"); - - // Add a valid peer - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertTrue(rpc.replicateAllUserTables()); - - Map> tableCFs = new HashMap>(); - tableCFs.put(tab1, null); - rpc.setExcludeTableCFsMap(tableCFs); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - Map> result = - hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); - assertEquals(1, result.size()); - assertEquals(true, result.containsKey(tab1)); - assertNull(result.get(tab1)); - - tableCFs.put(tab2, new ArrayList()); - tableCFs.get(tab2).add("f1"); - rpc.setExcludeTableCFsMap(tableCFs); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); - assertEquals(2, result.size()); - assertTrue("Should contain t1", result.containsKey(tab1)); - assertTrue("Should contain t2", result.containsKey(tab2)); - assertNull(result.get(tab1)); - assertEquals(1, result.get(tab2).size()); - assertEquals("f1", result.get(tab2).get(0)); - - tableCFs.clear(); - tableCFs.put(tab3, new ArrayList()); - tableCFs.put(tab4, new ArrayList()); - tableCFs.get(tab4).add("f1"); - tableCFs.get(tab4).add("f2"); - rpc.setExcludeTableCFsMap(tableCFs); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); - assertEquals(2, result.size()); - assertTrue("Should contain t3", result.containsKey(tab3)); - assertTrue("Should contain t4", result.containsKey(tab4)); - assertNull(result.get(tab3)); - assertEquals(2, result.get(tab4).size()); - assertEquals("f1", result.get(tab4).get(0)); - assertEquals("f2", result.get(tab4).get(1)); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - } - - @Test - public void testPeerConfigConflict() throws Exception { - // Default replicate_all flag is true - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - - String ns1 = "ns1"; - Set namespaces = new HashSet(); - namespaces.add(ns1); - - TableName tab1 = TableName.valueOf("ns2:tabl"); - Map> tableCfs = new HashMap>(); - tableCfs.put(tab1, new ArrayList()); - - try { - rpc.setNamespaces(namespaces); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception." - + " When replicate all flag is true, no need to config namespaces"); - } catch (IOException e) { - // OK - rpc.setNamespaces(null); - } - - try { - rpc.setTableCFsMap(tableCfs); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception." - + " When replicate all flag is true, no need to config table-cfs"); - } catch (IOException e) { - // OK - rpc.setTableCFsMap(null); - } - - // Set replicate_all flag to true - rpc.setReplicateAllUserTables(false); - try { - rpc.setExcludeNamespaces(namespaces); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception." - + " When replicate all flag is false, no need to config exclude namespaces"); - } catch (IOException e) { - // OK - rpc.setExcludeNamespaces(null); - } - - try { - rpc.setExcludeTableCFsMap(tableCfs); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception." - + " When replicate all flag is false, no need to config exclude table-cfs"); - } catch (IOException e) { - // OK - rpc.setExcludeTableCFsMap(null); - } - - rpc.setNamespaces(namespaces); - rpc.setTableCFsMap(tableCfs); - // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - // Default replicate_all flag is true - ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); - rpc2.setExcludeNamespaces(namespaces); - rpc2.setExcludeTableCFsMap(tableCfs); - // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude - // table-cfs config - hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); - - hbaseAdmin.removeReplicationPeer(ID_ONE); - hbaseAdmin.removeReplicationPeer(ID_SECOND); - } - - @Test - public void testNamespacesAndTableCfsConfigConflict() throws Exception { - String ns1 = "ns1"; - String ns2 = "ns2"; - final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); - final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); - - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - rpc.setReplicateAllUserTables(false); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - Set namespaces = new HashSet(); - namespaces.add(ns1); - rpc.setNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - try { - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName1, new ArrayList<>()); - rpc.setTableCFsMap(tableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - fail("Should throw ReplicationException" + " Because table " + tableName1 - + " conflict with namespace " + ns1); - } catch (Exception e) { - // OK - } - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName2, new ArrayList<>()); - rpc.setTableCFsMap(tableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - try { - namespaces.clear(); - namespaces.add(ns2); - rpc.setNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); - fail("Should throw ReplicationException" + " Because namespace " + ns2 - + " conflict with table " + tableName2); - } catch (Exception e) { - // OK - } - - ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); - hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); - - rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - Set excludeNamespaces = new HashSet(); - excludeNamespaces.add(ns1); - rpc2.setExcludeNamespaces(excludeNamespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); - rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - try { - Map> excludeTableCfs = new HashMap<>(); - excludeTableCfs.put(tableName1, new ArrayList<>()); - rpc2.setExcludeTableCFsMap(excludeTableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); - fail("Should throw ReplicationException" + " Because exclude table " + tableName1 - + " conflict with exclude namespace " + ns1); - } catch (Exception e) { - // OK - } - - rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - Map> excludeTableCfs = new HashMap<>(); - excludeTableCfs.put(tableName2, new ArrayList<>()); - rpc2.setExcludeTableCFsMap(excludeTableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); - rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - try { - namespaces.clear(); - namespaces.add(ns2); - rpc2.setNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); - fail("Should throw ReplicationException" + " Because exclude namespace " + ns2 - + " conflict with exclude table " + tableName2); - } catch (Exception e) { - // OK - } - - hbaseAdmin.removeReplicationPeer(ID_ONE); - hbaseAdmin.removeReplicationPeer(ID_SECOND); - } - - @Test - public void testPeerBandwidth() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - - rpc = admin.getPeerConfig(ID_ONE); - assertEquals(0, rpc.getBandwidth()); - - rpc.setBandwidth(2097152); - admin.updatePeerConfig(ID_ONE, rpc); - - assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); - admin.removePeer(ID_ONE); - } - - @Test - public void testPeerClusterKey() throws Exception { - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - - try { - builder.setClusterKey(KEY_SECOND); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); - fail("Change cluster key on an existing peer is not allowed"); - } catch (Exception e) { - // OK - } - } - - @Test - public void testPeerReplicationEndpointImpl() throws Exception { - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - - try { - builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); - fail("Change replication endpoint implementation class on an existing peer is not allowed"); - } catch (Exception e) { - // OK - } - - try { - builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); - fail("Change replication endpoint implementation class on an existing peer is not allowed"); - } catch (Exception e) { - // OK - } - - builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_SECOND); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - - try { - builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change replication endpoint implementation class on an existing peer is not allowed"); - } catch (Exception e) { - // OK - } - } - - @Test - public void testPeerRemoteWALDir() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - - String rootDir = "hdfs://srv1:9999/hbase"; - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - - ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - assertNull(rpc.getRemoteWALDir()); - - builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); - try { - hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); - fail("Change remote wal dir is not allowed"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_SECOND); - builder.setRemoteWALDir("whatever"); - - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("Only support replicated table config for sync replication"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - builder.setReplicateAllUserTables(false); - Set namespaces = new HashSet(); - namespaces.add("ns1"); - builder.setNamespaces(namespaces); - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("Only support replicated table config for sync replication"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - builder.setNamespaces(null); - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("Only support replicated table config for sync replication, and tables can't be empty"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName, Arrays.asList("cf1")); - builder.setTableCFsMap(tableCfs); - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("Only support replicated table config for sync replication"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - tableCfs = new HashMap<>(); - tableCfs.put(tableName, new ArrayList<>()); - builder.setTableCFsMap(tableCfs); - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("The remote WAL dir must be absolute"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - builder.setRemoteWALDir("/hbase/remoteWALs"); - try { - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - fail("The remote WAL dir must be qualified"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - builder.setRemoteWALDir(rootDir); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - assertEquals(rootDir, rpc.getRemoteWALDir()); - - try { - builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - try { - builder.setRemoteWALDir(null); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - try { - builder = ReplicationPeerConfig.newBuilder(rpc); - tableCfs = new HashMap<>(); - tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new ArrayList<>()); - builder.setTableCFsMap(tableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated table config on an existing synchronous peer is not allowed"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - } - - @Test - public void testTransitSyncReplicationPeerState() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_ONE); - builder.setReplicateAllUserTables(false); - hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); - assertEquals(SyncReplicationState.NONE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); - - try { - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, - SyncReplicationState.DOWNGRADE_ACTIVE); - fail("Can't transit sync replication state if replication peer don't config remote wal dir"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); - builder = ReplicationPeerConfig.newBuilder(); - builder.setClusterKey(KEY_SECOND); - builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), - TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); - builder.setReplicateAllUserTables(false); - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName, new ArrayList<>()); - builder.setTableCFsMap(tableCfs); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - // Disable and enable peer don't affect SyncReplicationState - hbaseAdmin.disableReplicationPeer(ID_SECOND); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - hbaseAdmin.enableReplicationPeer(ID_SECOND); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - try { - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); - fail("Can't transit sync replication state to ACTIVE if remote wal dir does not exist"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - TEST_UTIL.getTestFileSystem() - .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND)); - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); - assertEquals(SyncReplicationState.ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); - assertEquals(SyncReplicationState.STANDBY, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, - SyncReplicationState.DOWNGRADE_ACTIVE); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); - assertEquals(SyncReplicationState.ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, - SyncReplicationState.DOWNGRADE_ACTIVE); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); - assertEquals(SyncReplicationState.STANDBY, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - try { - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); - fail("Can't transit sync replication state from STANDBY to ACTIVE"); - } catch (Exception e) { - // OK - LOG.info("Expected error:", e); - } - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, - SyncReplicationState.DOWNGRADE_ACTIVE); - assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, - hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - hbaseAdmin.removeReplicationPeer(ID_ONE); - hbaseAdmin.removeReplicationPeer(ID_SECOND); - assertEquals(0, hbaseAdmin.listReplicationPeers().size()); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java deleted file mode 100644 index 16fef673c54..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ /dev/null @@ -1,336 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client.replication; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.TestReplicationBase; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -/** - * Unit testing of ReplicationAdmin with clusters - */ -@Category({ MediumTests.class, ClientTests.class }) -public class TestReplicationAdminWithClusters extends TestReplicationBase { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class); - - static Connection connection1; - static Connection connection2; - static Admin admin1; - static Admin admin2; - static ReplicationAdmin adminExt; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TestReplicationBase.setUpBeforeClass(); - connection1 = ConnectionFactory.createConnection(CONF1); - connection2 = ConnectionFactory.createConnection(CONF2); - admin1 = connection1.getAdmin(); - admin2 = connection2.getAdmin(); - adminExt = new ReplicationAdmin(CONF1); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - admin1.close(); - admin2.close(); - adminExt.close(); - connection1.close(); - connection2.close(); - TestReplicationBase.tearDownAfterClass(); - } - - @Test - public void disableNotFullReplication() throws Exception { - HTableDescriptor table = new HTableDescriptor(admin2.getDescriptor(tableName)); - HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily"); - table.addFamily(f); - admin1.disableTable(tableName); - admin1.modifyTable(table); - admin1.enableTable(tableName); - - admin1.disableTableReplication(tableName); - table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope()); - } - - admin1.deleteColumnFamily(table.getTableName(), f.getName()); - } - - @Test - public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { - admin1.disableTableReplication(tableName); - admin2.disableTable(tableName); - admin2.deleteTable(tableName); - assertFalse(admin2.tableExists(tableName)); - admin1.enableTableReplication(tableName); - assertTrue(admin2.tableExists(tableName)); - } - - @Test - public void testEnableReplicationWhenReplicationNotEnabled() throws Exception { - HTableDescriptor table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL); - } - admin1.disableTable(tableName); - admin1.modifyTable(table); - admin1.enableTable(tableName); - - admin2.disableTable(tableName); - admin2.modifyTable(table); - admin2.enableTable(tableName); - - admin1.enableTableReplication(tableName); - table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); - } - } - - @Test - public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception { - HTableDescriptor table = new HTableDescriptor(admin2.getDescriptor(tableName)); - HColumnDescriptor f = new HColumnDescriptor("newFamily"); - table.addFamily(f); - admin2.disableTable(tableName); - admin2.modifyTable(table); - admin2.enableTable(tableName); - - try { - admin1.enableTableReplication(tableName); - fail("Exception should be thrown if table descriptors in the clusters are not same."); - } catch (RuntimeException ignored) { - - } - admin1.disableTable(tableName); - admin1.modifyTable(table); - admin1.enableTable(tableName); - admin1.enableTableReplication(tableName); - table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); - } - - admin1.deleteColumnFamily(tableName, f.getName()); - admin2.deleteColumnFamily(tableName, f.getName()); - } - - @Test - public void testDisableAndEnableReplication() throws Exception { - admin1.disableTableReplication(tableName); - HTableDescriptor table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope()); - } - admin1.enableTableReplication(tableName); - table = new HTableDescriptor(admin1.getDescriptor(tableName)); - for (HColumnDescriptor fam : table.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); - } - } - - @Test - public void testEnableReplicationForTableWithRegionReplica() throws Exception { - TableName tn = TableName.valueOf(name.getMethodName()); - TableDescriptor td = TableDescriptorBuilder.newBuilder(tn) - .setRegionReplication(5) - .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build()) - .build(); - - admin1.createTable(td); - - try { - admin1.enableTableReplication(tn); - td = admin1.getDescriptor(tn); - for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) { - assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); - } - } finally { - UTIL1.deleteTable(tn); - UTIL2.deleteTable(tn); - } - } - - @Test(expected = TableNotFoundException.class) - public void testDisableReplicationForNonExistingTable() throws Exception { - admin1.disableTableReplication(TableName.valueOf(name.getMethodName())); - } - - @Test(expected = TableNotFoundException.class) - public void testEnableReplicationForNonExistingTable() throws Exception { - admin1.enableTableReplication(TableName.valueOf(name.getMethodName())); - } - - @Test(expected = IllegalArgumentException.class) - public void testDisableReplicationWhenTableNameAsNull() throws Exception { - admin1.disableTableReplication(null); - } - - @Test(expected = IllegalArgumentException.class) - public void testEnableReplicationWhenTableNameAsNull() throws Exception { - admin1.enableTableReplication(null); - } - - /* - * Test enable table replication should create table only in user explicit specified table-cfs. - * HBASE-14717 - */ - @Test - public void testEnableReplicationForExplicitSetTableCfs() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - String peerId = "2"; - if (admin2.isTableAvailable(TestReplicationBase.tableName)) { - admin2.disableTable(TestReplicationBase.tableName); - admin2.deleteTable(TestReplicationBase.tableName); - } - assertFalse("Table should not exists in the peer cluster", - admin2.isTableAvailable(TestReplicationBase.tableName)); - - // update peer config - ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId); - rpc.setReplicateAllUserTables(false); - admin1.updateReplicationPeerConfig(peerId, rpc); - - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName, null); - try { - adminExt.setPeerTableCFs(peerId, tableCfs); - admin1.enableTableReplication(TestReplicationBase.tableName); - assertFalse("Table should not be created if user has set table cfs explicitly for the " - + "peer and this is not part of that collection", - admin2.isTableAvailable(TestReplicationBase.tableName)); - - tableCfs.put(TestReplicationBase.tableName, null); - adminExt.setPeerTableCFs(peerId, tableCfs); - admin1.enableTableReplication(TestReplicationBase.tableName); - assertTrue( - "Table should be created if user has explicitly added table into table cfs collection", - admin2.isTableAvailable(TestReplicationBase.tableName)); - } finally { - adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId)); - admin1.disableTableReplication(TestReplicationBase.tableName); - - rpc = admin1.getReplicationPeerConfig(peerId); - rpc.setReplicateAllUserTables(true); - admin1.updateReplicationPeerConfig(peerId, rpc); - } - } - - @Test - public void testReplicationPeerConfigUpdateCallback() throws Exception { - String peerId = "1"; - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(UTIL2.getClusterKey()); - rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); - rpc.getConfiguration().put("key1", "value1"); - - admin1.addReplicationPeer(peerId, rpc); - - rpc.getConfiguration().put("key1", "value2"); - admin.updatePeerConfig(peerId, rpc); - if (!TestUpdatableReplicationEndpoint.hasCalledBack()) { - synchronized (TestUpdatableReplicationEndpoint.class) { - TestUpdatableReplicationEndpoint.class.wait(2000L); - } - } - - assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack()); - - admin.removePeer(peerId); - } - - public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint { - private static boolean calledBack = false; - public static boolean hasCalledBack(){ - return calledBack; - } - @Override - public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){ - calledBack = true; - notifyAll(); - } - - @Override - public void start() { - startAsync(); - } - - @Override - public void stop() { - stopAsync(); - } - - @Override - protected void doStart() { - notifyStarted(); - } - - @Override - protected void doStop() { - notifyStopped(); - } - - @Override - public UUID getPeerUUID() { - return UTIL1.getRandomUUID(); - } - - @Override - public boolean replicate(ReplicateContext replicateContext) { - return false; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java deleted file mode 100644 index c04ee4b80e5..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client.replication; - -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MediumTests.class, ClientTests.class }) -public class TestReplicationAdminWithTwoDifferentZKClusters { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationAdminWithTwoDifferentZKClusters.class); - - private static Configuration conf1 = HBaseConfiguration.create(); - private static Configuration conf2; - - private static HBaseTestingUtility utility1; - private static HBaseTestingUtility utility2; - private static ReplicationAdmin admin; - - private static final TableName tableName = TableName.valueOf("test"); - private static final byte[] famName = Bytes.toBytes("f"); - private static final String peerId = "peer1"; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniCluster(); - admin = new ReplicationAdmin(conf1); - - conf2 = HBaseConfiguration.create(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182); - - utility2 = new HBaseTestingUtility(conf2); - utility2.startMiniCluster(); - - ReplicationPeerConfig config = new ReplicationPeerConfig(); - config.setClusterKey(utility2.getClusterKey()); - admin.addPeer(peerId, config, null); - - HTableDescriptor table = new HTableDescriptor(tableName); - HColumnDescriptor fam = new HColumnDescriptor(famName); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - table.addFamily(fam); - - utility1.getAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - utility1.waitUntilAllRegionsAssigned(tableName); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - admin.removePeer(peerId); - admin.close(); - utility1.deleteTable(tableName); - utility2.deleteTable(tableName); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); - } - - /* - * Test for HBASE-15393 - */ - @Test - public void testEnableTableReplication() throws Exception { - admin.enableTableRep(tableName); - assertTrue(utility2.getAdmin().tableExists(tableName)); - } - - @Test - public void testDisableTableReplication() throws Exception { - admin.disableTableRep(tableName); - assertTrue(utility2.getAdmin().tableExists(tableName)); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index df1bbdcee86..e7a1691f5ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -36,12 +36,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -136,7 +136,7 @@ public class TestMultiSlaveReplication { MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin(); utility1.getAdmin().createTable(table); utility2.getAdmin().createTable(table); @@ -147,7 +147,7 @@ public class TestMultiSlaveReplication { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); - admin1.addPeer("1", rpc, null); + admin1.addReplicationPeer("1", rpc); // put "row" and wait 'til it got around, then delete putAndWait(row, famName, htable1, htable2); @@ -165,7 +165,7 @@ public class TestMultiSlaveReplication { rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility3.getClusterKey()); - admin1.addPeer("2", rpc, null); + admin1.addReplicationPeer("2", rpc); // put a row, check it was replicated to all clusters putAndWait(row1, famName, htable1, htable2, htable3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index cb5c12570b7..bc90563216b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.testclassification.FlakeyTests; @@ -374,7 +373,7 @@ public class TestPerTableCFReplication { @Test public void testPerTableCFReplication() throws Exception { LOG.info("testPerTableCFReplication"); - ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1); + Admin replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); Connection connection1 = ConnectionFactory.createConnection(conf1); Connection connection2 = ConnectionFactory.createConnection(conf2); Connection connection3 = ConnectionFactory.createConnection(conf3); @@ -406,25 +405,25 @@ public class TestPerTableCFReplication { Table htab3C = connection3.getTable(tabCName); // A. add cluster2/cluster3 as peers to cluster1 - ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(utility2.getClusterKey()); - rpc2.setReplicateAllUserTables(false); Map> tableCFs = new HashMap<>(); tableCFs.put(tabCName, null); tableCFs.put(tabBName, new ArrayList<>()); tableCFs.get(tabBName).add("f1"); tableCFs.get(tabBName).add("f3"); - replicationAdmin.addPeer("2", rpc2, tableCFs); + ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder() + .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(false) + .setTableCFsMap(tableCFs).build(); + replicationAdmin.addReplicationPeer("2", rpc2); - ReplicationPeerConfig rpc3 = new ReplicationPeerConfig(); - rpc3.setClusterKey(utility3.getClusterKey()); - rpc3.setReplicateAllUserTables(false); tableCFs.clear(); tableCFs.put(tabAName, null); tableCFs.put(tabBName, new ArrayList<>()); tableCFs.get(tabBName).add("f1"); tableCFs.get(tabBName).add("f2"); - replicationAdmin.addPeer("3", rpc3, tableCFs); + ReplicationPeerConfig rpc3 = ReplicationPeerConfig.newBuilder() + .setClusterKey(utility3.getClusterKey()).setReplicateAllUserTables(false) + .setTableCFsMap(tableCFs).build(); + replicationAdmin.addReplicationPeer("3", rpc3); // A1. tableA can only replicated to cluster3 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A); @@ -474,13 +473,17 @@ public class TestPerTableCFReplication { tableCFs.put(tabCName, new ArrayList<>()); tableCFs.get(tabCName).add("f2"); tableCFs.get(tabCName).add("f3"); - replicationAdmin.setPeerTableCFs("2", tableCFs); + replicationAdmin.updateReplicationPeerConfig("2", + ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2")) + .setTableCFsMap(tableCFs).build()); tableCFs.clear(); tableCFs.put(tabBName, null); tableCFs.put(tabCName, new ArrayList<>()); tableCFs.get(tabCName).add("f3"); - replicationAdmin.setPeerTableCFs("3", tableCFs); + replicationAdmin.updateReplicationPeerConfig("3", + ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3")) + .setTableCFsMap(tableCFs).build()); // B1. cf 'f1' of tableA can only replicated to cluster2 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 219f9b4eac6..f576e75e098 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; @@ -69,7 +68,6 @@ public class TestReplicationBase { protected static Configuration CONF_WITH_LOCALFS; - protected static ReplicationAdmin admin; protected static Admin hbaseAdmin; protected static Table htable1; @@ -215,7 +213,6 @@ public class TestReplicationBase { protected static void startClusters() throws Exception { UTIL1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); - admin = new ReplicationAdmin(CONF1); LOG.info("Setup first Zk"); UTIL2.setZkCluster(miniZK); @@ -342,7 +339,6 @@ public class TestReplicationBase { public static void tearDownAfterClass() throws Exception { htable2.close(); htable1.close(); - admin.close(); UTIL2.shutdownMiniCluster(); UTIL1.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java index 4c034b1f671..4a5cfc134fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java @@ -65,7 +65,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { Thread.sleep(SLEEP_TIME * NB_RETRIES); // disable and start the peer - admin.disablePeer("2"); + hbaseAdmin.disableReplicationPeer("2"); StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build(); UTIL2.startMiniHBaseCluster(option); Get get = new Get(rowkey); @@ -80,7 +80,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { } // Test enable replication - admin.enablePeer("2"); + hbaseAdmin.enableReplicationPeer("2"); // wait since the sleep interval would be long Thread.sleep(SLEEP_TIME * NB_RETRIES); for (int i = 0; i < NB_RETRIES; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index b909d8f1bd5..8039db3f1d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -133,9 +133,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Test public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. - admin.addPeer("testCustomReplicationEndpoint", + hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) - .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())); // check whether the class has been constructed and started Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @@ -166,22 +166,22 @@ public class TestReplicationEndpoint extends TestReplicationBase { doAssert(Bytes.toBytes("row42")); - admin.removePeer("testCustomReplicationEndpoint"); + hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint"); } @Test public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); - int peerCount = admin.getPeersCount(); + int peerCount = hbaseAdmin.listReplicationPeers().size(); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; - admin.addPeer(id, + hbaseAdmin.addReplicationPeer(id, new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) - .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); + .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName())); // This test is flakey and then there is so much stuff flying around in here its, hard to // debug. Peer needs to be up for the edit to make it across. This wait on // peer count seems to be a hack that has us not progress till peer is up. - if (admin.getPeersCount() <= peerCount) { + if (hbaseAdmin.listReplicationPeers().size() <= peerCount) { LOG.info("Waiting on peercount to go up from " + peerCount); Threads.sleep(100); } @@ -202,7 +202,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { throw ReplicationEndpointReturningFalse.ex.get(); } - admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); + hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate"); } @Test @@ -227,10 +227,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { } } - admin.addPeer(id, + hbaseAdmin.addReplicationPeer(id, new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) - .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), - null); + .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())); final int numEdits = totEdits; Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate() { @@ -247,7 +246,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } }); - admin.removePeer("testInterClusterReplication"); + hbaseAdmin.removeReplicationPeer("testInterClusterReplication"); UTIL1.deleteTableData(tableName); } @@ -260,7 +259,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, EverythingPassesWALEntryFilter.class.getName() + "," + EverythingPassesWALEntryFilterSubclass.class.getName()); - admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); + hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(CONF1)) { doPut(connection, Bytes.toBytes("row1")); @@ -278,7 +277,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); //make sure our reflectively created filter is in the filter chain Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); - admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); + hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint"); } @Test(expected = IOException.class) @@ -289,7 +288,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { // test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, "IAmNotARealWalEntryFilter"); - admin.addPeer("testWALEntryFilterAddValidation", rpc); + hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc); } @Test(expected = IOException.class) @@ -300,7 +299,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { // test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, "IAmNotARealWalEntryFilter"); - admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); + hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index b8b96788f75..40a3f2a80f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -239,7 +239,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { } /** - * Integration test for TestReplicationAdmin, removes and re-add a peer cluster + * Removes and re-add a peer cluster */ @Test public void testAddAndRemoveClusters() throws Exception { @@ -352,8 +352,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { * Test for HBASE-8663 *

* Create two new Tables with colfamilies enabled for replication then run - * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note: - * TestReplicationAdmin is a better place for this testing but it would need mocks. + * {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies. */ @Test public void testVerifyListReplicatedTable() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index 8c5299e9027..8b6e1216767 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -81,7 +80,7 @@ public class TestReplicationWithTags { private static Configuration conf1 = HBaseConfiguration.create(); private static Configuration conf2; - private static ReplicationAdmin replicationAdmin; + private static Admin replicationAdmin; private static Connection connection1; private static Connection connection2; @@ -121,7 +120,7 @@ public class TestReplicationWithTags { // Have to reget conf1 in case zk cluster location different // than default conf1 = utility1.getConfiguration(); - replicationAdmin = new ReplicationAdmin(conf1); + replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); LOG.info("Setup first Zk"); // Base conf2 on conf1 so it gets the right zk cluster. @@ -143,7 +142,7 @@ public class TestReplicationWithTags { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); - replicationAdmin.addPeer("2", rpc, null); + replicationAdmin.addReplicationPeer("2", rpc); HTableDescriptor table = new HTableDescriptor(TABLE_NAME); HColumnDescriptor fam = new HColumnDescriptor(FAMILY); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java index 6078f55e1a0..a5349cf4474 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java @@ -28,12 +28,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -98,17 +99,17 @@ public class TestGlobalReplicationThrottler { utility2.setZkCluster(miniZK); new ZKWatcher(conf2, "cluster2", null, true); - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin(); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); utility1.startMiniCluster(); utility2.startMiniCluster(); - admin1.addPeer("peer1", rpc, null); - admin1.addPeer("peer2", rpc, null); - admin1.addPeer("peer3", rpc, null); - numOfPeer = admin1.getPeersCount(); + admin1.addReplicationPeer("peer1", rpc); + admin1.addReplicationPeer("peer2", rpc); + admin1.addReplicationPeer("peer3", rpc); + numOfPeer = admin1.listReplicationPeers().size(); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 669e3d2a124..9fa2aca703e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -42,12 +42,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; @@ -124,18 +124,18 @@ public class TestRegionReplicaReplicationEndpoint { public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { // create a table with region replicas. Check whether the replication peer is created // and replication started. - ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); String peerId = "region_replica_replication"; ReplicationPeerConfig peerConfig = null; try { - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); } catch (ReplicationPeerNotFoundException e) { LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); } if (peerConfig != null) { - admin.removePeer(peerId); + admin.removeReplicationPeer(peerId); peerConfig = null; } @@ -143,7 +143,7 @@ public class TestRegionReplicaReplicationEndpoint { "testReplicationPeerIsCreated_no_region_replicas"); HTU.getAdmin().createTable(htd); try { - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); fail("Should throw ReplicationException, because replication peer id=" + peerId + " not exist"); } catch (ReplicationPeerNotFoundException e) { @@ -155,7 +155,7 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getAdmin().createTable(htd); // assert peer configuration is correct - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); @@ -168,18 +168,18 @@ public class TestRegionReplicaReplicationEndpoint { public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { // modify a table by adding region replicas. Check whether the replication peer is created // and replication started. - ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); String peerId = "region_replica_replication"; ReplicationPeerConfig peerConfig = null; try { - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); } catch (ReplicationPeerNotFoundException e) { LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); } if (peerConfig != null) { - admin.removePeer(peerId); + admin.removeReplicationPeer(peerId); peerConfig = null; } @@ -189,7 +189,7 @@ public class TestRegionReplicaReplicationEndpoint { // assert that replication peer is not created yet try { - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); fail("Should throw ReplicationException, because replication peer id=" + peerId + " not exist"); } catch (ReplicationPeerNotFoundException e) { @@ -202,7 +202,7 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getAdmin().enableTable(htd.getTableName()); // assert peer configuration is correct - peerConfig = admin.getPeerConfig(peerId); + peerConfig = admin.getReplicationPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); @@ -405,8 +405,8 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getAdmin().createTable(htd); // both tables are created, now pause replication - ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); - admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); + admin.disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. @@ -465,7 +465,7 @@ public class TestRegionReplicaReplicationEndpoint { HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication - admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + admin.enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index bff363f986e..099fbc38b28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -70,10 +70,9 @@ public class TestReplicator extends TestReplicationBase { truncateTable(UTIL2, tableName); // Replace the peer set up for us by the base class with a wrapper for this test - admin.addPeer("testReplicatorBatching", + hbaseAdmin.addReplicationPeer("testReplicatorBatching", new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey()) - .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), - null); + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())); ReplicationEndpointForTest.setBatchCount(0); ReplicationEndpointForTest.setEntriesCount(0); @@ -109,7 +108,7 @@ public class TestReplicator extends TestReplicationBase { ReplicationEndpointForTest.getBatchCount()); assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2)); } finally { - admin.removePeer("testReplicatorBatching"); + hbaseAdmin.removeReplicationPeer("testReplicatorBatching"); } } @@ -120,10 +119,9 @@ public class TestReplicator extends TestReplicationBase { truncateTable(UTIL2, tableName); // Replace the peer set up for us by the base class with a wrapper for this test - admin.addPeer("testReplicatorWithErrors", + hbaseAdmin.addReplicationPeer("testReplicatorWithErrors", new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey()) - .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), - null); + .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName())); FailureInjectingReplicationEndpointForTest.setBatchCount(0); FailureInjectingReplicationEndpointForTest.setEntriesCount(0); @@ -157,7 +155,7 @@ public class TestReplicator extends TestReplicationBase { assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2)); } finally { - admin.removePeer("testReplicatorWithErrors"); + hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors"); } }