HBASE-19303 Removed ReplicationAdmin and all its usages
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
47a96444da
commit
ed30909d27
|
@ -1,396 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.replication;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This class provides the administrative interface to HBase cluster
|
||||
* replication.
|
||||
* </p>
|
||||
* <p>
|
||||
* Adding a new peer results in creating new outbound connections from every
|
||||
* region server to a subset of region servers on the slave cluster. Each
|
||||
* new stream of replication will start replicating from the beginning of the
|
||||
* current WAL, meaning that edits from that past will be replicated.
|
||||
* </p>
|
||||
* <p>
|
||||
* Removing a peer is a destructive and irreversible operation that stops
|
||||
* all the replication streams for the given cluster and deletes the metadata
|
||||
* used to keep track of the replication state.
|
||||
* </p>
|
||||
* <p>
|
||||
* To see which commands are available in the shell, type
|
||||
* <code>replication</code>.
|
||||
* </p>
|
||||
*
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin} instead.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@Deprecated
|
||||
public class ReplicationAdmin implements Closeable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationAdmin.class);
|
||||
|
||||
public static final String TNAME = "tableName";
|
||||
public static final String CFNAME = "columnFamilyName";
|
||||
|
||||
// only Global for now, can add other type
|
||||
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
|
||||
public static final String REPLICATIONTYPE = "replicationType";
|
||||
public static final String REPLICATIONGLOBAL = Integer
|
||||
.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
|
||||
private final Connection connection;
|
||||
private Admin admin;
|
||||
|
||||
/**
|
||||
* Constructor that creates a connection to the local ZooKeeper ensemble.
|
||||
* @param conf Configuration to use
|
||||
* @throws IOException if an internal replication error occurs
|
||||
* @throws RuntimeException if replication isn't enabled.
|
||||
*/
|
||||
public ReplicationAdmin(Configuration conf) throws IOException {
|
||||
this.connection = ConnectionFactory.createConnection(conf);
|
||||
admin = connection.getAdmin();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new remote slave cluster for replication.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param tableCfs the table and column-family list which will be replicated for this peer.
|
||||
* A map from tableName to column family names. An empty collection can be passed
|
||||
* to indicate replicating all column families. Pass null for replicating all table and column
|
||||
* families
|
||||
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
|
||||
* use {@link #addPeer(String, ReplicationPeerConfig)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public void addPeer(String id, ReplicationPeerConfig peerConfig,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
|
||||
IOException {
|
||||
if (tableCfs != null) {
|
||||
peerConfig.setTableCFsMap(tableCfs);
|
||||
}
|
||||
this.admin.addReplicationPeer(id, peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new remote slave cluster for replication.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @deprecated use
|
||||
* {@link org.apache.hadoop.hbase.client.Admin#addReplicationPeer(String, ReplicationPeerConfig)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException,
|
||||
IOException {
|
||||
this.admin.addReplicationPeer(id, peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0
|
||||
* */
|
||||
@Deprecated
|
||||
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
|
||||
return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use
|
||||
* {@link org.apache.hadoop.hbase.client.Admin#updateReplicationPeerConfig(String, ReplicationPeerConfig)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws IOException {
|
||||
this.admin.updateReplicationPeerConfig(id, peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a peer cluster and stops the replication to it.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#removeReplicationPeer(String)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void removePeer(String id) throws IOException {
|
||||
this.admin.removeReplicationPeer(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart the replication stream to the specified peer.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableReplicationPeer(String)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void enablePeer(String id) throws IOException {
|
||||
this.admin.enableReplicationPeer(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the replication stream to the specified peer.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableReplicationPeer(String)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void disablePeer(String id) throws IOException {
|
||||
this.admin.disableReplicationPeer(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of slave clusters the local cluster has.
|
||||
* @return number of slave clusters
|
||||
* @throws IOException
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public int getPeersCount() throws IOException {
|
||||
return this.admin.listReplicationPeers().size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public Map<String, ReplicationPeerConfig> listPeerConfigs() throws IOException {
|
||||
List<ReplicationPeerDescription> peers = this.admin.listReplicationPeers();
|
||||
Map<String, ReplicationPeerConfig> result = new TreeMap<>();
|
||||
for (ReplicationPeerDescription peer : peers) {
|
||||
result.put(peer.getPeerId(), peer.getPeerConfig());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#getReplicationPeerConfig(String)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public ReplicationPeerConfig getPeerConfig(String id) throws IOException {
|
||||
return admin.getReplicationPeerConfig(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replicable table-cf config of the specified peer.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
|
||||
* use {@link #getPeerConfig(String)} instead.
|
||||
* */
|
||||
@Deprecated
|
||||
public String getPeerTableCFs(String id) throws IOException {
|
||||
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
|
||||
return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap());
|
||||
}
|
||||
|
||||
/**
|
||||
* Append the replicable table-cf config of the specified peer
|
||||
* @param id a short that identifies the cluster
|
||||
* @param tableCfs table-cfs config str
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
|
||||
* use {@link #appendPeerTableCFs(String, Map)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException,
|
||||
IOException {
|
||||
appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Append the replicable table-cf config of the specified peer
|
||||
* @param id a short that identifies the cluster
|
||||
* @param tableCfs A map from tableName to column family names
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Deprecated
|
||||
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
||||
throws ReplicationException, IOException {
|
||||
this.admin.appendReplicationPeerTableCFs(id, copyTableCFs(tableCfs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove some table-cfs from table-cfs config of the specified peer
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param tableCf table-cfs config str
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
|
||||
* use {@link #removePeerTableCFs(String, Map)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public void removePeerTableCFs(String id, String tableCf) throws ReplicationException,
|
||||
IOException {
|
||||
removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf));
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove some table-cfs from config of the specified peer
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param tableCfs A map from tableName to column family names
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Deprecated
|
||||
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
||||
throws ReplicationException, IOException {
|
||||
this.admin.removeReplicationPeerTableCFs(id, copyTableCFs(tableCfs));
|
||||
}
|
||||
|
||||
private Map<TableName, List<String>>
|
||||
copyTableCFs(Map<TableName, ? extends Collection<String>> tableCfs) {
|
||||
Map<TableName, List<String>> newTableCfs = new HashMap<>();
|
||||
if (tableCfs != null) {
|
||||
tableCfs.forEach(
|
||||
(table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
|
||||
}
|
||||
return newTableCfs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the replicable table-cf config of the specified peer
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param tableCfs the table and column-family list which will be replicated for this peer.
|
||||
* A map from tableName to column family names. An empty collection can be passed
|
||||
* to indicate replicating all column families. Pass null for replicating all table and column
|
||||
* families
|
||||
*/
|
||||
@Deprecated
|
||||
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
||||
throws IOException {
|
||||
ReplicationPeerConfig peerConfig = getPeerConfig(id);
|
||||
peerConfig.setTableCFsMap(tableCfs);
|
||||
updatePeerConfig(id, peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the state of the specified peer cluster
|
||||
* @param id String format of the Short name that identifies the peer,
|
||||
* an IllegalArgumentException is thrown if it doesn't exist
|
||||
* @return true if replication is enabled to that peer, false if it isn't
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean getPeerState(String id) throws ReplicationException, IOException {
|
||||
List<ReplicationPeerDescription> peers = admin.listReplicationPeers(Pattern.compile(id));
|
||||
if (peers.isEmpty() || !id.equals(peers.get(0).getPeerId())) {
|
||||
throw new ReplicationPeerNotFoundException(id);
|
||||
}
|
||||
return peers.get(0).isEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (this.connection != null) {
|
||||
this.connection.close();
|
||||
}
|
||||
admin.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all column families that are replicated from this cluster
|
||||
* @return the full list of the replicated column families of this cluster as:
|
||||
* tableName, family name, replicationType
|
||||
*
|
||||
* Currently replicationType is Global. In the future, more replication
|
||||
* types may be extended here. For example
|
||||
* 1) the replication may only apply to selected peers instead of all peers
|
||||
* 2) the replicationType may indicate the host Cluster servers as Slave
|
||||
* for the table:columnFam.
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicatedTableCFs()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public List<HashMap<String, String>> listReplicated() throws IOException {
|
||||
List<HashMap<String, String>> replicationColFams = new ArrayList<>();
|
||||
admin.listReplicatedTableCFs().forEach(
|
||||
(tableCFs) -> {
|
||||
String table = tableCFs.getTable().getNameAsString();
|
||||
tableCFs.getColumnFamilyMap()
|
||||
.forEach(
|
||||
(cf, scope) -> {
|
||||
HashMap<String, String> replicationEntry = new HashMap<>();
|
||||
replicationEntry.put(TNAME, table);
|
||||
replicationEntry.put(CFNAME, cf);
|
||||
replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
|
||||
replicationColFams.add(replicationEntry);
|
||||
});
|
||||
});
|
||||
return replicationColFams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable a table's replication switch.
|
||||
* @param tableName name of the table
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableTableReplication(TableName)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void enableTableRep(final TableName tableName) throws IOException {
|
||||
admin.enableTableReplication(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disable a table's replication switch.
|
||||
* @param tableName name of the table
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableTableReplication(TableName)}
|
||||
* instead
|
||||
*/
|
||||
@Deprecated
|
||||
public void disableTableRep(final TableName tableName) throws IOException {
|
||||
admin.disableTableReplication(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Deprecated
|
||||
List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
|
||||
return admin.listReplicationPeers();
|
||||
}
|
||||
}
|
|
@ -176,8 +176,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
|
|||
/**
|
||||
* This tears down any tables that existed from before and rebuilds the tables and schemas on
|
||||
* the source cluster. It then sets up replication from the source to the sink cluster by using
|
||||
* the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin}
|
||||
* connection.
|
||||
* the {@link org.apache.hadoop.hbase.client.Admin} connection.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.RegionLocations;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
|
@ -396,11 +395,11 @@ public class TestReplicaWithCluster {
|
|||
LOG.info("Setup second Zk");
|
||||
HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
|
||||
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
|
||||
Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
|
||||
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(HTU2.getClusterKey());
|
||||
admin.addPeer("2", rpc, null);
|
||||
admin.addReplicationPeer("2", rpc);
|
||||
admin.close();
|
||||
|
||||
Put p = new Put(row);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,336 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.replication;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
/**
|
||||
* Unit testing of ReplicationAdmin with clusters
|
||||
*/
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);
|
||||
|
||||
static Connection connection1;
|
||||
static Connection connection2;
|
||||
static Admin admin1;
|
||||
static Admin admin2;
|
||||
static ReplicationAdmin adminExt;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TestReplicationBase.setUpBeforeClass();
|
||||
connection1 = ConnectionFactory.createConnection(CONF1);
|
||||
connection2 = ConnectionFactory.createConnection(CONF2);
|
||||
admin1 = connection1.getAdmin();
|
||||
admin2 = connection2.getAdmin();
|
||||
adminExt = new ReplicationAdmin(CONF1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
admin1.close();
|
||||
admin2.close();
|
||||
adminExt.close();
|
||||
connection1.close();
|
||||
connection2.close();
|
||||
TestReplicationBase.tearDownAfterClass();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void disableNotFullReplication() throws Exception {
|
||||
HTableDescriptor table = new HTableDescriptor(admin2.getDescriptor(tableName));
|
||||
HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
|
||||
table.addFamily(f);
|
||||
admin1.disableTable(tableName);
|
||||
admin1.modifyTable(table);
|
||||
admin1.enableTable(tableName);
|
||||
|
||||
admin1.disableTableReplication(tableName);
|
||||
table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
|
||||
}
|
||||
|
||||
admin1.deleteColumnFamily(table.getTableName(), f.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
|
||||
admin1.disableTableReplication(tableName);
|
||||
admin2.disableTable(tableName);
|
||||
admin2.deleteTable(tableName);
|
||||
assertFalse(admin2.tableExists(tableName));
|
||||
admin1.enableTableReplication(tableName);
|
||||
assertTrue(admin2.tableExists(tableName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
|
||||
HTableDescriptor table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
|
||||
}
|
||||
admin1.disableTable(tableName);
|
||||
admin1.modifyTable(table);
|
||||
admin1.enableTable(tableName);
|
||||
|
||||
admin2.disableTable(tableName);
|
||||
admin2.modifyTable(table);
|
||||
admin2.enableTable(tableName);
|
||||
|
||||
admin1.enableTableReplication(tableName);
|
||||
table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
|
||||
HTableDescriptor table = new HTableDescriptor(admin2.getDescriptor(tableName));
|
||||
HColumnDescriptor f = new HColumnDescriptor("newFamily");
|
||||
table.addFamily(f);
|
||||
admin2.disableTable(tableName);
|
||||
admin2.modifyTable(table);
|
||||
admin2.enableTable(tableName);
|
||||
|
||||
try {
|
||||
admin1.enableTableReplication(tableName);
|
||||
fail("Exception should be thrown if table descriptors in the clusters are not same.");
|
||||
} catch (RuntimeException ignored) {
|
||||
|
||||
}
|
||||
admin1.disableTable(tableName);
|
||||
admin1.modifyTable(table);
|
||||
admin1.enableTable(tableName);
|
||||
admin1.enableTableReplication(tableName);
|
||||
table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||
}
|
||||
|
||||
admin1.deleteColumnFamily(tableName, f.getName());
|
||||
admin2.deleteColumnFamily(tableName, f.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableAndEnableReplication() throws Exception {
|
||||
admin1.disableTableReplication(tableName);
|
||||
HTableDescriptor table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
|
||||
}
|
||||
admin1.enableTableReplication(tableName);
|
||||
table = new HTableDescriptor(admin1.getDescriptor(tableName));
|
||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableReplicationForTableWithRegionReplica() throws Exception {
|
||||
TableName tn = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
|
||||
.setRegionReplication(5)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build())
|
||||
.build();
|
||||
|
||||
admin1.createTable(td);
|
||||
|
||||
try {
|
||||
admin1.enableTableReplication(tn);
|
||||
td = admin1.getDescriptor(tn);
|
||||
for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
|
||||
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||
}
|
||||
} finally {
|
||||
UTIL1.deleteTable(tn);
|
||||
UTIL2.deleteTable(tn);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = TableNotFoundException.class)
|
||||
public void testDisableReplicationForNonExistingTable() throws Exception {
|
||||
admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
|
||||
}
|
||||
|
||||
@Test(expected = TableNotFoundException.class)
|
||||
public void testEnableReplicationForNonExistingTable() throws Exception {
|
||||
admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
|
||||
admin1.disableTableReplication(null);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
|
||||
admin1.enableTableReplication(null);
|
||||
}
|
||||
|
||||
/*
|
||||
* Test enable table replication should create table only in user explicit specified table-cfs.
|
||||
* HBASE-14717
|
||||
*/
|
||||
@Test
|
||||
public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
String peerId = "2";
|
||||
if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
|
||||
admin2.disableTable(TestReplicationBase.tableName);
|
||||
admin2.deleteTable(TestReplicationBase.tableName);
|
||||
}
|
||||
assertFalse("Table should not exists in the peer cluster",
|
||||
admin2.isTableAvailable(TestReplicationBase.tableName));
|
||||
|
||||
// update peer config
|
||||
ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
|
||||
rpc.setReplicateAllUserTables(false);
|
||||
admin1.updateReplicationPeerConfig(peerId, rpc);
|
||||
|
||||
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
|
||||
tableCfs.put(tableName, null);
|
||||
try {
|
||||
adminExt.setPeerTableCFs(peerId, tableCfs);
|
||||
admin1.enableTableReplication(TestReplicationBase.tableName);
|
||||
assertFalse("Table should not be created if user has set table cfs explicitly for the "
|
||||
+ "peer and this is not part of that collection",
|
||||
admin2.isTableAvailable(TestReplicationBase.tableName));
|
||||
|
||||
tableCfs.put(TestReplicationBase.tableName, null);
|
||||
adminExt.setPeerTableCFs(peerId, tableCfs);
|
||||
admin1.enableTableReplication(TestReplicationBase.tableName);
|
||||
assertTrue(
|
||||
"Table should be created if user has explicitly added table into table cfs collection",
|
||||
admin2.isTableAvailable(TestReplicationBase.tableName));
|
||||
} finally {
|
||||
adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
|
||||
admin1.disableTableReplication(TestReplicationBase.tableName);
|
||||
|
||||
rpc = admin1.getReplicationPeerConfig(peerId);
|
||||
rpc.setReplicateAllUserTables(true);
|
||||
admin1.updateReplicationPeerConfig(peerId, rpc);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationPeerConfigUpdateCallback() throws Exception {
|
||||
String peerId = "1";
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(UTIL2.getClusterKey());
|
||||
rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
|
||||
rpc.getConfiguration().put("key1", "value1");
|
||||
|
||||
admin1.addReplicationPeer(peerId, rpc);
|
||||
|
||||
rpc.getConfiguration().put("key1", "value2");
|
||||
admin.updatePeerConfig(peerId, rpc);
|
||||
if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
|
||||
synchronized (TestUpdatableReplicationEndpoint.class) {
|
||||
TestUpdatableReplicationEndpoint.class.wait(2000L);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
|
||||
|
||||
admin.removePeer(peerId);
|
||||
}
|
||||
|
||||
public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
|
||||
private static boolean calledBack = false;
|
||||
public static boolean hasCalledBack(){
|
||||
return calledBack;
|
||||
}
|
||||
@Override
|
||||
public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){
|
||||
calledBack = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
notifyStarted();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
notifyStopped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UUID getPeerUUID() {
|
||||
return UTIL1.getRandomUUID();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicate(ReplicateContext replicateContext) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,108 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.replication;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestReplicationAdminWithTwoDifferentZKClusters {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationAdminWithTwoDifferentZKClusters.class);
|
||||
|
||||
private static Configuration conf1 = HBaseConfiguration.create();
|
||||
private static Configuration conf2;
|
||||
|
||||
private static HBaseTestingUtility utility1;
|
||||
private static HBaseTestingUtility utility2;
|
||||
private static ReplicationAdmin admin;
|
||||
|
||||
private static final TableName tableName = TableName.valueOf("test");
|
||||
private static final byte[] famName = Bytes.toBytes("f");
|
||||
private static final String peerId = "peer1";
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
utility1.startMiniCluster();
|
||||
admin = new ReplicationAdmin(conf1);
|
||||
|
||||
conf2 = HBaseConfiguration.create(conf1);
|
||||
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||
conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182);
|
||||
|
||||
utility2 = new HBaseTestingUtility(conf2);
|
||||
utility2.startMiniCluster();
|
||||
|
||||
ReplicationPeerConfig config = new ReplicationPeerConfig();
|
||||
config.setClusterKey(utility2.getClusterKey());
|
||||
admin.addPeer(peerId, config, null);
|
||||
|
||||
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
table.addFamily(fam);
|
||||
|
||||
utility1.getAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
utility1.waitUntilAllRegionsAssigned(tableName);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
admin.removePeer(peerId);
|
||||
admin.close();
|
||||
utility1.deleteTable(tableName);
|
||||
utility2.deleteTable(tableName);
|
||||
utility2.shutdownMiniCluster();
|
||||
utility1.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/*
|
||||
* Test for HBASE-15393
|
||||
*/
|
||||
@Test
|
||||
public void testEnableTableReplication() throws Exception {
|
||||
admin.enableTableRep(tableName);
|
||||
assertTrue(utility2.getAdmin().tableExists(tableName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableTableReplication() throws Exception {
|
||||
admin.disableTableRep(tableName);
|
||||
assertTrue(utility2.getAdmin().tableExists(tableName));
|
||||
}
|
||||
}
|
|
@ -36,12 +36,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
|
@ -136,7 +136,7 @@ public class TestMultiSlaveReplication {
|
|||
MiniHBaseCluster master = utility1.startMiniCluster();
|
||||
utility2.startMiniCluster();
|
||||
utility3.startMiniCluster();
|
||||
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
|
||||
Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
|
||||
|
||||
utility1.getAdmin().createTable(table);
|
||||
utility2.getAdmin().createTable(table);
|
||||
|
@ -147,7 +147,7 @@ public class TestMultiSlaveReplication {
|
|||
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility2.getClusterKey());
|
||||
admin1.addPeer("1", rpc, null);
|
||||
admin1.addReplicationPeer("1", rpc);
|
||||
|
||||
// put "row" and wait 'til it got around, then delete
|
||||
putAndWait(row, famName, htable1, htable2);
|
||||
|
@ -165,7 +165,7 @@ public class TestMultiSlaveReplication {
|
|||
|
||||
rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility3.getClusterKey());
|
||||
admin1.addPeer("2", rpc, null);
|
||||
admin1.addReplicationPeer("2", rpc);
|
||||
|
||||
// put a row, check it was replicated to all clusters
|
||||
putAndWait(row1, famName, htable1, htable2, htable3);
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||
|
@ -374,7 +373,7 @@ public class TestPerTableCFReplication {
|
|||
@Test
|
||||
public void testPerTableCFReplication() throws Exception {
|
||||
LOG.info("testPerTableCFReplication");
|
||||
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
|
||||
Admin replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
|
||||
Connection connection1 = ConnectionFactory.createConnection(conf1);
|
||||
Connection connection2 = ConnectionFactory.createConnection(conf2);
|
||||
Connection connection3 = ConnectionFactory.createConnection(conf3);
|
||||
|
@ -406,25 +405,25 @@ public class TestPerTableCFReplication {
|
|||
Table htab3C = connection3.getTable(tabCName);
|
||||
|
||||
// A. add cluster2/cluster3 as peers to cluster1
|
||||
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||
rpc2.setClusterKey(utility2.getClusterKey());
|
||||
rpc2.setReplicateAllUserTables(false);
|
||||
Map<TableName, List<String>> tableCFs = new HashMap<>();
|
||||
tableCFs.put(tabCName, null);
|
||||
tableCFs.put(tabBName, new ArrayList<>());
|
||||
tableCFs.get(tabBName).add("f1");
|
||||
tableCFs.get(tabBName).add("f3");
|
||||
replicationAdmin.addPeer("2", rpc2, tableCFs);
|
||||
ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(false)
|
||||
.setTableCFsMap(tableCFs).build();
|
||||
replicationAdmin.addReplicationPeer("2", rpc2);
|
||||
|
||||
ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
|
||||
rpc3.setClusterKey(utility3.getClusterKey());
|
||||
rpc3.setReplicateAllUserTables(false);
|
||||
tableCFs.clear();
|
||||
tableCFs.put(tabAName, null);
|
||||
tableCFs.put(tabBName, new ArrayList<>());
|
||||
tableCFs.get(tabBName).add("f1");
|
||||
tableCFs.get(tabBName).add("f2");
|
||||
replicationAdmin.addPeer("3", rpc3, tableCFs);
|
||||
ReplicationPeerConfig rpc3 = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(utility3.getClusterKey()).setReplicateAllUserTables(false)
|
||||
.setTableCFsMap(tableCFs).build();
|
||||
replicationAdmin.addReplicationPeer("3", rpc3);
|
||||
|
||||
// A1. tableA can only replicated to cluster3
|
||||
putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
|
||||
|
@ -474,13 +473,17 @@ public class TestPerTableCFReplication {
|
|||
tableCFs.put(tabCName, new ArrayList<>());
|
||||
tableCFs.get(tabCName).add("f2");
|
||||
tableCFs.get(tabCName).add("f3");
|
||||
replicationAdmin.setPeerTableCFs("2", tableCFs);
|
||||
replicationAdmin.updateReplicationPeerConfig("2",
|
||||
ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
|
||||
.setTableCFsMap(tableCFs).build());
|
||||
|
||||
tableCFs.clear();
|
||||
tableCFs.put(tabBName, null);
|
||||
tableCFs.put(tabCName, new ArrayList<>());
|
||||
tableCFs.get(tabCName).add("f3");
|
||||
replicationAdmin.setPeerTableCFs("3", tableCFs);
|
||||
replicationAdmin.updateReplicationPeerConfig("3",
|
||||
ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
|
||||
.setTableCFsMap(tableCFs).build());
|
||||
|
||||
// B1. cf 'f1' of tableA can only replicated to cluster2
|
||||
putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
|
@ -69,7 +68,6 @@ public class TestReplicationBase {
|
|||
|
||||
protected static Configuration CONF_WITH_LOCALFS;
|
||||
|
||||
protected static ReplicationAdmin admin;
|
||||
protected static Admin hbaseAdmin;
|
||||
|
||||
protected static Table htable1;
|
||||
|
@ -215,7 +213,6 @@ public class TestReplicationBase {
|
|||
protected static void startClusters() throws Exception {
|
||||
UTIL1.startMiniZKCluster();
|
||||
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
|
||||
admin = new ReplicationAdmin(CONF1);
|
||||
LOG.info("Setup first Zk");
|
||||
|
||||
UTIL2.setZkCluster(miniZK);
|
||||
|
@ -342,7 +339,6 @@ public class TestReplicationBase {
|
|||
public static void tearDownAfterClass() throws Exception {
|
||||
htable2.close();
|
||||
htable1.close();
|
||||
admin.close();
|
||||
UTIL2.shutdownMiniCluster();
|
||||
UTIL1.shutdownMiniCluster();
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
|
|||
Thread.sleep(SLEEP_TIME * NB_RETRIES);
|
||||
|
||||
// disable and start the peer
|
||||
admin.disablePeer("2");
|
||||
hbaseAdmin.disableReplicationPeer("2");
|
||||
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
|
||||
UTIL2.startMiniHBaseCluster(option);
|
||||
Get get = new Get(rowkey);
|
||||
|
@ -80,7 +80,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
|
|||
}
|
||||
|
||||
// Test enable replication
|
||||
admin.enablePeer("2");
|
||||
hbaseAdmin.enableReplicationPeer("2");
|
||||
// wait since the sleep interval would be long
|
||||
Thread.sleep(SLEEP_TIME * NB_RETRIES);
|
||||
for (int i = 0; i < NB_RETRIES; i++) {
|
||||
|
|
|
@ -133,9 +133,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
@Test
|
||||
public void testCustomReplicationEndpoint() throws Exception {
|
||||
// test installing a custom replication endpoint other than the default one.
|
||||
admin.addPeer("testCustomReplicationEndpoint",
|
||||
hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
|
||||
|
||||
// check whether the class has been constructed and started
|
||||
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
|
||||
|
@ -166,22 +166,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
|
||||
doAssert(Bytes.toBytes("row42"));
|
||||
|
||||
admin.removePeer("testCustomReplicationEndpoint");
|
||||
hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
|
||||
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
|
||||
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
|
||||
int peerCount = admin.getPeersCount();
|
||||
int peerCount = hbaseAdmin.listReplicationPeers().size();
|
||||
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
|
||||
admin.addPeer(id,
|
||||
hbaseAdmin.addReplicationPeer(id,
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
|
||||
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
|
||||
// This test is flakey and then there is so much stuff flying around in here its, hard to
|
||||
// debug. Peer needs to be up for the edit to make it across. This wait on
|
||||
// peer count seems to be a hack that has us not progress till peer is up.
|
||||
if (admin.getPeersCount() <= peerCount) {
|
||||
if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
|
||||
LOG.info("Waiting on peercount to go up from " + peerCount);
|
||||
Threads.sleep(100);
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
throw ReplicationEndpointReturningFalse.ex.get();
|
||||
}
|
||||
|
||||
admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
|
||||
hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -227,10 +227,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
}
|
||||
}
|
||||
|
||||
admin.addPeer(id,
|
||||
hbaseAdmin.addReplicationPeer(id,
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
|
||||
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
|
||||
null);
|
||||
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()));
|
||||
|
||||
final int numEdits = totEdits;
|
||||
Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
|
||||
|
@ -247,7 +246,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
}
|
||||
});
|
||||
|
||||
admin.removePeer("testInterClusterReplication");
|
||||
hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
|
||||
UTIL1.deleteTableData(tableName);
|
||||
}
|
||||
|
||||
|
@ -260,7 +259,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
EverythingPassesWALEntryFilter.class.getName() + "," +
|
||||
EverythingPassesWALEntryFilterSubclass.class.getName());
|
||||
admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
|
||||
hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
|
||||
// now replicate some data.
|
||||
try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
|
||||
doPut(connection, Bytes.toBytes("row1"));
|
||||
|
@ -278,7 +277,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
|
||||
//make sure our reflectively created filter is in the filter chain
|
||||
Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
|
||||
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
|
||||
hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
|
@ -289,7 +288,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
// test that we can create mutliple WALFilters reflectively
|
||||
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
"IAmNotARealWalEntryFilter");
|
||||
admin.addPeer("testWALEntryFilterAddValidation", rpc);
|
||||
hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
|
@ -300,7 +299,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
// test that we can create mutliple WALFilters reflectively
|
||||
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
"IAmNotARealWalEntryFilter");
|
||||
admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
|
||||
hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -239,7 +239,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Integration test for TestReplicationAdmin, removes and re-add a peer cluster
|
||||
* Removes and re-add a peer cluster
|
||||
*/
|
||||
@Test
|
||||
public void testAddAndRemoveClusters() throws Exception {
|
||||
|
@ -352,8 +352,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
* Test for HBASE-8663
|
||||
* <p>
|
||||
* Create two new Tables with colfamilies enabled for replication then run
|
||||
* ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
|
||||
* TestReplicationAdmin is a better place for this testing but it would need mocks.
|
||||
* {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies.
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyListReplicatedTable() throws Exception {
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
|
@ -81,7 +80,7 @@ public class TestReplicationWithTags {
|
|||
private static Configuration conf1 = HBaseConfiguration.create();
|
||||
private static Configuration conf2;
|
||||
|
||||
private static ReplicationAdmin replicationAdmin;
|
||||
private static Admin replicationAdmin;
|
||||
|
||||
private static Connection connection1;
|
||||
private static Connection connection2;
|
||||
|
@ -121,7 +120,7 @@ public class TestReplicationWithTags {
|
|||
// Have to reget conf1 in case zk cluster location different
|
||||
// than default
|
||||
conf1 = utility1.getConfiguration();
|
||||
replicationAdmin = new ReplicationAdmin(conf1);
|
||||
replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
|
||||
LOG.info("Setup first Zk");
|
||||
|
||||
// Base conf2 on conf1 so it gets the right zk cluster.
|
||||
|
@ -143,7 +142,7 @@ public class TestReplicationWithTags {
|
|||
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility2.getClusterKey());
|
||||
replicationAdmin.addPeer("2", rpc, null);
|
||||
replicationAdmin.addReplicationPeer("2", rpc);
|
||||
|
||||
HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
|
||||
|
|
|
@ -28,12 +28,13 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.HTestConst;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
|
@ -98,17 +99,17 @@ public class TestGlobalReplicationThrottler {
|
|||
utility2.setZkCluster(miniZK);
|
||||
new ZKWatcher(conf2, "cluster2", null, true);
|
||||
|
||||
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
|
||||
Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility2.getClusterKey());
|
||||
|
||||
utility1.startMiniCluster();
|
||||
utility2.startMiniCluster();
|
||||
|
||||
admin1.addPeer("peer1", rpc, null);
|
||||
admin1.addPeer("peer2", rpc, null);
|
||||
admin1.addPeer("peer3", rpc, null);
|
||||
numOfPeer = admin1.getPeersCount();
|
||||
admin1.addReplicationPeer("peer1", rpc);
|
||||
admin1.addReplicationPeer("peer2", rpc);
|
||||
admin1.addReplicationPeer("peer3", rpc);
|
||||
numOfPeer = admin1.listReplicationPeers().size();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -42,12 +42,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
|
@ -124,18 +124,18 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
|
||||
// create a table with region replicas. Check whether the replication peer is created
|
||||
// and replication started.
|
||||
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
|
||||
Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
|
||||
String peerId = "region_replica_replication";
|
||||
|
||||
ReplicationPeerConfig peerConfig = null;
|
||||
try {
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
} catch (ReplicationPeerNotFoundException e) {
|
||||
LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
|
||||
}
|
||||
|
||||
if (peerConfig != null) {
|
||||
admin.removePeer(peerId);
|
||||
admin.removeReplicationPeer(peerId);
|
||||
peerConfig = null;
|
||||
}
|
||||
|
||||
|
@ -143,7 +143,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
"testReplicationPeerIsCreated_no_region_replicas");
|
||||
HTU.getAdmin().createTable(htd);
|
||||
try {
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
fail("Should throw ReplicationException, because replication peer id=" + peerId
|
||||
+ " not exist");
|
||||
} catch (ReplicationPeerNotFoundException e) {
|
||||
|
@ -155,7 +155,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
HTU.getAdmin().createTable(htd);
|
||||
|
||||
// assert peer configuration is correct
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
assertNotNull(peerConfig);
|
||||
assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
|
||||
HTU.getConfiguration()));
|
||||
|
@ -168,18 +168,18 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
|
||||
// modify a table by adding region replicas. Check whether the replication peer is created
|
||||
// and replication started.
|
||||
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
|
||||
Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
|
||||
String peerId = "region_replica_replication";
|
||||
|
||||
ReplicationPeerConfig peerConfig = null;
|
||||
try {
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
} catch (ReplicationPeerNotFoundException e) {
|
||||
LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
|
||||
}
|
||||
|
||||
if (peerConfig != null) {
|
||||
admin.removePeer(peerId);
|
||||
admin.removeReplicationPeer(peerId);
|
||||
peerConfig = null;
|
||||
}
|
||||
|
||||
|
@ -189,7 +189,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
|
||||
// assert that replication peer is not created yet
|
||||
try {
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
fail("Should throw ReplicationException, because replication peer id=" + peerId
|
||||
+ " not exist");
|
||||
} catch (ReplicationPeerNotFoundException e) {
|
||||
|
@ -202,7 +202,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
HTU.getAdmin().enableTable(htd.getTableName());
|
||||
|
||||
// assert peer configuration is correct
|
||||
peerConfig = admin.getPeerConfig(peerId);
|
||||
peerConfig = admin.getReplicationPeerConfig(peerId);
|
||||
assertNotNull(peerConfig);
|
||||
assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
|
||||
HTU.getConfiguration()));
|
||||
|
@ -405,8 +405,8 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
HTU.getAdmin().createTable(htd);
|
||||
|
||||
// both tables are created, now pause replication
|
||||
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
|
||||
admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
|
||||
Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
|
||||
admin.disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
|
||||
|
||||
// now that the replication is disabled, write to the table to be dropped, then drop the table.
|
||||
|
||||
|
@ -465,7 +465,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
|
||||
|
||||
// now enable the replication
|
||||
admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
|
||||
admin.enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
|
||||
|
||||
verifyReplication(tableName, regionReplication, 0, 1000);
|
||||
|
||||
|
|
|
@ -70,10 +70,9 @@ public class TestReplicator extends TestReplicationBase {
|
|||
truncateTable(UTIL2, tableName);
|
||||
|
||||
// Replace the peer set up for us by the base class with a wrapper for this test
|
||||
admin.addPeer("testReplicatorBatching",
|
||||
hbaseAdmin.addReplicationPeer("testReplicatorBatching",
|
||||
new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
|
||||
null);
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
|
||||
|
||||
ReplicationEndpointForTest.setBatchCount(0);
|
||||
ReplicationEndpointForTest.setEntriesCount(0);
|
||||
|
@ -109,7 +108,7 @@ public class TestReplicator extends TestReplicationBase {
|
|||
ReplicationEndpointForTest.getBatchCount());
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
|
||||
} finally {
|
||||
admin.removePeer("testReplicatorBatching");
|
||||
hbaseAdmin.removeReplicationPeer("testReplicatorBatching");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,10 +119,9 @@ public class TestReplicator extends TestReplicationBase {
|
|||
truncateTable(UTIL2, tableName);
|
||||
|
||||
// Replace the peer set up for us by the base class with a wrapper for this test
|
||||
admin.addPeer("testReplicatorWithErrors",
|
||||
hbaseAdmin.addReplicationPeer("testReplicatorWithErrors",
|
||||
new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
|
||||
.setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
|
||||
null);
|
||||
.setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()));
|
||||
|
||||
FailureInjectingReplicationEndpointForTest.setBatchCount(0);
|
||||
FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
|
||||
|
@ -157,7 +155,7 @@ public class TestReplicator extends TestReplicationBase {
|
|||
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
|
||||
} finally {
|
||||
admin.removePeer("testReplicatorWithErrors");
|
||||
hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue