HBASE-13057 Provide client utility to easily enable and disable table replication (Ashish Singhi)

This commit is contained in:
tedyu 2015-02-20 10:18:47 -08:00
parent 229b0774c0
commit 9a311303a8
6 changed files with 469 additions and 0 deletions

View File

@ -38,17 +38,25 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -501,4 +509,203 @@ public class ReplicationAdmin implements Closeable {
return replicationColFams;
}
/**
* Enable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
*/
public void enableTableRep(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name cannot be null");
}
try (Admin admin = this.connection.getAdmin()) {
if (!admin.tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNameAsString()
+ "' does not exists.");
}
}
byte[][] splits = getTableSplitRowKeys(tableName);
checkAndSyncTableDescToPeers(tableName, splits);
setTableRep(tableName, true);
}
/**
* Disable a table's replication switch.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
*/
public void disableTableRep(final TableName tableName) throws IOException {
if (tableName == null) {
throw new IllegalArgumentException("Table name is null");
}
try (Admin admin = this.connection.getAdmin()) {
if (!admin.tableExists(tableName)) {
throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
+ "' does not exists.");
}
}
setTableRep(tableName, false);
}
/**
* Get the split row keys of table
* @param tableName table name
* @return array of split row keys
* @throws IOException
*/
private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
try (RegionLocator locator = connection.getRegionLocator(tableName);) {
byte[][] startKeys = locator.getStartKeys();
if (startKeys.length == 1) {
return null;
}
byte[][] splits = new byte[startKeys.length - 1][];
for (int i = 1; i < startKeys.length; i++) {
splits[i - 1] = startKeys[i];
}
return splits;
}
}
/**
* Connect to peer and check the table descriptor on peer:
* <ol>
* <li>Create the same table on peer when not exist.</li>
* <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
* </ol>
* @param tableName name of the table to sync to the peer
* @param splits table split keys
* @throws IOException
*/
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
throws IOException {
List<ReplicationPeer> repPeers = listValidReplicationPeers();
if (repPeers == null || repPeers.size() <= 0) {
throw new IllegalArgumentException("Found no peer cluster for replication.");
}
for (ReplicationPeer repPeer : repPeers) {
Configuration peerConf = repPeer.getConfiguration();
HTableDescriptor htd = null;
try (Connection conn = ConnectionFactory.createConnection(peerConf);
Admin admin = this.connection.getAdmin();
Admin repHBaseAdmin = conn.getAdmin()) {
htd = admin.getTableDescriptor(tableName);
HTableDescriptor peerHtd = null;
if (!repHBaseAdmin.tableExists(tableName)) {
repHBaseAdmin.createTable(htd, splits);
} else {
peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
if (peerHtd == null) {
throw new IllegalArgumentException("Failed to get table descriptor for table "
+ tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
} else if (!peerHtd.equals(htd)) {
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
+ " exists in peer cluster " + repPeer.getId()
+ ", but the table descriptors are not same when comapred with source cluster."
+ " Thus can not enable the table's replication switch.");
}
}
}
}
}
private List<ReplicationPeer> listValidReplicationPeers() {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
if (peers == null || peers.size() <= 0) {
return null;
}
List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
String peerId = peerEntry.getKey();
String clusterKey = peerEntry.getValue().getClusterKey();
Configuration peerConf = new Configuration(this.connection.getConfiguration());
Stat s = null;
try {
ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
s =
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
null);
if (null == s) {
LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
continue;
}
validPeers.add(peer);
} catch (ReplicationException e) {
LOG.warn("Failed to get valid replication peers. "
+ "Error connecting to peer cluster with peerId=" + peerId);
LOG.debug("Failure details to get valid replication peers.", e);
continue;
} catch (KeeperException e) {
LOG.warn("Failed to get valid replication peers. KeeperException code="
+ e.code().intValue());
LOG.debug("Failure details to get valid replication peers.", e);
continue;
} catch (InterruptedException e) {
LOG.warn("Failed to get valid replication peers due to InterruptedException.");
LOG.debug("Failure details to get valid replication peers.", e);
Thread.currentThread().interrupt();
continue;
} catch (IOException e) {
LOG.warn("Failed to get valid replication peers due to IOException.");
LOG.debug("Failure details to get valid replication peers.", e);
continue;
}
}
return validPeers;
}
/**
* Set the table's replication switch if the table's replication switch is already not set.
* @param tableName name of the table
* @param isRepEnabled is replication switch enable or disable
* @throws IOException if a remote or network exception occurs
*/
private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
Admin admin = null;
try {
admin = this.connection.getAdmin();
HTableDescriptor htd = admin.getTableDescriptor(tableName);
if (isTableRepEnabled(htd) ^ isRepEnabled) {
boolean isOnlineSchemaUpdateEnabled =
this.connection.getConfiguration()
.getBoolean("hbase.online.schema.update.enable", true);
if (!isOnlineSchemaUpdateEnabled) {
admin.disableTable(tableName);
}
for (HColumnDescriptor hcd : htd.getFamilies()) {
hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
: HConstants.REPLICATION_SCOPE_LOCAL);
}
admin.modifyTable(tableName, htd);
if (!isOnlineSchemaUpdateEnabled) {
admin.enableTable(tableName);
}
}
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
LOG.warn("Failed to close admin connection.");
LOG.debug("Details on failure to close admin connection.", e);
}
}
}
}
/**
* @param htd table descriptor details for the table to check
* @return true if table's replication switch is enabled
*/
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.client.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Unit testing of ReplicationAdmin with clusters
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminWithClusters extends TestReplicationBase {
static Connection connection1;
static Connection connection2;
static Admin admin1;
static Admin admin2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
connection1 = ConnectionFactory.createConnection(conf1);
connection2 = ConnectionFactory.createConnection(conf2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
admin1.close();
admin2.close();
connection1.close();
connection2.close();
TestReplicationBase.tearDownAfterClass();
}
@Test(timeout = 300000)
public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
admin2.disableTable(tableName);
admin2.deleteTable(tableName);
assertFalse(admin2.tableExists(tableName));
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.enableTableRep(tableName);
assertTrue(admin2.tableExists(tableName));
}
@Test(timeout = 300000)
public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
HTableDescriptor table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
}
admin1.disableTable(tableName);
admin1.modifyTable(tableName, table);
admin1.enableTable(tableName);
admin2.disableTable(tableName);
admin2.modifyTable(tableName, table);
admin2.enableTable(tableName);
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.enableTableRep(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
}
}
@Test(timeout = 300000)
public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
HTableDescriptor table = admin2.getTableDescriptor(tableName);
HColumnDescriptor f = new HColumnDescriptor("newFamily");
table.addFamily(f);
admin2.disableTable(tableName);
admin2.modifyTable(tableName, table);
admin2.enableTable(tableName);
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
try {
adminExt.enableTableRep(tableName);
fail("Exception should be thrown if table descriptors in the clusters are not same.");
} catch (RuntimeException ignored) {
}
admin1.disableTable(tableName);
admin1.modifyTable(tableName, table);
admin1.enableTable(tableName);
adminExt.enableTableRep(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
}
}
@Test(timeout = 300000)
public void testDisableAndEnableReplication() throws Exception {
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.disableTableRep(tableName);
HTableDescriptor table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
}
table = admin2.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
}
adminExt.enableTableRep(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL);
}
}
@Test(timeout = 300000, expected = TableNotFoundException.class)
public void testDisableReplicationForNonExistingTable() throws Exception {
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.disableTableRep(TableName.valueOf("nonExistingTable"));
}
@Test(timeout = 300000, expected = TableNotFoundException.class)
public void testEnableReplicationForNonExistingTable() throws Exception {
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.enableTableRep(TableName.valueOf("nonExistingTable"));
}
@Test(timeout = 300000, expected = IllegalArgumentException.class)
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.disableTableRep(null);
}
@Test(timeout = 300000, expected = IllegalArgumentException.class)
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
adminExt.enableTableRep(null);
}
}

View File

@ -23,6 +23,7 @@ java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
java_import org.apache.hadoop.hbase.util.Bytes
java_import org.apache.hadoop.hbase.zookeeper.ZKUtil
java_import org.apache.hadoop.hbase.TableName
# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
@ -157,5 +158,17 @@ module Hbase
def remove_peer_tableCFs(id, tableCFs)
@replication_admin.removePeerTableCFs(id, tableCFs)
end
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.enableTableRep(tableName)
end
#----------------------------------------------------------------------------------------------
# Disables a table's replication switch
def disable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.disableTableRep(tableName)
end
end
end

View File

@ -349,6 +349,8 @@ Shell.load_command_group(
list_replicated_tables
append_peer_tableCFs
remove_peer_tableCFs
enable_table_replication
disable_table_replication
]
)

View File

@ -0,0 +1,42 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class DisableTableReplication< Command
def help
return <<-EOF
Disable a table's replication switch.
Examples:
hbase> disable_table_replication 'table_name'
EOF
end
def command(table_name)
format_simple_command do
replication_admin.disable_tablerep(table_name)
end
puts "The replication swith of table '#{table_name}' successfully disabled"
end
end
end
end

View File

@ -0,0 +1,42 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class EnableTableReplication< Command
def help
return <<-EOF
Enable a table's replication switch.
Examples:
hbase> enable_table_replication 'table_name'
EOF
end
def command(table_name)
format_simple_command do
replication_admin.enable_tablerep(table_name)
end
puts "The replication swith of table '#{table_name}' successfully enabled"
end
end
end
end