HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)
This commit is contained in:
parent
e51584381a
commit
d87b05f040
|
@ -69,7 +69,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
|||
/** Znode containing all replication queues for this region server. */
|
||||
private String myQueuesZnode;
|
||||
/** Name of znode we use to lock during failover */
|
||||
private final static String RS_LOCK_ZNODE = "lock";
|
||||
public final static String RS_LOCK_ZNODE = "lock";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
|
||||
|
||||
|
|
|
@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
|||
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
|
||||
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
|
||||
|
@ -324,6 +326,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
|
||||
CatalogJanitor catalogJanitorChore;
|
||||
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
|
||||
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
|
||||
private ReplicationMetaCleaner replicationMetaCleaner;
|
||||
private LogCleaner logCleaner;
|
||||
private HFileCleaner hfileCleaner;
|
||||
|
@ -1182,6 +1185,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
LOG.error("start replicationZKLockCleanerChore failed", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
|
||||
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
|
||||
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
|
||||
} catch (Exception e) {
|
||||
LOG.error("start replicationZKNodeCleanerChore failed", e);
|
||||
}
|
||||
try {
|
||||
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
|
||||
getChoreService().scheduleChore(replicationMetaCleaner);
|
||||
|
@ -1222,6 +1232,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
||||
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
||||
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
|
||||
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
|
||||
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
|
||||
if (this.quotaManager != null) this.quotaManager.stop();
|
||||
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Used to clean the replication queues belonging to the peer which does not exist.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationZKNodeCleaner {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class);
|
||||
private final ZooKeeperWatcher zkw;
|
||||
private final ReplicationQueuesClient queuesClient;
|
||||
private final ReplicationPeers replicationPeers;
|
||||
private final ReplicationQueueDeletor queueDeletor;
|
||||
private final boolean useMulti;
|
||||
|
||||
public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable)
|
||||
throws IOException {
|
||||
try {
|
||||
this.zkw = zkw;
|
||||
this.queuesClient = ReplicationFactory
|
||||
.getReplicationQueuesClient(zkw, conf, abortable);
|
||||
this.queuesClient.init();
|
||||
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
|
||||
abortable);
|
||||
this.replicationPeers.init();
|
||||
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
|
||||
this.useMulti = conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return undeletedQueues replicator with its queueIds for removed peers
|
||||
* @throws IOException
|
||||
*/
|
||||
public Map<String, List<String>> getUnDeletedQueues() throws IOException {
|
||||
Map<String, List<String>> undeletedQueues = new HashMap<>();
|
||||
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||
try {
|
||||
List<String> replicators = this.queuesClient.getListOfReplicators();
|
||||
for (String replicator : replicators) {
|
||||
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
|
||||
for (String queueId : queueIds) {
|
||||
if (!useMulti && queueId.equals(ReplicationQueuesZKImpl.RS_LOCK_ZNODE)) {
|
||||
continue;
|
||||
}
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||
if (!undeletedQueues.containsKey(replicator)) {
|
||||
undeletedQueues.put(replicator, new ArrayList<String>());
|
||||
}
|
||||
undeletedQueues.get(replicator).add(queueId);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Undeleted replication queue for removed peer found: "
|
||||
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
||||
queueInfo.getPeerId(), replicator, queueId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException("Failed to get the replication queues of all replicators", ke);
|
||||
}
|
||||
return undeletedQueues;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
|
||||
* hfile-refs queue
|
||||
* @throws IOException
|
||||
*/
|
||||
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
|
||||
Set<String> undeletedHFileRefsQueue = new HashSet<>();
|
||||
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
|
||||
try {
|
||||
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
|
||||
return null;
|
||||
}
|
||||
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
|
||||
Set<String> peers = new HashSet<>(listOfPeers);
|
||||
peers.removeAll(peerIds);
|
||||
if (!peers.isEmpty()) {
|
||||
undeletedHFileRefsQueue.addAll(peers);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
||||
+ hfileRefsZNode, e);
|
||||
}
|
||||
return undeletedHFileRefsQueue;
|
||||
}
|
||||
|
||||
private class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||
|
||||
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
|
||||
super(zk, conf, abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param replicator The regionserver which has undeleted queue
|
||||
* @param queueId The undeleted queue id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void removeQueue(final String replicator, final String queueId) throws IOException {
|
||||
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
|
||||
queueId);
|
||||
try {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
||||
LOG.info("Successfully removed replication queue, replicator: " + replicator
|
||||
+ ", queueId: " + queueId);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
|
||||
+ queueId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hfileRefsQueueId The undeleted hfile-refs queue id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
|
||||
String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
|
||||
try {
|
||||
if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, node);
|
||||
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
|
||||
+ hfileRefsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
||||
+ " from path " + hfileRefsZNode);
|
||||
}
|
||||
}
|
||||
|
||||
String getHfileRefsZNode() {
|
||||
return this.hfileRefsZNode;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the undeleted replication queue's zk node for removed peers.
|
||||
* @param undeletedQueues replicator with its queueIds for removed peers
|
||||
* @throws IOException
|
||||
*/
|
||||
public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
|
||||
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
|
||||
String replicator = replicatorAndQueueIds.getKey();
|
||||
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||
queueDeletor.removeQueue(replicator, queueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the undeleted hfile-refs queue's zk node for removed peers.
|
||||
* @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in
|
||||
* hfile-refs queue
|
||||
* @throws IOException
|
||||
*/
|
||||
public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
|
||||
for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
|
||||
queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Chore that will clean the replication queues belonging to the peer which does not exist.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationZKNodeCleanerChore extends ScheduledChore {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
|
||||
private final ReplicationZKNodeCleaner cleaner;
|
||||
|
||||
public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
|
||||
ReplicationZKNodeCleaner cleaner) {
|
||||
super("ReplicationZKNodeCleanerChore", stopper, period);
|
||||
this.cleaner = cleaner;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
cleaner.removeQueues(undeletedQueues);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to clean replication zk node", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hbase.util.hbck;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -27,159 +26,75 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/*
|
||||
* Check and fix undeleted replication queues for removed peerId.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationChecker {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
|
||||
private final ZooKeeperWatcher zkw;
|
||||
private ErrorReporter errorReporter;
|
||||
private ReplicationQueuesClient queuesClient;
|
||||
private ReplicationPeers replicationPeers;
|
||||
private ReplicationQueueDeletor queueDeletor;
|
||||
private final ErrorReporter errorReporter;
|
||||
// replicator with its queueIds for removed peers
|
||||
private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
|
||||
// Set of un deleted hfile refs queue Ids
|
||||
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
|
||||
private final String hfileRefsZNode;
|
||||
private final ReplicationZKNodeCleaner cleaner;
|
||||
|
||||
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
|
||||
ErrorReporter errorReporter) throws IOException {
|
||||
try {
|
||||
this.zkw = zkw;
|
||||
this.errorReporter = errorReporter;
|
||||
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
|
||||
this.queuesClient.init();
|
||||
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
|
||||
connection);
|
||||
this.replicationPeers.init();
|
||||
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException("failed to construct ReplicationChecker", e);
|
||||
}
|
||||
|
||||
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
||||
String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
|
||||
String hfileRefsZNodeName =
|
||||
conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
|
||||
ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
|
||||
hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
|
||||
this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
|
||||
this.errorReporter = errorReporter;
|
||||
}
|
||||
|
||||
public boolean hasUnDeletedQueues() {
|
||||
return errorReporter.getErrorList()
|
||||
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
|
||||
return errorReporter.getErrorList().contains(
|
||||
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
|
||||
}
|
||||
|
||||
public void checkUnDeletedQueues() throws IOException {
|
||||
Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
|
||||
try {
|
||||
List<String> replicators = this.queuesClient.getListOfReplicators();
|
||||
for (String replicator : replicators) {
|
||||
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
|
||||
for (String queueId : queueIds) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||
if (!undeletedQueueIds.containsKey(replicator)) {
|
||||
undeletedQueueIds.put(replicator, new ArrayList<String>());
|
||||
}
|
||||
undeletedQueueIds.get(replicator).add(queueId);
|
||||
|
||||
String msg = "Undeleted replication queue for removed peer found: "
|
||||
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
||||
queueInfo.getPeerId(), replicator, queueId);
|
||||
errorReporter.reportError(
|
||||
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException(ke);
|
||||
}
|
||||
|
||||
checkUnDeletedHFileRefsQueues(peerIds);
|
||||
}
|
||||
|
||||
private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
|
||||
try {
|
||||
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
|
||||
return;
|
||||
}
|
||||
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
|
||||
Set<String> peers = new HashSet<>(listOfPeers);
|
||||
peers.removeAll(peerIds);
|
||||
if (!peers.isEmpty()) {
|
||||
undeletedHFileRefsQueueIds.addAll(peers);
|
||||
String msg =
|
||||
"Undeleted replication hfile-refs queue for removed peer found: "
|
||||
+ undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
|
||||
undeletedQueueIds = cleaner.getUnDeletedQueues();
|
||||
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
|
||||
String replicator = replicatorAndQueueIds.getKey();
|
||||
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
String msg = "Undeleted replication queue for removed peer found: "
|
||||
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
|
||||
replicator, queueId);
|
||||
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
|
||||
msg);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
||||
+ hfileRefsZNode, e);
|
||||
}
|
||||
|
||||
checkUnDeletedHFileRefsQueues();
|
||||
}
|
||||
|
||||
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
|
||||
super(zk, conf, abortable);
|
||||
}
|
||||
|
||||
public void removeQueue(String replicator, String queueId) throws IOException {
|
||||
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
|
||||
queueId);
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
||||
LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
|
||||
+ queueId);
|
||||
}
|
||||
private void checkUnDeletedHFileRefsQueues() throws IOException {
|
||||
undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
|
||||
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
|
||||
String msg = "Undeleted replication hfile-refs queue for removed peer found: "
|
||||
+ undeletedHFileRefsQueueIds + " under hfile-refs node";
|
||||
errorReporter
|
||||
.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
|
||||
}
|
||||
}
|
||||
|
||||
public void fixUnDeletedQueues() throws IOException {
|
||||
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
|
||||
String replicator = replicatorAndQueueIds.getKey();
|
||||
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||
queueDeletor.removeQueue(replicator, queueId);
|
||||
}
|
||||
if (!undeletedQueueIds.isEmpty()) {
|
||||
cleaner.removeQueues(undeletedQueueIds);
|
||||
}
|
||||
fixUnDeletedHFileRefsQueue();
|
||||
}
|
||||
|
||||
private void fixUnDeletedHFileRefsQueue() throws IOException {
|
||||
for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
|
||||
String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(this.zkw, node);
|
||||
LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
|
||||
+ hfileRefsZNode);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
||||
+ " from path " + hfileRefsZNode);
|
||||
}
|
||||
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
|
||||
cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestReplicationZKNodeCleaner {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private final String ID_ONE = "1";
|
||||
private final String SERVER_ONE = "server1";
|
||||
private final String ID_TWO = "2";
|
||||
private final String SERVER_TWO = "server2";
|
||||
|
||||
private final Configuration conf;
|
||||
private final ZooKeeperWatcher zkw;
|
||||
private final ReplicationQueues repQueues;
|
||||
|
||||
public TestReplicationZKNodeCleaner() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
|
||||
repQueues = ReplicationFactory.getReplicationQueues(zkw, conf, null);
|
||||
assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
|
||||
TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationZKNodeCleaner() throws Exception {
|
||||
repQueues.init(SERVER_ONE);
|
||||
// add queue for ID_ONE which isn't exist
|
||||
repQueues.addLog(ID_ONE, "file1");
|
||||
|
||||
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
|
||||
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
assertEquals(1, undeletedQueues.size());
|
||||
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
|
||||
assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
|
||||
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
|
||||
|
||||
// add a recovery queue for ID_TWO which isn't exist
|
||||
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
|
||||
undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
assertEquals(1, undeletedQueues.size());
|
||||
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
|
||||
assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
|
||||
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
|
||||
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
|
||||
|
||||
cleaner.removeQueues(undeletedQueues);
|
||||
undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
assertEquals(0, undeletedQueues.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationZKNodeCleanerChore() throws Exception {
|
||||
repQueues.init(SERVER_ONE);
|
||||
// add queue for ID_ONE which isn't exist
|
||||
repQueues.addLog(ID_ONE, "file1");
|
||||
// add a recovery queue for ID_TWO which isn't exist
|
||||
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
|
||||
// Wait the cleaner chore to run
|
||||
Thread.sleep(20000);
|
||||
|
||||
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
|
||||
assertEquals(0, cleaner.getUnDeletedQueues().size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue