HBASE-19687 Move the logic in ReplicationZKNodeCleaner to ReplicationChecker and remove ReplicationZKNodeCleanerChore

zhangduo 2018-01-03 09:39:44 +08:00
parent d95ee41583
commit 368db315a6
13 changed files with 260 additions and 460 deletions


@@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -345,10 +345,10 @@ public class VerifyReplication extends Configured implements Tool {
}
});
ReplicationPeerStorage storage =
ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
} catch (ReplicationException e) {
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
e);


@@ -17,14 +17,11 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -106,25 +103,6 @@ public class ReplicationPeers {
return Collections.unmodifiableSet(peerCache.keySet());
}
public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}
if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}
return otherConf;
}
public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
@@ -151,7 +129,7 @@ public class ReplicationPeers {
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
peerConfig);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, enabled, peerConfig);
}
}


@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Helper class for replication.
*/
@InterfaceAudience.Private
public final class ReplicationUtils {
private ReplicationUtils() {
}
public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}
if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}
return otherConf;
}
public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
throws ReplicationException {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
}
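
Two hedged usage sketches for the new helper class follow; they are not part of the commit itself. The method names and the ZKWatcher/Configuration wiring are illustrative, while the calls mirror the VerifyReplication and ReplicationPeerManager hunks in this commit.

// Sketch 1: resolve the effective configuration of a live peer. The peer's
// cluster key is applied on top of the base conf, then any per-peer overrides
// are layered on via CompoundConfiguration.
static Configuration effectivePeerConf(ZKWatcher zkw, Configuration conf, String peerId)
    throws ReplicationException {
  ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
  return ReplicationUtils.getPeerClusterConfiguration(
      peerStorage.getPeerConfig(peerId), conf);
}

// Sketch 2: clean up after a peer has been removed. Every queue that still
// references the peer is dropped, and replicator znodes left empty are pruned.
static void dropQueuesForRemovedPeer(ZKWatcher zkw, Configuration conf, String peerId)
    throws ReplicationException {
  ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
  ReplicationUtils.removeAllQueues(queueStorage, peerId);
}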


@@ -240,7 +240,7 @@ public abstract class TestReplicationStateBasic {
rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
assertNumberOfPeers(2);
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
rp.getPeerStorage().removePeer(ID_ONE);
rp.removePeer(ID_ONE);


@@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -363,7 +361,6 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private ReplicationMetaCleaner replicationMetaCleaner;
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1150,15 +1147,6 @@ public class HMaster extends HRegionServer implements MasterServices {
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
// Start replication zk node cleaner
try {
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKNodeCleanerChore failed", e);
}
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
getChoreService().scheduleChore(replicationMetaCleaner);
}
@@ -1183,7 +1171,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();


@@ -1,192 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to clean the replication queues belonging to the peer which does not exist.
*/
@InterfaceAudience.Private
public class ReplicationZKNodeCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
private final ReplicationQueueStorage queueStorage;
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueDeletor queueDeletor;
public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException {
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
}
/**
* @return undeletedQueues replicator with its queueIds for removed peers
* @throws IOException
*/
public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
try {
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<ServerName> replicators = this.queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) {
return undeletedQueues;
}
for (ServerName replicator : replicators) {
List<String> queueIds = this.queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
if (LOG.isDebugEnabled()) {
LOG.debug("Undeleted replication queue for removed peer found: "
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
queueInfo.getPeerId(), replicator, queueId));
}
}
}
}
} catch (ReplicationException ke) {
throw new IOException("Failed to get the replication queues of all replicators", ke);
}
return undeletedQueues;
}
/**
* @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
* hfile-refs queue
*/
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
Set<String> undeletedHFileRefsQueue = new HashSet<>();
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
try {
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
Set<String> peers = new HashSet<>(listOfPeers);
peers.removeAll(peerIds);
if (!peers.isEmpty()) {
undeletedHFileRefsQueue.addAll(peers);
}
} catch (ReplicationException e) {
throw new IOException("Failed to get list of all peers from hfile-refs znode "
+ hfileRefsZNode, e);
}
return undeletedHFileRefsQueue;
}
private class ReplicationQueueDeletor extends ReplicationStateZKBase {
ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
super(zk, conf, abortable);
}
/**
* @param replicator The regionserver which has undeleted queue
* @param queueId The undeleted queue id
*/
void removeQueue(final ServerName replicator, final String queueId) throws IOException {
String queueZnodePath =
ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
queueId);
try {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
LOG.info("Successfully removed replication queue, replicator: " + replicator
+ ", queueId: " + queueId);
}
} catch (ReplicationException | KeeperException e) {
throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+ queueId);
}
}
/**
* @param hfileRefsQueueId The undeleted hfile-refs queue id
* @throws IOException
*/
void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
try {
if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
ZKUtil.deleteNodeRecursively(this.zookeeper, node);
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+ hfileRefsZNode);
}
} catch (ReplicationException | KeeperException e) {
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+ " from path " + hfileRefsZNode, e);
}
}
String getHfileRefsZNode() {
return this.hfileRefsZNode;
}
}
/**
* Remove the undeleted replication queue's zk node for removed peers.
* @param undeletedQueues replicator with its queueIds for removed peers
* @throws IOException
*/
public void removeQueues(final Map<ServerName, List<String>> undeletedQueues) throws IOException {
for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
queueDeletor.removeQueue(replicator, queueId);
}
}
}
/**
* Remove the undeleted hfile-refs queue's zk node for removed peers.
* @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in
* hfile-refs queue
* @throws IOException
*/
public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
}
}
}


@@ -1,54 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Chore that will clean the replication queues belonging to the peer which does not exist.
*/
@InterfaceAudience.Private
public class ReplicationZKNodeCleanerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleanerChore.class);
private final ReplicationZKNodeCleaner cleaner;
public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
ReplicationZKNodeCleaner cleaner) {
super("ReplicationZKNodeCleanerChore", stopper, period);
this.cleaner = cleaner;
}
@Override
protected void chore() {
try {
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
cleaner.removeQueues(undeletedQueues);
} catch (IOException e) {
LOG.warn("Failed to clean replication zk node", e);
}
}
}


@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -216,19 +217,6 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
private void removeAllQueues0(String peerId) throws ReplicationException {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
@@ -241,8 +229,8 @@ public class ReplicationPeerManager {
// claimed once after the refresh peer procedure done(as the next claim queue will just delete
// it), so we can make sure that a two pass scan will finally find the queue and remove it,
// unless it has already been removed by others.
removeAllQueues0(peerId);
removeAllQueues0(peerId);
ReplicationUtils.removeAllQueues(queueStorage, peerId);
ReplicationUtils.removeAllQueues(queueStorage, peerId);
queueStorage.removePeerFromHFileRefs(peerId);
}


@@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -142,6 +143,7 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
@@ -149,6 +151,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
import org.apache.hbase.thirdparty.com.google.common.collect.TreeMultimap;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
@@ -752,7 +755,7 @@ public class HBaseFsck extends Configured implements Closeable {
* @return 0 on success, non-zero on failure
*/
public int onlineHbck()
throws IOException, KeeperException, InterruptedException {
throws IOException, KeeperException, InterruptedException, ReplicationException {
// print hbase server version
errors.print("Version: " + status.getHBaseVersion());
@@ -3571,8 +3574,8 @@ public class HBaseFsck extends Configured implements Closeable {
return hbi;
}
private void checkAndFixReplication() throws IOException {
ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
private void checkAndFixReplication() throws ReplicationException {
ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors);
checker.checkUnDeletedQueues();
if (checker.hasUnDeletedQueues() && this.fixReplication) {
@@ -4859,8 +4862,8 @@
}
}
public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
InterruptedException {
public HBaseFsck exec(ExecutorService exec, String[] args)
throws KeeperException, IOException, InterruptedException, ReplicationException {
long sleepBeforeRerun = DEFAULT_SLEEP_BEFORE_RERUN;
boolean checkCorruptHFiles = false;


@@ -17,84 +17,115 @@
*/
package org.apache.hadoop.hbase.util.hbck;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Check and fix undeleted replication queues for removed peerId.
*/
@InterfaceAudience.Private
public class ReplicationChecker {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class);
private final ErrorReporter errorReporter;
// replicator with its queueIds for removed peers
private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
private final ReplicationZKNodeCleaner cleaner;
private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();
public ReplicationChecker(Configuration conf, ZKWatcher zkw, ClusterConnection connection,
ErrorReporter errorReporter) throws IOException {
this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
public ReplicationChecker(Configuration conf, ZKWatcher zkw, ErrorReporter errorReporter) {
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.errorReporter = errorReporter;
}
public boolean hasUnDeletedQueues() {
return errorReporter.getErrorList().contains(
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
return errorReporter.getErrorList()
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
}
public void checkUnDeletedQueues() throws IOException {
undeletedQueueIds = cleaner.getUnDeletedQueues();
for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException {
Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
for (ServerName replicator : queueStorage.getListOfReplicators()) {
for (String queueId : queueStorage.getAllQueues(replicator)) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: "
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
replicator, queueId);
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
msg);
if (!peerIds.contains(queueInfo.getPeerId())) {
undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
LOG.debug(
"Undeleted replication queue for removed peer found: " +
"[removedPeerId={}, replicator={}, queueId={}]",
queueInfo.getPeerId(), replicator, queueId);
}
}
}
checkUnDeletedHFileRefsQueues();
return undeletedQueues;
}
private void checkUnDeletedHFileRefsQueues() throws IOException {
undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
String msg = "Undeleted replication hfile-refs queue for removed peer found: "
+ undeletedHFileRefsQueueIds + " under hfile-refs node";
errorReporter
.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException {
Set<String> undeletedHFileRefsPeerIds =
new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
undeletedHFileRefsPeerIds.removeAll(peerIds);
if (LOG.isDebugEnabled()) {
for (String peerId : undeletedHFileRefsPeerIds) {
LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId);
}
}
return undeletedHFileRefsPeerIds;
}
public void fixUnDeletedQueues() throws IOException {
if (!undeletedQueueIds.isEmpty()) {
cleaner.removeQueues(undeletedQueueIds);
public void checkUnDeletedQueues() throws ReplicationException {
undeletedQueueIds = getUnDeletedQueues();
undeletedQueueIds.forEach((replicator, queueIds) -> {
queueIds.forEach(queueId -> {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: " +
String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
replicator, queueId);
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
msg);
});
});
undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
undeletedHFileRefsPeerIds.stream()
.map(
peerId -> "Undeleted replication hfile-refs queue for removed peer " + peerId + " found")
.forEach(msg -> errorReporter
.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg));
}
public void fixUnDeletedQueues() throws ReplicationException {
for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
queueStorage.removeQueue(replicator, queueId);
}
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
fixUnDeletedHFileRefsQueue();
}
private void fixUnDeletedHFileRefsQueue() throws IOException {
if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
for (String peerId : undeletedHFileRefsPeerIds) {
queueStorage.removePeerFromHFileRefs(peerId);
}
}
}
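
For orientation, the reworked checker is driven from HBaseFsck roughly as sketched below; this restates the checkAndFixReplication hunk earlier in the commit, with setShouldRerun() assumed from the surrounding hbck code rather than shown in the diff.

private void checkAndFixReplication() throws ReplicationException {
  ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors);
  // Report an UNDELETED_REPLICATION_QUEUE error for each queue or hfile-refs
  // entry whose peer no longer exists.
  checker.checkUnDeletedQueues();
  if (checker.hasUnDeletedQueues() && this.fixReplication) {
    // Remove the orphan queues and hfile-refs entries recorded above.
    checker.fixUnDeletedQueues();
    setShouldRerun(); // assumed hbck convention: schedule a verification re-run
  }
}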


@@ -1,109 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestReplicationZKNodeCleaner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final String ID_ONE = "1";
private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234);
private final String ID_TWO = "2";
private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234);
private final Configuration conf;
private final ZKWatcher zkw;
private final ReplicationQueueStorage repQueues;
public TestReplicationZKNodeCleaner() throws Exception {
conf = TEST_UTIL.getConfiguration();
zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testReplicationZKNodeCleaner() throws Exception {
// add a queue for ID_ONE, which doesn't exist
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
// add a recovery queue for ID_TWO, which doesn't exist
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
cleaner.removeQueues(undeletedQueues);
undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(0, undeletedQueues.size());
}
@Test
public void testReplicationZKNodeCleanerChore() throws Exception {
// add a queue for ID_ONE, which doesn't exist
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
// add a recovery queue for ID_TWO, which doesn't exist
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
// Wait for the cleaner chore to run
Thread.sleep(20000);
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
assertEquals(0, cleaner.getUnDeletedQueues().size());
}
}


@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestHBaseFsckReplication {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
ReplicationPeerStorage peerStorage = ReplicationStorageFactory
.getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
String peerId1 = "1";
String peerId2 = "2";
peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
true);
peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
true);
for (int i = 0; i < 10; i++) {
queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
"file-" + i);
}
queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file");
HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
HbckTestingUtil.assertNoErrors(fsck);
// should not remove anything since the replication peer is still alive
assertEquals(10, queueStorage.getListOfReplicators().size());
peerStorage.removePeer(peerId1);
// there should be orphan queues
assertEquals(10, queueStorage.getListOfReplicators().size());
fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
}).limit(10).toArray(ERROR_CODE[]::new));
// should not delete anything when fix is false
assertEquals(10, queueStorage.getListOfReplicators().size());
fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
}).limit(10).toArray(ERROR_CODE[]::new));
List<ServerName> replicators = queueStorage.getListOfReplicators();
// should not remove the server with queue for peerId2
assertEquals(1, replicators.size());
assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0));
for (String queueId : queueStorage.getAllQueues(replicators.get(0))) {
assertEquals(peerId2, queueId);
}
}
}


@@ -46,7 +46,7 @@ public class HbckTestingUtil {
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication,
boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
TableName table) throws Exception {
HBaseFsck fsck = new HBaseFsck(conf, exec);
try {
@@ -78,10 +78,8 @@ public class HbckTestingUtil {
/**
* Runs hbck with the -sidelineCorruptHFiles option
* @param conf
* @param table table constraint
* @return <returncode, hbckInstance>
* @throws Exception
* @return hbckInstance
*/
public static HBaseFsck doHFileQuarantine(Configuration conf, TableName table) throws Exception {
String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission", table.getNameAsString()};