From 89f1d1a0513ee61dae6e27bc2b012f96f6ba89da Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Tue, 11 Feb 2014 01:16:39 +0000 Subject: [PATCH] HBASE-8751 Enable peer cluster to choose/change the ColumnFamilies/Tables it really want to replicate from a source cluster (Feng Honghua via JD) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1566944 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/replication/ReplicationAdmin.java | 21 + .../hbase/replication/ReplicationPeer.java | 103 +++- .../hbase/replication/ReplicationPeers.java | 30 +- .../replication/ReplicationPeersZKImpl.java | 98 +++- .../regionserver/ReplicationSource.java | 36 +- .../TestPerTableCFReplication.java | 443 ++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 16 +- hbase-shell/src/main/ruby/shell.rb | 4 +- .../src/main/ruby/shell/commands/add_peer.rb | 5 +- .../main/ruby/shell/commands/list_peers.rb | 5 +- .../ruby/shell/commands/set_peer_tableCFs.rb | 47 ++ .../ruby/shell/commands/show_peer_tableCFs.rb | 37 ++ 12 files changed, 819 insertions(+), 26 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/show_peer_tableCFs.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d2d10f884da..72e3fbb828f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -139,6 +139,11 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.addPeer(id, clusterKey); } + public void addPeer(String id, String clusterKey, String tableCFs) + throws ReplicationException { + this.replicationPeers.addPeer(id, clusterKey, tableCFs); + } + /** * Removes a peer cluster and stops the replication to it. * @param id a short that identifies the cluster @@ -179,6 +184,22 @@ public class ReplicationAdmin implements Closeable { return this.replicationPeers.getAllPeerClusterKeys(); } + /** + * Get the replicable table-cf config of the specified peer. + * @param id a short that identifies the cluster + */ + public String getPeerTableCFs(String id) throws ReplicationException { + return this.replicationPeers.getPeerTableCFsConfig(id); + } + + /** + * Set the replicable table-cf config of the specified peer + * @param id a short that identifies the cluster + */ + public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException { + this.replicationPeers.setPeerTableCFsConfig(id, tableCFs); + } + /** * Get the state of the specified peer cluster * @param id String format of the Short that identifies the peer, an IllegalArgumentException diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index bcdd7929493..1b14daba5cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.replication; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -54,13 +57,14 @@ public class ReplicationPeer implements Abortable, Closeable { private final String id; private List regionServers = new ArrayList(0); private final AtomicBoolean peerEnabled = new AtomicBoolean(); + private volatile Map> tableCFs = new HashMap>(); // Cannot be final since a new object needs to be recreated when session fails private ZooKeeperWatcher zkw; private final Configuration conf; private long lastRegionserverUpdate; private PeerStateTracker peerStateTracker; - + private TableCFsTracker tableCFsTracker; /** * Constructor that takes all the objects required to communicate with the @@ -103,6 +107,76 @@ public class ReplicationPeer implements Abortable, Closeable { this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false))); } + /** + * start a table-cfs tracker to listen the (table, cf-list) map change + * + * @param zookeeper zk watcher for the local cluster + * @param tableCFsNode path to zk node which stores table-cfs + * @throws KeeperException + */ + public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode) + throws KeeperException { + this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper, + this); + this.tableCFsTracker.start(); + this.readTableCFsZnode(); + } + + static Map> parseTableCFsFromConfig(String tableCFsConfig) { + if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { + return null; + } + + Map> tableCFsMap = null; + + // parse out (table, cf-list) pairs from tableCFsConfig + // format: "table1:cf1,cf2;table2:cfA,cfB" + String[] tables = tableCFsConfig.split(";"); + for (String tab : tables) { + // 1 ignore empty table config + tab = tab.trim(); + if (tab.length() == 0) { + continue; + } + // 2 split to "table" and "cf1,cf2" + // for each table: "table:cf1,cf2" or "table" + String[] pair = tab.split(":"); + String tabName = pair[0].trim(); + if (pair.length > 2 || tabName.length() == 0) { + LOG.error("ignore invalid tableCFs setting: " + tab); + continue; + } + + // 3 parse "cf1,cf2" part to List + List cfs = null; + if (pair.length == 2) { + String[] cfsList = pair[1].split(","); + for (String cf : cfsList) { + String cfName = cf.trim(); + if (cfName.length() > 0) { + if (cfs == null) { + cfs = new ArrayList(); + } + cfs.add(cfName); + } + } + } + + // 4 put > to map + if (tableCFsMap == null) { + tableCFsMap = new HashMap>(); + } + tableCFsMap.put(tabName, cfs); + } + + return tableCFsMap; + } + + private void readTableCFsZnode() { + String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); + this.tableCFs = parseTableCFsFromConfig(currentTableCFs); + } + /** * Get the cluster key of that peer * @return string consisting of zk ensemble addresses, client port @@ -120,6 +194,14 @@ public class ReplicationPeer implements Abortable, Closeable { return peerEnabled; } + /** + * Get replicable (table, cf-list) map of this peer + * @return the replicable (table, cf-list) map + */ + public Map> getTableCFs() { + return this.tableCFs; + } + /** * Get a list of all the addresses of all the region servers * for this peer cluster @@ -275,4 +357,23 @@ public class ReplicationPeer implements Abortable, Closeable { } } } + + /** + * Tracker for (table, cf-list) map of this peer + */ + public class TableCFsTracker extends ZooKeeperNodeTracker { + + public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, tableCFsZNode, abortable); + } + + @Override + public synchronized void nodeDataChanged(String path) { + if (path.equals(node)) { + super.nodeDataChanged(path); + readTableCFsZnode(); + } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 3b4c764f5fd..4922f70471c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -44,7 +44,6 @@ public interface ReplicationPeers { * Initialize the ReplicationPeers interface. */ void init() throws ReplicationException; - /** * Add a new remote slave cluster for replication. * @param peerId a short that identifies the cluster @@ -53,6 +52,15 @@ public interface ReplicationPeers { */ void addPeer(String peerId, String clusterKey) throws ReplicationException; + /** + * Add a new remote slave cluster for replication. + * @param peerId a short that identifies the cluster + * @param clusterKey the concatenation of the slave cluster's: + * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent + * @param tableCFs the table and column-family list which will be replicated for this peer + */ + void addPeer(String peerId, String clusterKey, String tableCFs) throws ReplicationException; + /** * Removes a remote slave cluster and stops the replication to it. * @param peerId a short that identifies the cluster @@ -71,6 +79,26 @@ public interface ReplicationPeers { */ void disablePeer(String peerId) throws ReplicationException; + /** + * Get the table and column-family list string of the peer from ZK. + * @param peerId a short that identifies the cluster + */ + public String getPeerTableCFsConfig(String peerId) throws ReplicationException; + + /** + * Set the table and column-family list string of the peer to ZK. + * @param peerId a short that identifies the cluster + * @param tableCFs the table and column-family list which will be replicated for this peer + */ + public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException; + + /** + * Get the table and column-family-list map of the peer. + * @param peerId a short that identifies the cluster + * @return the table and column-family list which will be replicated for this peer + */ + public Map> getTableCFs(String peerId); + /** * Get the replication status for the specified connected remote slave cluster. * The value might be read from cache, so it is recommended to diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index a6d8112b281..fb09102f6c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -66,17 +66,26 @@ import com.google.protobuf.InvalidProtocolBufferException; * ReplicationPeer.PeerStateTracker class. For example: * * /hbase/replication/peers/1/peer-state [Value: ENABLED] + * + * Each of these peer znodes has a child znode that indicates which data will be replicated + * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a + * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker + * class. For example: + * + * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"] */ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers { // Map of peer clusters keyed by their id private Map peerClusters; + private final String tableCFsNodeName; private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, Abortable abortable) { super(zk, conf, abortable); + this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); this.peerClusters = new HashMap(); } @@ -94,6 +103,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public void addPeer(String id, String clusterKey) throws ReplicationException { + addPeer(id, clusterKey, null); + } + + @Override + public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException { try { if (peerExists(id)) { throw new IllegalArgumentException("Cannot add a peer with id=" + id @@ -108,6 +122,10 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES); // A peer is enabled by default + + String tableCFsStr = (tableCFs == null) ? "" : tableCFs; + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getTableCFsNode(id), + Bytes.toBytes(tableCFsStr)); } catch (KeeperException e) { throw new ReplicationException("Could not add peer with id=" + id + ", clusterKey=" + clusterKey, e); @@ -139,6 +157,50 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re LOG.info("peer " + id + " is disabled"); } + @Override + public String getPeerTableCFsConfig(String id) throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + try { + return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id))); + } catch (Exception e) { + throw new ReplicationException(e); + } + } catch (KeeperException e) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e); + } + } + + @Override + public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + + " does not exist."); + } + String tableCFsZKNode = getTableCFsNode(id); + byte[] tableCFs = Bytes.toBytes(tableCFsStr); + if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) { + ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs); + } else { + ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs); + } + LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); + } + } + + @Override + public Map> getTableCFs(String id) throws IllegalArgumentException { + if (!this.peerClusters.containsKey(id)) { + throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); + } + return this.peerClusters.get(id).getTableCFs(); + } + @Override public boolean getStatusOfConnectedPeer(String id) { if (!this.peerClusters.containsKey(id)) { @@ -149,16 +211,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException { - if (!this.getAllPeerIds().contains(id)) { - throw new IllegalArgumentException("peer " + id + " doesn't exist"); - } - String peerStateZNode = getPeerStateNode(id); try { - return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + String peerStateZNode = getPeerStateNode(id); + try { + return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + } catch (KeeperException e) { + throw new ReplicationException(e); + } catch (DeserializationException e) { + throw new ReplicationException(e); + } } catch (KeeperException e) { - throw new ReplicationException(e); - } catch (DeserializationException e) { - throw new ReplicationException(e); + throw new ReplicationException("Unable to get status of the peer with id=" + id + + " from backing store", e); } catch (InterruptedException e) { throw new ReplicationException(e); } @@ -377,6 +444,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return addresses; } + private String getTableCFsNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName)); + } private String getPeerStateNode(String id) { return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); @@ -430,9 +500,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re try { peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId)); } catch (KeeperException e) { - throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId, - e); + throw new ReplicationException("Error starting the peer state tracker for peerId=" + + peerId, e); } + + try { + peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + + peerId, e); + } + peer.getZkw().registerListener(new PeerRegionServerListener(peer)); return peer; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b07bcd8ae35..fe9451592ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -26,6 +26,7 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -612,17 +614,37 @@ public class ReplicationSource extends Thread * @param entry The entry to check for replication */ protected void removeNonReplicableEdits(HLog.Entry entry) { - NavigableMap scopes = entry.getKey().getScopes(); + String tabName = entry.getKey().getTablename().getNameAsString(); ArrayList kvs = entry.getEdit().getKeyValues(); + Map> tableCFs = null; + try { + tableCFs = this.replicationPeers.getTableCFs(peerId); + } catch (IllegalArgumentException e) { + LOG.error("should not happen: can't get tableCFs for peer " + peerId + + ", degenerate as if it's not configured by keeping tableCFs==null"); + } int size = kvs.size(); - for (int i = size-1; i >= 0; i--) { - KeyValue kv = kvs.get(i); - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - if (scopes == null || !scopes.containsKey(kv.getFamily())) { - kvs.remove(i); + + // clear kvs(prevent replicating) if logKey's table isn't in this peer's + // replicable table list (empty tableCFs means all table are replicable) + if (tableCFs != null && !tableCFs.containsKey(tabName)) { + kvs.clear(); + } else { + NavigableMap scopes = entry.getKey().getScopes(); + List cfs = (tableCFs == null) ? null : tableCFs.get(tabName); + for (int i = size - 1; i >= 0; i--) { + KeyValue kv = kvs.get(i); + // The scope will be null or empty if + // there's nothing to replicate in that WALEdit + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if (scopes == null || !scopes.containsKey(kv.getFamily()) || + (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) { + kvs.remove(i); + } } } + if (kvs.size() < size/2) { kvs.trimToSize(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java new file mode 100644 index 00000000000..ee102fc84a1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -0,0 +1,443 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPerTableCFReplication { + + private static final Log LOG = LogFactory.getLog(TestPerTableCFReplication.class); + + private static Configuration conf1; + private static Configuration conf2; + private static Configuration conf3; + + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + private static HBaseTestingUtility utility3; + private static final long SLEEP_TIME = 500; + private static final int NB_RETRIES = 100; + + private static final byte[] tableName = Bytes.toBytes("test"); + private static final byte[] tabAName = Bytes.toBytes("TA"); + private static final byte[] tabBName = Bytes.toBytes("TB"); + private static final byte[] tabCName = Bytes.toBytes("TC"); + private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] f1Name = Bytes.toBytes("f1"); + private static final byte[] f2Name = Bytes.toBytes("f2"); + private static final byte[] f3Name = Bytes.toBytes("f3"); + private static final byte[] row1 = Bytes.toBytes("row1"); + private static final byte[] row2 = Bytes.toBytes("row2"); + private static final byte[] noRepfamName = Bytes.toBytes("norep"); + private static final byte[] val = Bytes.toBytes("myval"); + + private static HTableDescriptor table; + private static HTableDescriptor tabA; + private static HTableDescriptor tabB; + private static HTableDescriptor tabC; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + // smaller block size and capacity to trigger more operations + // and test them + conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); + conf1.setInt("replication.source.size.capacity", 1024); + conf1.setLong("replication.source.sleepforretries", 100); + conf1.setInt("hbase.regionserver.maxlogs", 10); + conf1.setLong("hbase.master.logcleaner.ttl", 10); + conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf1.setBoolean("dfs.support.append", true); + conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + conf3 = new Configuration(conf1); + conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster3", null, true); + + utility3 = new HBaseTestingUtility(conf3); + utility3.setZkCluster(miniZK); + new ZooKeeperWatcher(conf3, "cluster3", null, true); + + table = new HTableDescriptor(TableName.valueOf(tableName)); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + fam = new HColumnDescriptor(noRepfamName); + table.addFamily(fam); + + tabA = new HTableDescriptor(tabAName); + fam = new HColumnDescriptor(f1Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabA.addFamily(fam); + fam = new HColumnDescriptor(f2Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabA.addFamily(fam); + fam = new HColumnDescriptor(f3Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabA.addFamily(fam); + + tabB = new HTableDescriptor(tabBName); + fam = new HColumnDescriptor(f1Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabB.addFamily(fam); + fam = new HColumnDescriptor(f2Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabB.addFamily(fam); + fam = new HColumnDescriptor(f3Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabB.addFamily(fam); + + tabC = new HTableDescriptor(tabCName); + fam = new HColumnDescriptor(f1Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabC.addFamily(fam); + fam = new HColumnDescriptor(f2Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabC.addFamily(fam); + fam = new HColumnDescriptor(f3Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabC.addFamily(fam); + + utility1.startMiniCluster(); + utility2.startMiniCluster(); + utility3.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility3.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + @Test + public void testParseTableCFsFromConfig() { + Map> tabCFsMap = null; + + // 1. null or empty string, result should be null + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null); + assertEquals(null, tabCFsMap); + + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(""); + assertEquals(null, tabCFsMap); + + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" "); + assertEquals(null, tabCFsMap); + + // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1"); + assertEquals(1, tabCFsMap.size()); // only one table + assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1" + assertFalse(tabCFsMap.containsKey("tab2")); // not other table + assertEquals(null, tabCFsMap.get("tab1")); // null cf-list, + + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1"); + assertEquals(1, tabCFsMap.size()); // only one table + assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2" + assertFalse(tabCFsMap.containsKey("tab1")); // not other table + assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf + assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1" + + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3"); + assertEquals(1, tabCFsMap.size()); // only one table + assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab2" + assertFalse(tabCFsMap.containsKey("tab1")); // not other table + assertEquals(2, tabCFsMap.get("tab3").size()); // cf-list contains 2 cf + assertTrue(tabCFsMap.get("tab3").contains("cf1"));// contains "cf1" + assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3" + + // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); + // 3.1 contains 3 tables : "tab1", "tab2" and "tab3" + assertEquals(3, tabCFsMap.size()); + assertTrue(tabCFsMap.containsKey("tab1")); + assertTrue(tabCFsMap.containsKey("tab2")); + assertTrue(tabCFsMap.containsKey("tab3")); + // 3.2 table "tab1" : null cf-list + assertEquals(null, tabCFsMap.get("tab1")); + // 3.3 table "tab2" : cf-list contains a single cf "cf1" + assertEquals(1, tabCFsMap.get("tab2").size()); + assertEquals("cf1", tabCFsMap.get("tab2").get(0)); + // 3.4 table "tab3" : cf-list contains "cf1" and "cf3" + assertEquals(2, tabCFsMap.get("tab3").size()); + assertTrue(tabCFsMap.get("tab3").contains("cf1")); + assertTrue(tabCFsMap.get("tab3").contains("cf3")); + + // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated + // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;"); + // 4.1 contains 3 tables : "tab1", "tab2" and "tab3" + assertEquals(3, tabCFsMap.size()); + assertTrue(tabCFsMap.containsKey("tab1")); + assertTrue(tabCFsMap.containsKey("tab2")); + assertTrue(tabCFsMap.containsKey("tab3")); + // 4.2 table "tab1" : null cf-list + assertEquals(null, tabCFsMap.get("tab1")); + // 4.3 table "tab2" : cf-list contains a single cf "cf1" + assertEquals(1, tabCFsMap.get("tab2").size()); + assertEquals("cf1", tabCFsMap.get("tab2").get(0)); + // 4.4 table "tab3" : cf-list contains "cf1" and "cf3" + assertEquals(2, tabCFsMap.get("tab3").size()); + assertTrue(tabCFsMap.get("tab3").contains("cf1")); + assertTrue(tabCFsMap.get("tab3").contains("cf3")); + + // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3" + // "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally + tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"); + // 5.1 no "tab1" and "tab2", only "tab3" + assertEquals(1, tabCFsMap.size()); // only one table + assertFalse(tabCFsMap.containsKey("tab1")); + assertFalse(tabCFsMap.containsKey("tab2")); + assertTrue(tabCFsMap.containsKey("tab3")); + // 5.2 table "tab3" : cf-list contains "cf1" and "cf3" + assertEquals(2, tabCFsMap.get("tab3").size()); + assertTrue(tabCFsMap.get("tab3").contains("cf1")); + assertTrue(tabCFsMap.get("tab3").contains("cf3")); + } + + @Test(timeout=300000) + public void testPerTableCFReplication() throws Exception { + LOG.info("testPerTableCFReplication"); + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + + new HBaseAdmin(conf1).createTable(tabA); + new HBaseAdmin(conf1).createTable(tabB); + new HBaseAdmin(conf1).createTable(tabC); + new HBaseAdmin(conf2).createTable(tabA); + new HBaseAdmin(conf2).createTable(tabB); + new HBaseAdmin(conf2).createTable(tabC); + new HBaseAdmin(conf3).createTable(tabA); + new HBaseAdmin(conf3).createTable(tabB); + new HBaseAdmin(conf3).createTable(tabC); + + HTable htab1A = new HTable(conf1, tabAName); + HTable htab2A = new HTable(conf2, tabAName); + HTable htab3A = new HTable(conf3, tabAName); + + HTable htab1B = new HTable(conf1, tabBName); + HTable htab2B = new HTable(conf2, tabBName); + HTable htab3B = new HTable(conf3, tabBName); + + HTable htab1C = new HTable(conf1, tabCName); + HTable htab2C = new HTable(conf2, tabCName); + HTable htab3C = new HTable(conf3, tabCName); + + // A. add cluster2/cluster3 as peers to cluster1 + admin1.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3"); + admin1.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2"); + + // A1. tableA can only replicated to cluster3 + putAndWaitWithFamily(row1, f1Name, htab1A, htab3A); + ensureRowNotReplicated(row1, f1Name, htab2A); + deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A); + + putAndWaitWithFamily(row1, f2Name, htab1A, htab3A); + ensureRowNotReplicated(row1, f2Name, htab2A); + deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A); + + putAndWaitWithFamily(row1, f3Name, htab1A, htab3A); + ensureRowNotReplicated(row1, f3Name, htab2A); + deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A); + + // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3 + putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B); + deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B); + + // cf 'f2' of tableB can only replicated to cluster3 + putAndWaitWithFamily(row1, f2Name, htab1B, htab3B); + ensureRowNotReplicated(row1, f2Name, htab2B); + deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B); + + // cf 'f3' of tableB can only replicated to cluster2 + putAndWaitWithFamily(row1, f3Name, htab1B, htab2B); + ensureRowNotReplicated(row1, f3Name, htab3B); + deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B); + + // A3. tableC can only replicated to cluster2 + putAndWaitWithFamily(row1, f1Name, htab1C, htab2C); + ensureRowNotReplicated(row1, f1Name, htab3C); + deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C); + + putAndWaitWithFamily(row1, f2Name, htab1C, htab2C); + ensureRowNotReplicated(row1, f2Name, htab3C); + deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C); + + putAndWaitWithFamily(row1, f3Name, htab1C, htab2C); + ensureRowNotReplicated(row1, f3Name, htab3C); + deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C); + + // B. change peers' replicable table-cf config + admin1.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3"); + admin1.setPeerTableCFs("3", "TB; TC:f3"); + + // B1. cf 'f1' of tableA can only replicated to cluster2 + putAndWaitWithFamily(row2, f1Name, htab1A, htab2A); + ensureRowNotReplicated(row2, f1Name, htab3A); + deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A); + // cf 'f2' of tableA can only replicated to cluster2 + putAndWaitWithFamily(row2, f2Name, htab1A, htab2A); + ensureRowNotReplicated(row2, f2Name, htab3A); + deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A); + // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3 + putAndWaitWithFamily(row2, f3Name, htab1A); + ensureRowNotReplicated(row2, f3Name, htab2A, htab3A); + deleteAndWaitWithFamily(row2, f3Name, htab1A); + + // B2. tableB can only replicated to cluster3 + putAndWaitWithFamily(row2, f1Name, htab1B, htab3B); + ensureRowNotReplicated(row2, f1Name, htab2B); + deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B); + + putAndWaitWithFamily(row2, f2Name, htab1B, htab3B); + ensureRowNotReplicated(row2, f2Name, htab2B); + deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B); + + putAndWaitWithFamily(row2, f3Name, htab1B, htab3B); + ensureRowNotReplicated(row2, f3Name, htab2B); + deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B); + + // B3. cf 'f1' of tableC non-replicable to either cluster + putAndWaitWithFamily(row2, f1Name, htab1C); + ensureRowNotReplicated(row2, f1Name, htab2C, htab3C); + deleteAndWaitWithFamily(row2, f1Name, htab1C); + // cf 'f2' of tableC can only replicated to cluster2 + putAndWaitWithFamily(row2, f2Name, htab1C, htab2C); + ensureRowNotReplicated(row2, f2Name, htab3C); + deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C); + // cf 'f3' of tableC can replicated to cluster2 and cluster3 + putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); + deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); + } + + private void ensureRowNotReplicated(byte[] row, byte[] fam, HTable... tables) throws IOException { + Get get = new Get(row); + get.addFamily(fam); + for (HTable table : tables) { + Result res = table.get(get); + assertEquals(0, res.size()); + } + } + + private void deleteAndWaitWithFamily(byte[] row, byte[] fam, + HTable source, HTable... targets) + throws Exception { + Delete del = new Delete(row); + del.deleteFamily(fam); + source.delete(del); + + Get get = new Get(row); + get.addFamily(fam); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for del replication"); + } + boolean removedFromAll = true; + for (HTable target : targets) { + Result res = target.get(get); + if (res.size() >= 1) { + LOG.info("Row not deleted"); + removedFromAll = false; + break; + } + } + if (removedFromAll) { + break; + } else { + Thread.sleep(SLEEP_TIME); + } + } + } + + private void putAndWaitWithFamily(byte[] row, byte[] fam, + HTable source, HTable... targets) + throws Exception { + Put put = new Put(row); + put.add(fam, row, val); + source.put(put); + + Get get = new Get(row); + get.addFamily(fam); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for put replication"); + } + boolean replicatedToAll = true; + for (HTable target : targets) { + Result res = target.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + replicatedToAll = false; + break; + } else { + assertEquals(res.size(), 1); + assertArrayEquals(res.value(), val); + } + } + if (replicatedToAll) { + break; + } else { + Thread.sleep(SLEEP_TIME); + } + } + } +} diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index baa6aeaeda8..d1ddee36d47 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -32,8 +32,8 @@ module Hbase #---------------------------------------------------------------------------------------------- # Add a new peer cluster to replicate to - def add_peer(id, cluster_key) - @replication_admin.addPeer(id, cluster_key) + def add_peer(id, cluster_key, peer_tableCFs = nil) + @replication_admin.addPeer(id, cluster_key, peer_tableCFs) end #---------------------------------------------------------------------------------------------- @@ -72,5 +72,17 @@ module Hbase def disable_peer(id) @replication_admin.disablePeer(id) end + + #---------------------------------------------------------------------------------------------- + # Show the current tableCFs config for the specified peer + def show_peer_tableCFs(id) + @replication_admin.getPeerTableCFs(id) + end + + #---------------------------------------------------------------------------------------------- + # Set new tableCFs config for the specified peer + def set_peer_tableCFs(id, tableCFs) + @replication_admin.setPeerTableCFs(id, tableCFs) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index fef59b77bd2..67f783585cf 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -323,6 +323,8 @@ Shell.load_command_group( list_peers enable_peer disable_peer + show_peer_tableCFs + set_peer_tableCFs list_replicated_tables ] ) @@ -361,4 +363,4 @@ Shell.load_command_group( get_auths clear_auths ] -) \ No newline at end of file +) diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index deb5c5def51..ecd8e753920 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -30,12 +30,13 @@ Examples: hbase> add_peer '1', "server1.cie.com:2181:/hbase" hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod" + hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "tab1; tab2:cf1; tab3:cf2,cf3" EOF end - def command(id, cluster_key) + def command(id, cluster_key, peer_tableCFs = nil) format_simple_command do - replication_admin.add_peer(id, cluster_key) + replication_admin.add_peer(id, cluster_key, peer_tableCFs) end end end diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 2f57592e587..cc1be044169 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -33,11 +33,12 @@ EOF now = Time.now peers = replication_admin.list_peers - formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"]) + formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"]) peers.entrySet().each do |e| state = replication_admin.get_peer_state(e.key) - formatter.row([ e.key, e.value, state ]) + tableCFs = replication_admin.show_peer_tableCFs(e.key) + formatter.row([ e.key, e.value, state, tableCFs ]) end formatter.footer(now) diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb new file mode 100644 index 00000000000..3a88dbb7412 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb @@ -0,0 +1,47 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerTableCFs< Command + def help + return <<-EOF + Set the replicable table-cf config for the specified peer + Examples: + + # set all tables to be replicable for a peer + hbase> set_peer_tableCFs '1', "" + hbase> set_peer_tableCFs '1' + # set table / table-cf to be replicable for a peer, for a table without + # an explicit column-family list, all replicable column-families (with + # replication_scope == 1) will be replicated + hbase> set_peer_tableCFs '2', "table1; table2:cf1,cf2; table3:cfA,cfB" + + EOF + end + + def command(id, peer_table_cfs = nil) + format_simple_command do + replication_admin.set_peer_tableCFs(id, peer_table_cfs) + end + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/show_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/show_peer_tableCFs.rb new file mode 100644 index 00000000000..3ce3d06f012 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/show_peer_tableCFs.rb @@ -0,0 +1,37 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class ShowPeerTableCFs< Command + def help + return <<-EOF + Show replicable table-cf config for the specified peer. + + hbase> show_peer_tableCFs + EOF + end + + def command(id) + puts replication_admin.show_peer_tableCFs(id) + end + end + end +end