diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 2d5c5e9e63f..ca66fb3ec45 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -38,17 +38,25 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -501,4 +509,203 @@ public class ReplicationAdmin implements Closeable { return replicationColFams; } + + /** + * Enable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void enableTableRep(final TableName tableName) throws IOException { + if (tableName == null) { + throw new IllegalArgumentException("Table name cannot be null"); + } + try (Admin admin = this.connection.getAdmin()) { + if (!admin.tableExists(tableName)) { + throw new TableNotFoundException("Table '" + tableName.getNameAsString() + + "' does not exists."); + } + } + byte[][] splits = getTableSplitRowKeys(tableName); + checkAndSyncTableDescToPeers(tableName, splits); + setTableRep(tableName, true); + } + + /** + * Disable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void disableTableRep(final TableName tableName) throws IOException { + if (tableName == null) { + throw new IllegalArgumentException("Table name is null"); + } + try (Admin admin = this.connection.getAdmin()) { + if (!admin.tableExists(tableName)) { + throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() + + "' does not exists."); + } + } + setTableRep(tableName, false); + } + + /** + * Get the split row keys of table + * @param tableName table name + * @return array of split row keys + * @throws IOException + */ + private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException { + try (RegionLocator locator = connection.getRegionLocator(tableName);) { + byte[][] startKeys = locator.getStartKeys(); + if (startKeys.length == 1) { + return null; + } + byte[][] splits = new byte[startKeys.length - 1][]; + for (int i = 1; i < startKeys.length; i++) { + splits[i - 1] = startKeys[i]; + } + return splits; + } + } + + /** + * Connect to peer and check the table descriptor on peer: + *
    + *
  1. Create the same table on peer when not exist.
  2. + *
  3. Throw exception if the table exists on peer cluster but descriptors are not same.
  4. + *
+ * @param tableName name of the table to sync to the peer + * @param splits table split keys + * @throws IOException + */ + private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) + throws IOException { + List repPeers = listValidReplicationPeers(); + if (repPeers == null || repPeers.size() <= 0) { + throw new IllegalArgumentException("Found no peer cluster for replication."); + } + for (ReplicationPeer repPeer : repPeers) { + Configuration peerConf = repPeer.getConfiguration(); + HTableDescriptor htd = null; + try (Connection conn = ConnectionFactory.createConnection(peerConf); + Admin admin = this.connection.getAdmin(); + Admin repHBaseAdmin = conn.getAdmin()) { + htd = admin.getTableDescriptor(tableName); + HTableDescriptor peerHtd = null; + if (!repHBaseAdmin.tableExists(tableName)) { + repHBaseAdmin.createTable(htd, splits); + } else { + peerHtd = repHBaseAdmin.getTableDescriptor(tableName); + if (peerHtd == null) { + throw new IllegalArgumentException("Failed to get table descriptor for table " + + tableName.getNameAsString() + " from peer cluster " + repPeer.getId()); + } else if (!peerHtd.equals(htd)) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " exists in peer cluster " + repPeer.getId() + + ", but the table descriptors are not same when comapred with source cluster." + + " Thus can not enable the table's replication switch."); + } + } + } + } + } + + private List listValidReplicationPeers() { + Map peers = listPeerConfigs(); + if (peers == null || peers.size() <= 0) { + return null; + } + List validPeers = new ArrayList(peers.size()); + for (Entry peerEntry : peers.entrySet()) { + String peerId = peerEntry.getKey(); + String clusterKey = peerEntry.getValue().getClusterKey(); + Configuration peerConf = new Configuration(this.connection.getConfiguration()); + Stat s = null; + try { + ZKUtil.applyClusterKeyToConf(peerConf, clusterKey); + Pair pair = this.replicationPeers.getPeerConf(peerId); + ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + s = + zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), + null); + if (null == s) { + LOG.info(peerId + ' ' + clusterKey + " is invalid now."); + continue; + } + validPeers.add(peer); + } catch (ReplicationException e) { + LOG.warn("Failed to get valid replication peers. " + + "Error connecting to peer cluster with peerId=" + peerId); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } catch (KeeperException e) { + LOG.warn("Failed to get valid replication peers. KeeperException code=" + + e.code().intValue()); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } catch (InterruptedException e) { + LOG.warn("Failed to get valid replication peers due to InterruptedException."); + LOG.debug("Failure details to get valid replication peers.", e); + Thread.currentThread().interrupt(); + continue; + } catch (IOException e) { + LOG.warn("Failed to get valid replication peers due to IOException."); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } + } + return validPeers; + } + + /** + * Set the table's replication switch if the table's replication switch is already not set. + * @param tableName name of the table + * @param isRepEnabled is replication switch enable or disable + * @throws IOException if a remote or network exception occurs + */ + private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException { + Admin admin = null; + try { + admin = this.connection.getAdmin(); + HTableDescriptor htd = admin.getTableDescriptor(tableName); + if (isTableRepEnabled(htd) ^ isRepEnabled) { + boolean isOnlineSchemaUpdateEnabled = + this.connection.getConfiguration() + .getBoolean("hbase.online.schema.update.enable", true); + if (!isOnlineSchemaUpdateEnabled) { + admin.disableTable(tableName); + } + for (HColumnDescriptor hcd : htd.getFamilies()) { + hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL + : HConstants.REPLICATION_SCOPE_LOCAL); + } + admin.modifyTable(tableName, htd); + if (!isOnlineSchemaUpdateEnabled) { + admin.enableTable(tableName); + } + } + } finally { + if (admin != null) { + try { + admin.close(); + } catch (IOException e) { + LOG.warn("Failed to close admin connection."); + LOG.debug("Details on failure to close admin connection.", e); + } + } + } + } + + /** + * @param htd table descriptor details for the table to check + * @return true if table's replication switch is enabled + */ + private boolean isTableRepEnabled(HTableDescriptor htd) { + for (HColumnDescriptor hcd : htd.getFamilies()) { + if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { + return false; + } + } + return true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java new file mode 100644 index 00000000000..b5899b814f1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.client.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Unit testing of ReplicationAdmin with clusters + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestReplicationAdminWithClusters extends TestReplicationBase { + + static Connection connection1; + static Connection connection2; + static Admin admin1; + static Admin admin2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestReplicationBase.setUpBeforeClass(); + connection1 = ConnectionFactory.createConnection(conf1); + connection2 = ConnectionFactory.createConnection(conf2); + admin1 = connection1.getAdmin(); + admin2 = connection2.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + admin1.close(); + admin2.close(); + connection1.close(); + connection2.close(); + TestReplicationBase.tearDownAfterClass(); + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { + admin2.disableTable(tableName); + admin2.deleteTable(tableName); + assertFalse(admin2.tableExists(tableName)); + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(tableName); + assertTrue(admin2.tableExists(tableName)); + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenReplicationNotEnabled() throws Exception { + HTableDescriptor table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL); + } + admin1.disableTable(tableName); + admin1.modifyTable(tableName, table); + admin1.enableTable(tableName); + + admin2.disableTable(tableName); + admin2.modifyTable(tableName, table); + admin2.enableTable(tableName); + + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(tableName); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception { + HTableDescriptor table = admin2.getTableDescriptor(tableName); + HColumnDescriptor f = new HColumnDescriptor("newFamily"); + table.addFamily(f); + admin2.disableTable(tableName); + admin2.modifyTable(tableName, table); + admin2.enableTable(tableName); + + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + try { + adminExt.enableTableRep(tableName); + fail("Exception should be thrown if table descriptors in the clusters are not same."); + } catch (RuntimeException ignored) { + + } + admin1.disableTable(tableName); + admin1.modifyTable(tableName, table); + admin1.enableTable(tableName); + adminExt.enableTableRep(tableName); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000) + public void testDisableAndEnableReplication() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep(tableName); + HTableDescriptor table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL); + } + table = admin2.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL); + } + adminExt.enableTableRep(tableName); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000, expected = TableNotFoundException.class) + public void testDisableReplicationForNonExistingTable() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep(TableName.valueOf("nonExistingTable")); + } + + @Test(timeout = 300000, expected = TableNotFoundException.class) + public void testEnableReplicationForNonExistingTable() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(TableName.valueOf("nonExistingTable")); + } + + @Test(timeout = 300000, expected = IllegalArgumentException.class) + public void testDisableReplicationWhenTableNameAsNull() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep(null); + } + + @Test(timeout = 300000, expected = IllegalArgumentException.class) + public void testEnableReplicationWhenTableNameAsNull() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(null); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 2d0845f5625..b2ca8e1a6e9 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -23,6 +23,7 @@ java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig java_import org.apache.hadoop.hbase.util.Bytes java_import org.apache.hadoop.hbase.zookeeper.ZKUtil +java_import org.apache.hadoop.hbase.TableName # Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin @@ -157,5 +158,17 @@ module Hbase def remove_peer_tableCFs(id, tableCFs) @replication_admin.removePeerTableCFs(id, tableCFs) end + #---------------------------------------------------------------------------------------------- + # Enables a table's replication switch + def enable_tablerep(table_name) + tableName = TableName.valueOf(table_name) + @replication_admin.enableTableRep(tableName) + end + #---------------------------------------------------------------------------------------------- + # Disables a table's replication switch + def disable_tablerep(table_name) + tableName = TableName.valueOf(table_name) + @replication_admin.disableTableRep(tableName) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 5db2776357d..893079de1b0 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -349,6 +349,8 @@ Shell.load_command_group( list_replicated_tables append_peer_tableCFs remove_peer_tableCFs + enable_table_replication + disable_table_replication ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb b/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb new file mode 100644 index 00000000000..4c46feac3cd --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb @@ -0,0 +1,42 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class DisableTableReplication< Command + def help + return <<-EOF +Disable a table's replication switch. + +Examples: + + hbase> disable_table_replication 'table_name' +EOF + end + + def command(table_name) + format_simple_command do + replication_admin.disable_tablerep(table_name) + end + puts "The replication swith of table '#{table_name}' successfully disabled" + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb b/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb new file mode 100644 index 00000000000..5d57f03fb54 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb @@ -0,0 +1,42 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class EnableTableReplication< Command + def help + return <<-EOF +Enable a table's replication switch. + +Examples: + + hbase> enable_table_replication 'table_name' +EOF + end + + def command(table_name) + format_simple_command do + replication_admin.enable_tablerep(table_name) + end + puts "The replication swith of table '#{table_name}' successfully enabled" + end + end + end +end