HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)
This commit is contained in:
parent
b554e05410
commit
67420fe21d
|
@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
|
||||||
|
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
|
||||||
|
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
|
||||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
||||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||||
|
@ -135,6 +137,8 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
|
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
|
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
|
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
||||||
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
|
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
@ -320,6 +324,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
|
|
||||||
CatalogJanitor catalogJanitorChore;
|
CatalogJanitor catalogJanitorChore;
|
||||||
private ReplicationMetaCleaner replicationMetaCleaner;
|
private ReplicationMetaCleaner replicationMetaCleaner;
|
||||||
|
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
|
||||||
private LogCleaner logCleaner;
|
private LogCleaner logCleaner;
|
||||||
private HFileCleaner hfileCleaner;
|
private HFileCleaner hfileCleaner;
|
||||||
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
||||||
|
@ -1008,6 +1013,17 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
LOG.trace("Started service threads");
|
LOG.trace("Started service threads");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start replication zk node cleaner
|
||||||
|
if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
|
||||||
|
ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
|
||||||
|
try {
|
||||||
|
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
|
||||||
|
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
|
||||||
|
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("start replicationZKNodeCleanerChore failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
|
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
|
||||||
getChoreService().scheduleChore(replicationMetaCleaner);
|
getChoreService().scheduleChore(replicationMetaCleaner);
|
||||||
}
|
}
|
||||||
|
@ -1043,6 +1059,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// Clean up and close up shop
|
// Clean up and close up shop
|
||||||
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
||||||
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
||||||
|
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
|
||||||
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
|
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
|
||||||
if (this.quotaManager != null) this.quotaManager.stop();
|
if (this.quotaManager != null) this.quotaManager.stop();
|
||||||
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
||||||
|
|
|
@ -0,0 +1,202 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to clean the replication queues belonging to the peer which does not exist.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ReplicationZKNodeCleaner {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class);
|
||||||
|
private final ZooKeeperWatcher zkw;
|
||||||
|
private final ReplicationQueuesClient queuesClient;
|
||||||
|
private final ReplicationPeers replicationPeers;
|
||||||
|
private final ReplicationQueueDeletor queueDeletor;
|
||||||
|
|
||||||
|
public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
this.zkw = zkw;
|
||||||
|
this.queuesClient = ReplicationFactory
|
||||||
|
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
|
||||||
|
this.queuesClient.init();
|
||||||
|
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
|
||||||
|
abortable);
|
||||||
|
this.replicationPeers.init();
|
||||||
|
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return undeletedQueues replicator with its queueIds for removed peers
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Map<String, List<String>> getUnDeletedQueues() throws IOException {
|
||||||
|
Map<String, List<String>> undeletedQueues = new HashMap<>();
|
||||||
|
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||||
|
try {
|
||||||
|
List<String> replicators = this.queuesClient.getListOfReplicators();
|
||||||
|
for (String replicator : replicators) {
|
||||||
|
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
|
||||||
|
for (String queueId : queueIds) {
|
||||||
|
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||||
|
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||||
|
undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<String>()).add(
|
||||||
|
queueId);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Undeleted replication queue for removed peer found: "
|
||||||
|
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
||||||
|
queueInfo.getPeerId(), replicator, queueId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
throw new IOException("Failed to get the replication queues of all replicators", ke);
|
||||||
|
}
|
||||||
|
return undeletedQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
|
||||||
|
* hfile-refs queue
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
|
||||||
|
Set<String> undeletedHFileRefsQueue = new HashSet<>();
|
||||||
|
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||||
|
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
|
||||||
|
try {
|
||||||
|
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
|
||||||
|
Set<String> peers = new HashSet<>(listOfPeers);
|
||||||
|
peers.removeAll(peerIds);
|
||||||
|
if (!peers.isEmpty()) {
|
||||||
|
undeletedHFileRefsQueue.addAll(peers);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
||||||
|
+ hfileRefsZNode, e);
|
||||||
|
}
|
||||||
|
return undeletedHFileRefsQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||||
|
|
||||||
|
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
|
||||||
|
super(zk, conf, abortable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param replicator The regionserver which has undeleted queue
|
||||||
|
* @param queueId The undeleted queue id
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void removeQueue(final String replicator, final String queueId) throws IOException {
|
||||||
|
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
|
||||||
|
queueId);
|
||||||
|
try {
|
||||||
|
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||||
|
if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
|
||||||
|
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
||||||
|
LOG.info("Successfully removed replication queue, replicator: " + replicator
|
||||||
|
+ ", queueId: " + queueId);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
|
||||||
|
+ queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param hfileRefsQueueId The undeleted hfile-refs queue id
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
|
||||||
|
String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
|
||||||
|
try {
|
||||||
|
if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
|
||||||
|
ZKUtil.deleteNodeRecursively(this.zookeeper, node);
|
||||||
|
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
|
||||||
|
+ hfileRefsZNode);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
||||||
|
+ " from path " + hfileRefsZNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String getHfileRefsZNode() {
|
||||||
|
return this.hfileRefsZNode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the undeleted replication queue's zk node for removed peers.
|
||||||
|
* @param undeletedQueues replicator with its queueIds for removed peers
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
|
||||||
|
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
|
||||||
|
String replicator = replicatorAndQueueIds.getKey();
|
||||||
|
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||||
|
queueDeletor.removeQueue(replicator, queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the undeleted hfile-refs queue's zk node for removed peers.
|
||||||
|
* @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in
|
||||||
|
* hfile-refs queue
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
|
||||||
|
for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
|
||||||
|
queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chore that will clean the replication queues belonging to the peer which does not exist.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ReplicationZKNodeCleanerChore extends ScheduledChore {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
|
||||||
|
private final ReplicationZKNodeCleaner cleaner;
|
||||||
|
|
||||||
|
public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
|
||||||
|
ReplicationZKNodeCleaner cleaner) {
|
||||||
|
super("ReplicationZKNodeCleanerChore", stopper, period);
|
||||||
|
this.cleaner = cleaner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
try {
|
||||||
|
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
|
||||||
|
cleaner.removeQueues(undeletedQueues);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to clean replication zk node", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hbase.util.hbck;
|
package org.apache.hadoop.hbase.util.hbck;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -27,161 +26,75 @@ import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck;
|
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check and fix undeleted replication queues for removed peerId.
|
* Check and fix undeleted replication queues for removed peerId.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationChecker {
|
public class ReplicationChecker {
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
|
|
||||||
private final ZooKeeperWatcher zkw;
|
|
||||||
private final ErrorReporter errorReporter;
|
private final ErrorReporter errorReporter;
|
||||||
private final ReplicationQueuesClient queuesClient;
|
|
||||||
private final ReplicationPeers replicationPeers;
|
|
||||||
private final ReplicationQueueDeletor queueDeletor;
|
|
||||||
// replicator with its queueIds for removed peers
|
// replicator with its queueIds for removed peers
|
||||||
private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
|
private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
|
||||||
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
|
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
|
||||||
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
|
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
|
||||||
private final String hfileRefsZNode;
|
private final ReplicationZKNodeCleaner cleaner;
|
||||||
|
|
||||||
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
|
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
|
||||||
ErrorReporter errorReporter) throws IOException {
|
ErrorReporter errorReporter) throws IOException {
|
||||||
try {
|
this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
|
||||||
this.zkw = zkw;
|
|
||||||
this.errorReporter = errorReporter;
|
this.errorReporter = errorReporter;
|
||||||
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
|
|
||||||
new ReplicationQueuesClientArguments(conf, connection, zkw));
|
|
||||||
this.queuesClient.init();
|
|
||||||
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
|
|
||||||
connection);
|
|
||||||
this.replicationPeers.init();
|
|
||||||
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException("failed to construct ReplicationChecker", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
|
||||||
String replicationZNode = ZKUtil.joinZNode(this.zkw.znodePaths.baseZNode, replicationZNodeName);
|
|
||||||
String hfileRefsZNodeName =
|
|
||||||
conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
|
|
||||||
ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
|
|
||||||
hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasUnDeletedQueues() {
|
public boolean hasUnDeletedQueues() {
|
||||||
return errorReporter.getErrorList()
|
return errorReporter.getErrorList().contains(
|
||||||
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
|
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkUnDeletedQueues() throws IOException {
|
public void checkUnDeletedQueues() throws IOException {
|
||||||
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
undeletedQueueIds = cleaner.getUnDeletedQueues();
|
||||||
try {
|
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
|
||||||
List<String> replicators = this.queuesClient.getListOfReplicators();
|
String replicator = replicatorAndQueueIds.getKey();
|
||||||
for (String replicator : replicators) {
|
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||||
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
|
|
||||||
for (String queueId : queueIds) {
|
|
||||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||||
if (!peerIds.contains(queueInfo.getPeerId())) {
|
|
||||||
if (!undeletedQueueIds.containsKey(replicator)) {
|
|
||||||
undeletedQueueIds.put(replicator, new ArrayList<String>());
|
|
||||||
}
|
|
||||||
undeletedQueueIds.get(replicator).add(queueId);
|
|
||||||
|
|
||||||
String msg = "Undeleted replication queue for removed peer found: "
|
String msg = "Undeleted replication queue for removed peer found: "
|
||||||
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
|
||||||
queueInfo.getPeerId(), replicator, queueId);
|
replicator, queueId);
|
||||||
errorReporter.reportError(
|
|
||||||
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (KeeperException ke) {
|
|
||||||
throw new IOException(ke);
|
|
||||||
}
|
|
||||||
|
|
||||||
checkUnDeletedHFileRefsQueues(peerIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
|
|
||||||
try {
|
|
||||||
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
|
|
||||||
Set<String> peers = new HashSet<>(listOfPeers);
|
|
||||||
peers.removeAll(peerIds);
|
|
||||||
if (!peers.isEmpty()) {
|
|
||||||
undeletedHFileRefsQueueIds.addAll(peers);
|
|
||||||
String msg =
|
|
||||||
"Undeleted replication hfile-refs queue for removed peer found: "
|
|
||||||
+ undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
|
|
||||||
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
|
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
|
||||||
msg);
|
msg);
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
|
||||||
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
|
||||||
+ hfileRefsZNode, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
checkUnDeletedHFileRefsQueues();
|
||||||
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
|
|
||||||
super(zk, conf, abortable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeQueue(String replicator, String queueId) throws IOException {
|
private void checkUnDeletedHFileRefsQueues() throws IOException {
|
||||||
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
|
undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
|
||||||
queueId);
|
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
|
||||||
try {
|
String msg = "Undeleted replication hfile-refs queue for removed peer found: "
|
||||||
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
+ undeletedHFileRefsQueueIds + " under hfile-refs node";
|
||||||
LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
|
errorReporter
|
||||||
} catch (KeeperException e) {
|
.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
|
||||||
throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
|
|
||||||
+ queueId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void fixUnDeletedQueues() throws IOException {
|
public void fixUnDeletedQueues() throws IOException {
|
||||||
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
|
if (!undeletedQueueIds.isEmpty()) {
|
||||||
String replicator = replicatorAndQueueIds.getKey();
|
cleaner.removeQueues(undeletedQueueIds);
|
||||||
for (String queueId : replicatorAndQueueIds.getValue()) {
|
|
||||||
queueDeletor.removeQueue(replicator, queueId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
fixUnDeletedHFileRefsQueue();
|
fixUnDeletedHFileRefsQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fixUnDeletedHFileRefsQueue() throws IOException {
|
private void fixUnDeletedHFileRefsQueue() throws IOException {
|
||||||
for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
|
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
|
||||||
String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
|
cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
|
||||||
try {
|
|
||||||
ZKUtil.deleteNodeRecursively(this.zkw, node);
|
|
||||||
LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
|
|
||||||
+ hfileRefsZNode);
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
|
||||||
+ " from path " + hfileRefsZNode);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,115 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestReplicationZKNodeCleaner {
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private final String ID_ONE = "1";
|
||||||
|
private final String SERVER_ONE = "server1";
|
||||||
|
private final String ID_TWO = "2";
|
||||||
|
private final String SERVER_TWO = "server2";
|
||||||
|
|
||||||
|
private final Configuration conf;
|
||||||
|
private final ZooKeeperWatcher zkw;
|
||||||
|
private final ReplicationQueues repQueues;
|
||||||
|
|
||||||
|
public TestReplicationZKNodeCleaner() throws Exception {
|
||||||
|
conf = TEST_UTIL.getConfiguration();
|
||||||
|
zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
|
||||||
|
repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
|
||||||
|
zkw));
|
||||||
|
assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicationZKNodeCleaner() throws Exception {
|
||||||
|
repQueues.init(SERVER_ONE);
|
||||||
|
// add queue for ID_ONE which isn't exist
|
||||||
|
repQueues.addLog(ID_ONE, "file1");
|
||||||
|
|
||||||
|
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
|
||||||
|
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
|
||||||
|
assertEquals(1, undeletedQueues.size());
|
||||||
|
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
|
||||||
|
assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
|
||||||
|
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
|
||||||
|
|
||||||
|
// add a recovery queue for ID_TWO which isn't exist
|
||||||
|
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||||
|
|
||||||
|
undeletedQueues = cleaner.getUnDeletedQueues();
|
||||||
|
assertEquals(1, undeletedQueues.size());
|
||||||
|
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
|
||||||
|
assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
|
||||||
|
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
|
||||||
|
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
|
||||||
|
|
||||||
|
cleaner.removeQueues(undeletedQueues);
|
||||||
|
undeletedQueues = cleaner.getUnDeletedQueues();
|
||||||
|
assertEquals(0, undeletedQueues.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicationZKNodeCleanerChore() throws Exception {
|
||||||
|
repQueues.init(SERVER_ONE);
|
||||||
|
// add queue for ID_ONE which isn't exist
|
||||||
|
repQueues.addLog(ID_ONE, "file1");
|
||||||
|
// add a recovery queue for ID_TWO which isn't exist
|
||||||
|
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||||
|
|
||||||
|
// Wait the cleaner chore to run
|
||||||
|
Thread.sleep(20000);
|
||||||
|
|
||||||
|
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
|
||||||
|
assertEquals(0, cleaner.getUnDeletedQueues().size());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue