HBASE-19687 Move the logic in ReplicationZKNodeCleaner to ReplicationChecker and remove ReplicationZKNodeCleanerChore
commit b84fbde175 (parent 13318dd350)
VerifyReplication.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -345,10 +345,10 @@ public class VerifyReplication extends Configured implements Tool {
         }
       });
       ReplicationPeerStorage storage =
           ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
       ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
       return Pair.newPair(peerConfig,
-        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
+        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
     } catch (ReplicationException e) {
       throw new IOException("An error occurred while trying to connect to the remove peer cluster",
           e);
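Note: the only behavioral change in VerifyReplication is the helper used to resolve the remote peer's Configuration, which moved from ReplicationPeers to ReplicationUtils. A minimal sketch of the new call path, assuming a ZKWatcher on the local cluster and a known peer id (method and variable names here are illustrative, not from the patch):

    // Sketch: resolve a peer cluster's Configuration the way VerifyReplication now does.
    static Configuration peerClusterConf(ZKWatcher localZKW, Configuration conf, String peerId)
        throws ReplicationException {
      ReplicationPeerStorage storage =
          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
      // Merges the peer's cluster key and any per-peer overrides into a fresh Configuration.
      return ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf);
    }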
ReplicationPeers.java
@@ -17,14 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication;

-import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -106,25 +103,6 @@ public class ReplicationPeers {
     return Collections.unmodifiableSet(peerCache.keySet());
   }

-  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
-      Configuration baseConf) throws ReplicationException {
-    Configuration otherConf;
-    try {
-      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
-    } catch (IOException e) {
-      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
-    }
-
-    if (!peerConfig.getConfiguration().isEmpty()) {
-      CompoundConfiguration compound = new CompoundConfiguration();
-      compound.add(otherConf);
-      compound.addStringMap(peerConfig.getConfiguration());
-      return compound;
-    }
-
-    return otherConf;
-  }
-
   public PeerState refreshPeerState(String peerId) throws ReplicationException {
     ReplicationPeerImpl peer = peerCache.get(peerId);
     if (peer == null) {
@@ -158,7 +136,7 @@ public class ReplicationPeers {
   private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
     ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
-    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
-        peerConfig);
+    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
+        peerId, enabled, peerConfig);
   }
 }
ReplicationUtils.java
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.hbase.replication;

+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;

 /**
@@ -38,4 +43,37 @@ public final class ReplicationUtils {
     return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
+
+  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+      Configuration baseConf) throws ReplicationException {
+    Configuration otherConf;
+    try {
+      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
+    } catch (IOException e) {
+      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
+    }
+
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return compound;
+    }
+
+    return otherConf;
+  }
+
+  public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
+      throws ReplicationException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          queueStorage.removeQueue(replicator, queueId);
+        }
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+    }
+  }
 }
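The compounding in getPeerClusterConfiguration means per-peer entries win over values derived from the peer's cluster key, which in turn win over the base configuration. A hedged illustration of that override order (the cluster key and the rpc-timeout override below are made-up values, not part of this commit; it assumes the mutable map returned by ReplicationPeerConfig.getConfiguration()):

    // Illustration only: hypothetical peer config showing the override order.
    Configuration base = HBaseConfiguration.create();
    ReplicationPeerConfig peerConfig =
        new ReplicationPeerConfig().setClusterKey("peer-zk:2181:/hbase"); // hypothetical key
    peerConfig.getConfiguration().put("hbase.rpc.timeout", "30000");      // per-peer override
    Configuration merged = ReplicationUtils.getPeerClusterConfiguration(peerConfig, base);
    // merged now points its ZK quorum at peer-zk and carries the per-peer rpc timeout.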
TestReplicationStateBasic.java
@@ -240,7 +240,7 @@ public abstract class TestReplicationStateBasic {
     rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
     assertNumberOfPeers(2);

-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
       .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
     rp.removePeer(ID_ONE);
HMaster.java
@@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -364,8 +362,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   private ClusterStatusPublisher clusterStatusPublisherChore = null;

   CatalogJanitor catalogJanitorChore;
-
-  private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1166,14 +1162,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Started service threads");
     }
-    // Start replication zk node cleaner
-    try {
-      replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
-          new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
-      getChoreService().scheduleChore(replicationZKNodeCleanerChore);
-    } catch (Exception e) {
-      LOG.error("start replicationZKNodeCleanerChore failed", e);
-    }
   }

   @Override
@@ -1196,7 +1184,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
-    if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();

     if (this.activeMasterManager != null) this.activeMasterManager.stop();
ReplicationZKNodeCleaner.java (deleted file)
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to clean the replication queues belonging to the peer which does not exist.
- */
-@InterfaceAudience.Private
-public class ReplicationZKNodeCleaner {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
-  private final ReplicationQueueStorage queueStorage;
-  private final ReplicationPeerStorage peerStorage;
-  private final ReplicationQueueDeletor queueDeletor;
-
-  public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
-      throws IOException {
-    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
-    this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
-  }
-
-  /**
-   * @return undeletedQueues replicator with its queueIds for removed peers
-   * @throws IOException
-   */
-  public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
-    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
-    try {
-      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
-      List<ServerName> replicators = this.queueStorage.getListOfReplicators();
-      if (replicators == null || replicators.isEmpty()) {
-        return undeletedQueues;
-      }
-      for (ServerName replicator : replicators) {
-        List<String> queueIds = this.queueStorage.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Undeleted replication queue for removed peer found: "
-                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
-                    queueInfo.getPeerId(), replicator, queueId));
-            }
-          }
-        }
-      }
-    } catch (ReplicationException ke) {
-      throw new IOException("Failed to get the replication queues of all replicators", ke);
-    }
-    return undeletedQueues;
-  }
-
-  /**
-   * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
-   *         hfile-refs queue
-   */
-  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
-    Set<String> undeletedHFileRefsQueue = new HashSet<>();
-    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
-    try {
-      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
-      List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
-      Set<String> peers = new HashSet<>(listOfPeers);
-      peers.removeAll(peerIds);
-      if (!peers.isEmpty()) {
-        undeletedHFileRefsQueue.addAll(peers);
-      }
-    } catch (ReplicationException e) {
-      throw new IOException("Failed to get list of all peers from hfile-refs znode "
-          + hfileRefsZNode, e);
-    }
-    return undeletedHFileRefsQueue;
-  }
-
-  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
-
-    ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
-      super(zk, conf, abortable);
-    }
-
-    /**
-     * @param replicator The regionserver which has undeleted queue
-     * @param queueId The undeleted queue id
-     */
-    void removeQueue(final ServerName replicator, final String queueId) throws IOException {
-      String queueZnodePath =
-          ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
-            queueId);
-      try {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
-          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-          LOG.info("Successfully removed replication queue, replicator: " + replicator
-              + ", queueId: " + queueId);
-        }
-      } catch (ReplicationException | KeeperException e) {
-        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
-            + queueId);
-      }
-    }
-
-    /**
-     * @param hfileRefsQueueId The undeleted hfile-refs queue id
-     * @throws IOException
-     */
-    void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
-      String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
-      try {
-        if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
-          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
-          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
-              + hfileRefsZNode);
-        }
-      } catch (ReplicationException | KeeperException e) {
-        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
-            + " from path " + hfileRefsZNode, e);
-      }
-    }
-
-    String getHfileRefsZNode() {
-      return this.hfileRefsZNode;
-    }
-  }
-
-  /**
-   * Remove the undeleted replication queue's zk node for removed peers.
-   * @param undeletedQueues replicator with its queueIds for removed peers
-   * @throws IOException
-   */
-  public void removeQueues(final Map<ServerName, List<String>> undeletedQueues) throws IOException {
-    for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
-      ServerName replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        queueDeletor.removeQueue(replicator, queueId);
-      }
-    }
-  }
-
-  /**
-   * Remove the undeleted hfile-refs queue's zk node for removed peers.
-   * @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in
-   *          hfile-refs queue
-   * @throws IOException
-   */
-  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
-    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
-      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
-    }
-  }
-}
ReplicationZKNodeCleanerChore.java (deleted file)
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Chore that will clean the replication queues belonging to the peer which does not exist.
- */
-@InterfaceAudience.Private
-public class ReplicationZKNodeCleanerChore extends ScheduledChore {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleanerChore.class);
-  private final ReplicationZKNodeCleaner cleaner;
-
-  public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
-      ReplicationZKNodeCleaner cleaner) {
-    super("ReplicationZKNodeCleanerChore", stopper, period);
-    this.cleaner = cleaner;
-  }
-
-  @Override
-  protected void chore() {
-    try {
-      Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
-      cleaner.removeQueues(undeletedQueues);
-    } catch (IOException e) {
-      LOG.warn("Failed to clean replication zk node", e);
-    }
-  }
-}
ReplicationPeerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -216,19 +217,6 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }

-  private void removeAllQueues0(String peerId) throws ReplicationException {
-    for (ServerName replicator : queueStorage.getListOfReplicators()) {
-      List<String> queueIds = queueStorage.getAllQueues(replicator);
-      for (String queueId : queueIds) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (queueInfo.getPeerId().equals(peerId)) {
-          queueStorage.removeQueue(replicator, queueId);
-        }
-      }
-      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
-    }
-  }
-
   public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
     // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
     // on-going when the refresh peer config procedure is done, if a RS which has already been
@@ -241,8 +229,8 @@ public class ReplicationPeerManager {
     // claimed once after the refresh peer procedure done(as the next claim queue will just delete
     // it), so we can make sure that a two pass scan will finally find the queue and remove it,
     // unless it has already been removed by others.
-    removeAllQueues0(peerId);
-    removeAllQueues0(peerId);
+    ReplicationUtils.removeAllQueues(queueStorage, peerId);
+    ReplicationUtils.removeAllQueues(queueStorage, peerId);
     queueStorage.removePeerFromHFileRefs(peerId);
   }
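The two passes described in the comment above are now expressed as two calls to the shared utility; a sketch of the resulting removal sequence, assuming a ReplicationQueueStorage is at hand (the method name is illustrative):

    // Sketch of the post-removal cleanup: the first pass deletes the queues present when
    // the refresh-peer procedure finishes; the second catches a queue that a concurrent
    // claimQueue may have re-created in between. Then hfile-refs for the peer are dropped.
    void cleanUpQueues(ReplicationQueueStorage queueStorage, String peerId)
        throws ReplicationException {
      ReplicationUtils.removeAllQueues(queueStorage, peerId);
      ReplicationUtils.removeAllQueues(queueStorage, peerId);
      queueStorage.removePeerFromHFileRefs(peerId);
    }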
HBaseFsck.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -752,7 +753,7 @@ public class HBaseFsck extends Configured implements Closeable {
   * @return 0 on success, non-zero on failure
   */
  public int onlineHbck()
-      throws IOException, KeeperException, InterruptedException {
+      throws IOException, KeeperException, InterruptedException, ReplicationException {
    // print hbase server version
    errors.print("Version: " + status.getHBaseVersion());

@@ -3576,8 +3577,8 @@ public class HBaseFsck extends Configured implements Closeable {
     return hbi;
   }

-  private void checkAndFixReplication() throws IOException {
-    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
+  private void checkAndFixReplication() throws ReplicationException {
+    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors);
     checker.checkUnDeletedQueues();

     if (checker.hasUnDeletedQueues() && this.fixReplication) {
@@ -4865,8 +4866,8 @@ public class HBaseFsck extends Configured implements Closeable {
   };


-  public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
-      InterruptedException {
+  public HBaseFsck exec(ExecutorService exec, String[] args)
+      throws KeeperException, IOException, InterruptedException, ReplicationException {
     long sleepBeforeRerun = DEFAULT_SLEEP_BEFORE_RERUN;

     boolean checkCorruptHFiles = false;
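Because onlineHbck() and exec() now declare ReplicationException, code driving hbck programmatically has to widen its handling accordingly. A minimal sketch under that assumption (the wrapper method is illustrative; the -fixReplication flag and two-argument constructor mirror the test utilities in this patch):

    // Sketch: invoking hbck's replication fix after the signature change.
    void runFixReplication(Configuration conf, ExecutorService executor)
        throws KeeperException, IOException, InterruptedException, ReplicationException {
      // HBaseFsck implements Closeable, so try-with-resources releases its resources.
      try (HBaseFsck fsck = new HBaseFsck(conf, executor)) {
        fsck.exec(executor, new String[] { "-fixReplication" });
      }
    }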
ReplicationChecker.java
@@ -17,84 +17,115 @@
  */
 package org.apache.hadoop.hbase.util.hbck;

-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Check and fix undeleted replication queues for removed peerId.
  */
 @InterfaceAudience.Private
 public class ReplicationChecker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class);
+
   private final ErrorReporter errorReporter;
   // replicator with its queueIds for removed peers
   private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
   // replicator with its undeleted queueIds for removed peers in hfile-refs queue
-  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
-  private final ReplicationZKNodeCleaner cleaner;
+  private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();

-  public ReplicationChecker(Configuration conf, ZKWatcher zkw, ClusterConnection connection,
-      ErrorReporter errorReporter) throws IOException {
-    this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+  private final ReplicationPeerStorage peerStorage;
+  private final ReplicationQueueStorage queueStorage;
+
+  public ReplicationChecker(Configuration conf, ZKWatcher zkw, ErrorReporter errorReporter) {
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
     this.errorReporter = errorReporter;
   }

   public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList().contains(
-      HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    return errorReporter.getErrorList()
+        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
   }

-  public void checkUnDeletedQueues() throws IOException {
-    undeletedQueueIds = cleaner.getUnDeletedQueues();
-    for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
-      ServerName replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        String msg = "Undeleted replication queue for removed peer found: "
-            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
-              replicator, queueId);
-        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
-          msg);
+  private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException {
+    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      for (String queueId : queueStorage.getAllQueues(replicator)) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (!peerIds.contains(queueInfo.getPeerId())) {
+          undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
+          LOG.debug(
+            "Undeleted replication queue for removed peer found: " +
+              "[removedPeerId={}, replicator={}, queueId={}]",
+            queueInfo.getPeerId(), replicator, queueId);
+        }
       }
     }
-
-    checkUnDeletedHFileRefsQueues();
+    return undeletedQueues;
   }

-  private void checkUnDeletedHFileRefsQueues() throws IOException {
-    undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
-    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
-      String msg = "Undeleted replication hfile-refs queue for removed peer found: "
-          + undeletedHFileRefsQueueIds + " under hfile-refs node";
-      errorReporter
-          .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
+  private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException {
+    Set<String> undeletedHFileRefsPeerIds =
+        new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
+    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+    undeletedHFileRefsPeerIds.removeAll(peerIds);
+    if (LOG.isDebugEnabled()) {
+      for (String peerId : undeletedHFileRefsPeerIds) {
+        LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId);
+      }
     }
+    return undeletedHFileRefsPeerIds;
   }

-  public void fixUnDeletedQueues() throws IOException {
-    if (!undeletedQueueIds.isEmpty()) {
-      cleaner.removeQueues(undeletedQueueIds);
+  public void checkUnDeletedQueues() throws ReplicationException {
+    undeletedQueueIds = getUnDeletedQueues();
+    undeletedQueueIds.forEach((replicator, queueIds) -> {
+      queueIds.forEach(queueId -> {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        String msg = "Undeleted replication queue for removed peer found: " +
+          String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
+            replicator, queueId);
+        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+          msg);
+      });
+    });
+    undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
+    undeletedHFileRefsPeerIds.stream()
+        .map(
+          peerId -> "Undeleted replication hfile-refs queue for removed peer " + peerId + " found")
+        .forEach(msg -> errorReporter
+            .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg));
+  }
+
+  public void fixUnDeletedQueues() throws ReplicationException {
+    for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+      ServerName replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueStorage.removeQueue(replicator, queueId);
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
     }
-    fixUnDeletedHFileRefsQueue();
-  }
-
-  private void fixUnDeletedHFileRefsQueue() throws IOException {
-    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
-      cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
+    for (String peerId : undeletedHFileRefsPeerIds) {
+      queueStorage.removePeerFromHFileRefs(peerId);
     }
   }
 }
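The rewritten checker talks to the ZooKeeper-backed peer and queue storage directly instead of going through ReplicationZKNodeCleaner. A sketch of the check-then-fix cycle hbck now runs, matching checkAndFixReplication() above (the ErrorReporter comes from hbck itself and is assumed available; the wrapper method is illustrative):

    // Sketch: one check/fix cycle against the new ReplicationChecker.
    void checkAndMaybeFix(Configuration conf, ZKWatcher zkw, HBaseFsck.ErrorReporter errors,
        boolean fixReplication) throws ReplicationException {
      ReplicationChecker checker = new ReplicationChecker(conf, zkw, errors);
      checker.checkUnDeletedQueues(); // reports UNDELETED_REPLICATION_QUEUE for orphans
      if (checker.hasUnDeletedQueues() && fixReplication) {
        checker.fixUnDeletedQueues(); // removes orphan queues and hfile-refs peers
      }
    }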
TestReplicationZKNodeCleaner.java (deleted file)
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MasterTests.class, MediumTests.class })
-public class TestReplicationZKNodeCleaner {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationZKNodeCleaner.class);
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private final String ID_ONE = "1";
-  private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234);
-  private final String ID_TWO = "2";
-  private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234);
-
-  private final Configuration conf;
-  private final ZKWatcher zkw;
-  private final ReplicationQueueStorage repQueues;
-
-  public TestReplicationZKNodeCleaner() throws Exception {
-    conf = TEST_UTIL.getConfiguration();
-    zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
-    repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testReplicationZKNodeCleaner() throws Exception {
-    // add queue for ID_ONE which isn't exist
-    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
-
-    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
-    Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(1, undeletedQueues.size());
-    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
-    assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
-
-    // add a recovery queue for ID_TWO which isn't exist
-    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
-
-    undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(1, undeletedQueues.size());
-    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
-    assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
-
-    cleaner.removeQueues(undeletedQueues);
-    undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(0, undeletedQueues.size());
-  }
-
-  @Test
-  public void testReplicationZKNodeCleanerChore() throws Exception {
-    // add queue for ID_ONE which isn't exist
-    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
-    // add a recovery queue for ID_TWO which isn't exist
-    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
-
-    // Wait the cleaner chore to run
-    Thread.sleep(20000);
-
-    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
-    assertEquals(0, cleaner.getUnDeletedQueues().size());
-  }
-}
TestHBaseFsckReplication.java (new file)
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestHBaseFsckReplication {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    ReplicationPeerStorage peerStorage = ReplicationStorageFactory
+        .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+        .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+
+    String peerId1 = "1";
+    String peerId2 = "2";
+    peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
+      true);
+    peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
+      true);
+    for (int i = 0; i < 10; i++) {
+      queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
+        "file-" + i);
+    }
+    queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file");
+    HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
+    HbckTestingUtil.assertNoErrors(fsck);
+
+    // should not remove anything since the replication peer is still alive
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+    peerStorage.removePeer(peerId1);
+    // there should be orphan queues
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
+    HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
+      return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
+    }).limit(10).toArray(ERROR_CODE[]::new));
+
+    // should not delete anything when fix is false
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+
+    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
+    HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
+      return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
+    }).limit(10).toArray(ERROR_CODE[]::new));
+
+    List<ServerName> replicators = queueStorage.getListOfReplicators();
+    // should not remove the server with queue for peerId2
+    assertEquals(1, replicators.size());
+    assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0));
+    for (String queueId : queueStorage.getAllQueues(replicators.get(0))) {
+      assertEquals(peerId2, queueId);
+    }
+  }
+}
HbckTestingUtil.java
@@ -46,7 +46,7 @@ public class HbckTestingUtil {
   public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
       boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
       boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
-      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication,
+      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
       TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
@@ -78,10 +78,8 @@ public class HbckTestingUtil {

   /**
    * Runs hbck with the -sidelineCorruptHFiles option
-   * @param conf
    * @param table table constraint
-   * @return <returncode, hbckInstance>
-   * @throws Exception
+   * @return hbckInstance
    */
   public static HBaseFsck doHFileQuarantine(Configuration conf, TableName table) throws Exception {
     String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission", table.getNameAsString()};