From a40ec70da9d9891f5af074535717fe20658a00cc Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Thu, 9 Jun 2016 18:46:07 +0530 Subject: [PATCH] HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node --- .../replication/ReplicationPeersZKImpl.java | 21 ------------ .../hbase/replication/ReplicationQueues.java | 6 ++++ .../replication/ReplicationQueuesZKImpl.java | 33 +++++++++++++++---- .../ReplicationSourceManager.java | 11 +++++-- .../cleaner/TestReplicationHFileCleaner.java | 1 + .../TestReplicationStateBasic.java | 5 +++ 6 files changed, 47 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 076167e7114..d717b0b543d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -128,17 +128,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createWithParents(this.zookeeper, this.peersZNode); - // Irrespective of bulk load hfile replication is enabled or not we add peerId node to - // hfile-refs node -- HBASE-15397 - try { - String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); - LOG.info("Adding peer " + peerId + " to hfile reference queue."); - ZKUtil.createWithParents(this.zookeeper, peerId); - } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer with id=" + id - + ", node under hfile references node.", e); - } - List listOfOps = new ArrayList(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), ReplicationSerDeHelper.toByteArray(peerConfig)); @@ -168,16 +157,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re + " because that id does not exist."); } ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); - // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile - // replication is enabled or not - - String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); - try { - LOG.info("Removing peer " + peerId + " from hfile reference queue."); - ZKUtil.deleteNodeRecursively(this.zookeeper, peerId); - } catch (NoNodeException e) { - LOG.info("Did not find node " + peerId + " to delete.", e); - } } catch (KeeperException e) { throw new ReplicationException("Could not remove peer with id=" + id, e); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0d47a88cb4a..507367b304a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -122,6 +122,12 @@ public interface ReplicationQueues { */ void addPeerToHFileRefs(String peerId) throws ReplicationException; + /** + * Remove a peer from hfile reference queue. + * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId); + /** * Add new hfile references to the queue. * @param peerId peer cluster id to which the hfiles need to be replicated diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 93f3e27041d..c2d3b1fa340 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -84,12 +84,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } catch (KeeperException e) { throw new ReplicationException("Could not initialize replication queues.", e); } - // Irrespective of bulk load hfile replication is enabled or not we add peerId node to - // hfile-refs node -- HBASE-15397 - try { - ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize hfile references replication queue.", e); + if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { + try { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize hfile references replication queue.", + e); + } } } @@ -499,4 +501,23 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R e); } } + + @Override + public void removePeerFromHFileRefs(String peerId) { + final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + try { + if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerZnode + " not found in hfile reference queue."); + } + return; + } else { + LOG.info("Removing peer " + peerZnode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode); + } + } catch (KeeperException e) { + LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.", + e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 83e0205c312..57f80d8b799 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -115,6 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final ThreadPoolExecutor executor; private final Random rand; + private final boolean replicationForBulkLoadDataEnabled; /** @@ -166,6 +167,9 @@ public class ReplicationSourceManager implements ReplicationListener { this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); this.latestPaths = Collections.synchronizedSet(new HashSet()); + replicationForBulkLoadDataEnabled = + conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); } /** @@ -227,9 +231,6 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server wal queues */ protected void init() throws IOException, ReplicationException { - boolean replicationForBulkLoadDataEnabled = - conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); for (String id : this.replicationPeers.getPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { @@ -582,6 +583,7 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void peerRemoved(String peerId) { removePeer(peerId); + this.replicationQueues.removePeerFromHFileRefs(peerId); } @Override @@ -591,6 +593,9 @@ public class ReplicationSourceManager implements ReplicationListener { boolean added = this.replicationPeers.peerAdded(id); if (added) { addSource(id); + if (replicationForBulkLoadDataEnabled) { + this.replicationQueues.addPeerToHFileRefs(id); + } } } catch (Exception e) { LOG.error("Error while adding a new peer", e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index a7348850b98..eb793dcf091 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -111,6 +111,7 @@ public class TestReplicationHFileCleaner { public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null); + rq.addPeerToHFileRefs(peerId); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index d10d6de706b..2d5277ba9a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -204,6 +204,7 @@ public abstract class TestReplicationStateBasic { assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); @@ -231,18 +232,22 @@ public abstract class TestReplicationStateBasic { files1.add("file_1"); files1.add("file_2"); files1.add("file_3"); + rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); + rq1.addPeerToHFileRefs(ID_TWO); rq1.addHFileRefs(ID_TWO, files1); assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); rp.removePeer(ID_ONE); + rq1.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); rp.removePeer(ID_TWO); + rq1.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_TWO)); }