HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node

This commit is contained in:
Ashish Singhi 2016-06-09 18:46:07 +05:30
parent 13d06a2cc8
commit a40ec70da9
6 changed files with 47 additions and 30 deletions

View File

@ -128,17 +128,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
// Irrespective of bulk load hfile replication is enabled or not we add peerId node to
// hfile-refs node -- HBASE-15397
try {
String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
LOG.info("Adding peer " + peerId + " to hfile reference queue.");
ZKUtil.createWithParents(this.zookeeper, peerId);
} catch (KeeperException e) {
throw new ReplicationException("Failed to add peer with id=" + id
+ ", node under hfile references node.", e);
}
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
ReplicationSerDeHelper.toByteArray(peerConfig));
@ -168,16 +157,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
+ " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
// Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
// replication is enabled or not
String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
try {
LOG.info("Removing peer " + peerId + " from hfile reference queue.");
ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
} catch (NoNodeException e) {
LOG.info("Did not find node " + peerId + " to delete.", e);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + id, e);
}

View File

@ -122,6 +122,12 @@ public interface ReplicationQueues {
*/
void addPeerToHFileRefs(String peerId) throws ReplicationException;
/**
* Remove a peer from hfile reference queue.
* @param peerId peer cluster id to be removed
*/
void removePeerFromHFileRefs(String peerId);
/**
* Add new hfile references to the queue.
* @param peerId peer cluster id to which the hfiles need to be replicated

View File

@ -84,12 +84,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
// Irrespective of bulk load hfile replication is enabled or not we add peerId node to
// hfile-refs node -- HBASE-15397
try {
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize hfile references replication queue.", e);
if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
try {
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize hfile references replication queue.",
e);
}
}
}
@ -499,4 +501,23 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
e);
}
}
@Override
public void removePeerFromHFileRefs(String peerId) {
final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
try {
if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
}
return;
} else {
LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
}
} catch (KeeperException e) {
LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
e);
}
}
}

View File

@ -115,6 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final ThreadPoolExecutor executor;
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
/**
@ -166,6 +167,9 @@ public class ReplicationSourceManager implements ReplicationListener {
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
@ -227,9 +231,6 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
boolean replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@ -582,6 +583,7 @@ public class ReplicationSourceManager implements ReplicationListener {
@Override
public void peerRemoved(String peerId) {
removePeer(peerId);
this.replicationQueues.removePeerFromHFileRefs(peerId);
}
@Override
@ -591,6 +593,9 @@ public class ReplicationSourceManager implements ReplicationListener {
boolean added = this.replicationPeers.peerAdded(id);
if (added) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
} catch (Exception e) {
LOG.error("Error while adding a new peer", e);

View File

@ -111,6 +111,7 @@ public class TestReplicationHFileCleaner {
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
rq.addPeerToHFileRefs(peerId);
}
@After

View File

@ -204,6 +204,7 @@ public abstract class TestReplicationStateBasic {
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
@ -231,18 +232,22 @@ public abstract class TestReplicationStateBasic {
files1.add("file_1");
files1.add("file_2");
files1.add("file_3");
rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
rq1.addPeerToHFileRefs(ID_TWO);
rq1.addHFileRefs(ID_TWO, files1);
assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_ONE);
rq1.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_TWO);
rq1.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_TWO));
}