HBASE-17443 Move listReplicated/enableTableRep/disableTableRep methods from ReplicationAdmin to Admin

This commit is contained in:
Guanghao Zhang 2017-01-24 13:40:29 +08:00
parent aa5d9a9ad3
commit 843ba9e545
5 changed files with 284 additions and 181 deletions

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
@ -1928,4 +1929,23 @@ public interface Admin extends Abortable, Closeable {
*/
void removeDrainFromRegionServers(List<ServerName> servers) throws IOException;
/**
* Find all table and column families that are replicated from this cluster
* @return the replicated table-cfs list of this cluster.
*/
List<TableCFs> listReplicatedTableCFs() throws IOException;
/**
* Enable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
*/
void enableTableReplication(final TableName tableName) throws IOException;
/**
* Disable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
*/
void disableTableReplication(final TableName tableName) throws IOException;
}

View File

@ -28,6 +28,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@ -43,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@ -3917,4 +3920,176 @@ public class HBaseAdmin implements Admin {
}
});
}
@Override
public List<TableCFs> listReplicatedTableCFs() throws IOException {
List<TableCFs> replicatedTableCFs = new ArrayList<>();
HTableDescriptor[] tables = listTables();
for (HTableDescriptor table : tables) {
HColumnDescriptor[] columns = table.getColumnFamilies();
Map<String, Integer> cfs = new HashMap<>();
for (HColumnDescriptor column : columns) {
if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
cfs.put(column.getNameAsString(), column.getScope());
}
}
if (!cfs.isEmpty()) {
replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
}
}
return replicatedTableCFs;
}
@Override
public void enableTableReplication(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name cannot be null");
}
if (!tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNameAsString()
+ "' does not exists.");
}
byte[][] splits = getTableSplits(tableName);
checkAndSyncTableDescToPeers(tableName, splits);
setTableRep(tableName, true);
}
@Override
public void disableTableReplication(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name is null");
}
if (!tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
+ "' does not exists.");
}
setTableRep(tableName, false);
}
/**
* Connect to peer and check the table descriptor on peer:
* <ol>
* <li>Create the same table on peer when not exist.</li>
* <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
* </ol>
* @param tableName name of the table to sync to the peer
* @param splits table split keys
* @throws IOException
*/
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
throws IOException {
List<ReplicationPeerDescription> peers = listReplicationPeers();
if (peers == null || peers.size() <= 0) {
throw new IllegalArgumentException("Found no peer cluster for replication.");
}
for (ReplicationPeerDescription peerDesc : peers) {
if (needToReplicate(tableName, peerDesc)) {
Configuration peerConf = getPeerClusterConfiguration(peerDesc);
try (Connection conn = ConnectionFactory.createConnection(peerConf);
Admin repHBaseAdmin = conn.getAdmin()) {
HTableDescriptor htd = getTableDescriptor(tableName);
HTableDescriptor peerHtd = null;
if (!repHBaseAdmin.tableExists(tableName)) {
repHBaseAdmin.createTable(htd, splits);
} else {
peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
if (peerHtd == null) {
throw new IllegalArgumentException("Failed to get table descriptor for table "
+ tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
} else if (!peerHtd.equals(htd)) {
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
+ " exists in peer cluster " + peerDesc.getPeerId()
+ ", but the table descriptors are not same when compared with source cluster."
+ " Thus can not enable the table's replication switch.");
}
}
}
}
}
}
/**
* Decide whether the table need replicate to the peer cluster according to the peer config
* @param table name of the table
* @param peerConfig config for the peer
* @return true if the table need replicate to the peer cluster
*/
private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
Set<String> namespaces = peerConfig.getNamespaces();
Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap();
// If null means user has explicitly not configured any namespaces and table CFs
// so all the tables data are applicable for replication
if (namespaces == null && tableCFsMap == null) {
return true;
}
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
return true;
}
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
return true;
}
LOG.debug("Table " + table.getNameAsString()
+ " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey="
+ peerConfig.getClusterKey());
return false;
}
/**
* Set the table's replication switch if the table's replication switch is already not set.
* @param tableName name of the table
* @param isRepEnabled is replication switch enable or disable
* @throws IOException if a remote or network exception occurs
*/
private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
HTableDescriptor htd = getTableDescriptor(tableName);
if (isTableRepEnabled(htd) ^ isRepEnabled) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
: HConstants.REPLICATION_SCOPE_LOCAL);
}
modifyTable(tableName, htd);
}
}
/**
* @param htd table descriptor details for the table to check
* @return true if table's replication switch is enabled
*/
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
return false;
}
}
return true;
}
/**
* Returns the configuration needed to talk to the remote slave cluster.
* @param peer the description of replication peer
* @return the configuration for the peer cluster, null if it was unable to get the configuration
* @throws IOException
*/
private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer)
throws IOException {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
}
if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}
return otherConf;
}
}

View File

@ -430,7 +430,6 @@ public class ReplicationAdmin implements Closeable {
admin.close();
}
/**
* Find all column families that are replicated from this cluster
* @return the full list of the replicated column families of this cluster as:
@ -441,36 +440,26 @@ public class ReplicationAdmin implements Closeable {
* 1) the replication may only apply to selected peers instead of all peers
* 2) the replicationType may indicate the host Cluster servers as Slave
* for the table:columnFam.
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicatedTableCFs()} instead
*/
@Deprecated
public List<HashMap<String, String>> listReplicated() throws IOException {
List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
Admin admin = connection.getAdmin();
HTableDescriptor[] tables;
try {
tables = admin.listTables();
} finally {
if (admin!= null) admin.close();
}
for (HTableDescriptor table : tables) {
HColumnDescriptor[] columns = table.getColumnFamilies();
String tableName = table.getNameAsString();
for (HColumnDescriptor column : columns) {
if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
// At this moment, the columfam is replicated to all peers
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, tableName);
replicationEntry.put(CFNAME, column.getNameAsString());
replicationEntry.put(REPLICATIONTYPE,
column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
REPLICATIONGLOBAL :
REPLICATIONSERIAL);
replicationColFams.add(replicationEntry);
}
}
}
List<HashMap<String, String>> replicationColFams = new ArrayList<>();
admin.listReplicatedTableCFs().forEach(
(tableCFs) -> {
String table = tableCFs.getTable().getNameAsString();
tableCFs.getColumnFamilyMap()
.forEach(
(cf, scope) -> {
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, table);
replicationEntry.put(CFNAME, cf);
replicationEntry.put(REPLICATIONTYPE,
scope == HConstants.REPLICATION_SCOPE_GLOBAL ? REPLICATIONGLOBAL
: REPLICATIONSERIAL);
replicationColFams.add(replicationEntry);
});
});
return replicationColFams;
}
@ -478,110 +467,24 @@ public class ReplicationAdmin implements Closeable {
* Enable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableTableReplication(TableName)}
* instead
*/
@Deprecated
public void enableTableRep(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name cannot be null");
}
try (Admin admin = this.connection.getAdmin()) {
if (!admin.tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNameAsString()
+ "' does not exists.");
}
}
byte[][] splits = getTableSplitRowKeys(tableName);
checkAndSyncTableDescToPeers(tableName, splits);
setTableRep(tableName, true);
admin.enableTableReplication(tableName);
}
/**
* Disable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableTableReplication(TableName)}
* instead
*/
@Deprecated
public void disableTableRep(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name is null");
}
try (Admin admin = this.connection.getAdmin()) {
if (!admin.tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
+ "' does not exists.");
}
}
setTableRep(tableName, false);
}
/**
* Get the split row keys of table
* @param tableName table name
* @return array of split row keys
* @throws IOException
*/
private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
try (RegionLocator locator = connection.getRegionLocator(tableName);) {
byte[][] startKeys = locator.getStartKeys();
if (startKeys.length == 1) {
return null;
}
byte[][] splits = new byte[startKeys.length - 1][];
for (int i = 1; i < startKeys.length; i++) {
splits[i - 1] = startKeys[i];
}
return splits;
}
}
/**
* Connect to peer and check the table descriptor on peer:
* <ol>
* <li>Create the same table on peer when not exist.</li>
* <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
* </ol>
* @param tableName name of the table to sync to the peer
* @param splits table split keys
* @throws IOException
*/
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
throws IOException {
List<ReplicationPeer> repPeers = listReplicationPeers();
if (repPeers == null || repPeers.size() <= 0) {
throw new IllegalArgumentException("Found no peer cluster for replication.");
}
final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
for (ReplicationPeer repPeer : repPeers) {
Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
// TODO Currently peer TableCFs will not include namespace so we need to check only for table
// name without namespace in it. Need to correct this logic once we fix HBASE-11386.
if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
continue;
}
Configuration peerConf = repPeer.getConfiguration();
HTableDescriptor htd = null;
try (Connection conn = ConnectionFactory.createConnection(peerConf);
Admin admin = this.connection.getAdmin();
Admin repHBaseAdmin = conn.getAdmin()) {
htd = admin.getTableDescriptor(tableName);
HTableDescriptor peerHtd = null;
if (!repHBaseAdmin.tableExists(tableName)) {
repHBaseAdmin.createTable(htd, splits);
} else {
peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
if (peerHtd == null) {
throw new IllegalArgumentException("Failed to get table descriptor for table "
+ tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
} else if (!peerHtd.equals(htd)) {
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
+ " exists in peer cluster " + repPeer.getId()
+ ", but the table descriptors are not same when compared with source cluster."
+ " Thus can not enable the table's replication switch.");
}
}
}
}
admin.disableTableReplication(tableName);
}
@VisibleForTesting
@ -615,50 +518,6 @@ public class ReplicationAdmin implements Closeable {
return listOfPeers;
}
/**
* Set the table's replication switch if the table's replication switch is already not set.
* @param tableName name of the table
* @param isRepEnabled is replication switch enable or disable
* @throws IOException if a remote or network exception occurs
*/
private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
Admin admin = null;
try {
admin = this.connection.getAdmin();
HTableDescriptor htd = admin.getTableDescriptor(tableName);
if (isTableRepEnabled(htd) ^ isRepEnabled) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
: HConstants.REPLICATION_SCOPE_LOCAL);
}
admin.modifyTable(tableName, htd);
}
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
LOG.warn("Failed to close admin connection.");
LOG.debug("Details on failure to close admin connection.", e);
}
}
}
}
/**
* @param htd table descriptor details for the table to check
* @return true if table's replication switch is enabled
*/
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
return false;
}
}
return true;
}
/**
* Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster.

View File

@ -0,0 +1,49 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.replication;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Used by {@link org.apache.hadoop.hbase.client.Admin#listReplicatedTableCFs()}.
* The cfs is a map of <ColumnFamily, ReplicationScope>.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableCFs {
private final TableName table;
private final Map<String, Integer> cfs;
public TableCFs(final TableName table, final Map<String, Integer> cfs) {
this.table = table;
this.cfs = cfs;
}
public TableName getTable() {
return this.table;
}
public Map<String, Integer> getColumnFamilyMap() {
return this.cfs;
}
}

View File

@ -75,7 +75,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin2.disableTable(tableName);
admin2.deleteTable(tableName);
assertFalse(admin2.tableExists(tableName));
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
assertTrue(admin2.tableExists(tableName));
}
@ -93,7 +93,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin2.modifyTable(tableName, table);
admin2.enableTable(tableName);
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
@ -110,7 +110,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin2.enableTable(tableName);
try {
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
fail("Exception should be thrown if table descriptors in the clusters are not same.");
} catch (RuntimeException ignored) {
@ -118,7 +118,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin1.disableTable(tableName);
admin1.modifyTable(tableName, table);
admin1.enableTable(tableName);
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
@ -127,7 +127,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@Test(timeout = 300000)
public void testDisableAndEnableReplication() throws Exception {
adminExt.disableTableRep(tableName);
admin1.disableTableReplication(tableName);
HTableDescriptor table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
@ -136,7 +136,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
}
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
@ -145,24 +145,24 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@Test(timeout = 300000, expected = TableNotFoundException.class)
public void testDisableReplicationForNonExistingTable() throws Exception {
adminExt.disableTableRep(TableName.valueOf("nonExistingTable"));
admin1.disableTableReplication(TableName.valueOf("nonExistingTable"));
}
@Test(timeout = 300000, expected = TableNotFoundException.class)
public void testEnableReplicationForNonExistingTable() throws Exception {
adminExt.enableTableRep(TableName.valueOf("nonExistingTable"));
admin1.enableTableReplication(TableName.valueOf("nonExistingTable"));
}
@Test(timeout = 300000, expected = IllegalArgumentException.class)
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
adminExt.disableTableRep(null);
admin1.disableTableReplication(null);
}
@Test(timeout = 300000, expected = IllegalArgumentException.class)
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
adminExt.enableTableRep(null);
admin1.enableTableReplication(null);
}
/*
* Test enable table replication should create table only in user explicit specified table-cfs.
* HBASE-14717
@ -182,20 +182,20 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
tableCfs.put(tn, null);
try {
adminExt.setPeerTableCFs(peerId, tableCfs);
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
assertFalse("Table should not be created if user has set table cfs explicitly for the "
+ "peer and this is not part of that collection",
admin2.isTableAvailable(tableName));
tableCfs.put(tableName, null);
adminExt.setPeerTableCFs(peerId, tableCfs);
adminExt.enableTableRep(tableName);
admin1.enableTableReplication(tableName);
assertTrue(
"Table should be created if user has explicitly added table into table cfs collection",
admin2.isTableAvailable(tableName));
} finally {
adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
adminExt.disableTableRep(tableName);
admin1.disableTableReplication(tableName);
}
}