HBASE-15888 Extend HBASE-12769 for bulk load data replication
This commit is contained in:
parent
a8c8bfd5ee
commit
950a09b03c
|
@ -567,6 +567,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Check for hfile-refs queue
|
||||||
|
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
|
||||||
|
&& queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
|
||||||
|
throw new ReplicationException("Undeleted queue for peerId: " + peerId
|
||||||
|
+ ", found in hfile-refs node path " + hfileRefsZNode);
|
||||||
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
|
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationChecker {
|
public class ReplicationChecker {
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
|
||||||
|
private final ZooKeeperWatcher zkw;
|
||||||
private ErrorReporter errorReporter;
|
private ErrorReporter errorReporter;
|
||||||
private ReplicationQueuesClient queuesClient;
|
private ReplicationQueuesClient queuesClient;
|
||||||
private ReplicationPeers replicationPeers;
|
private ReplicationPeers replicationPeers;
|
||||||
private ReplicationQueueDeletor queueDeletor;
|
private ReplicationQueueDeletor queueDeletor;
|
||||||
// replicator with its queueIds for removed peers
|
// replicator with its queueIds for removed peers
|
||||||
private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
|
private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
|
||||||
|
// Set of un deleted hfile refs queue Ids
|
||||||
|
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
|
||||||
|
private final String hfileRefsZNode;
|
||||||
|
|
||||||
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
|
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
|
||||||
ErrorReporter errorReporter) throws IOException {
|
ErrorReporter errorReporter) throws IOException {
|
||||||
try {
|
try {
|
||||||
|
this.zkw = zkw;
|
||||||
this.errorReporter = errorReporter;
|
this.errorReporter = errorReporter;
|
||||||
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
|
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
|
||||||
this.queuesClient.init();
|
this.queuesClient.init();
|
||||||
|
@ -71,6 +76,13 @@ public class ReplicationChecker {
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
throw new IOException("failed to construct ReplicationChecker", e);
|
throw new IOException("failed to construct ReplicationChecker", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
||||||
|
String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
|
||||||
|
String hfileRefsZNodeName =
|
||||||
|
conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
|
||||||
|
ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
|
||||||
|
hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasUnDeletedQueues() {
|
public boolean hasUnDeletedQueues() {
|
||||||
|
@ -103,6 +115,30 @@ public class ReplicationChecker {
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
throw new IOException(ke);
|
throw new IOException(ke);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkUnDeletedHFileRefsQueues(peerIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
|
||||||
|
try {
|
||||||
|
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
|
||||||
|
Set<String> peers = new HashSet<>(listOfPeers);
|
||||||
|
peers.removeAll(peerIds);
|
||||||
|
if (!peers.isEmpty()) {
|
||||||
|
undeletedHFileRefsQueueIds.addAll(peers);
|
||||||
|
String msg =
|
||||||
|
"Undeleted replication hfile-refs queue for removed peer found: "
|
||||||
|
+ undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
|
||||||
|
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
|
||||||
|
msg);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
||||||
|
+ hfileRefsZNode, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||||
|
@ -130,5 +166,20 @@ public class ReplicationChecker {
|
||||||
queueDeletor.removeQueue(replicator, queueId);
|
queueDeletor.removeQueue(replicator, queueId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fixUnDeletedHFileRefsQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fixUnDeletedHFileRefsQueue() throws IOException {
|
||||||
|
for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
|
||||||
|
String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
|
||||||
|
try {
|
||||||
|
ZKUtil.deleteNodeRecursively(this.zkw, node);
|
||||||
|
LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
|
||||||
|
+ hfileRefsZNode);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
||||||
|
+ " from path " + hfileRefsZNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue