HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)
This commit is contained in:
parent
e5b566e01c
commit
3c8e92019c
|
@ -128,11 +128,12 @@ public class ReplicationAdmin implements Closeable {
|
||||||
try {
|
try {
|
||||||
zkw = createZooKeeperWatcher();
|
zkw = createZooKeeperWatcher();
|
||||||
try {
|
try {
|
||||||
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
|
|
||||||
this.replicationPeers.init();
|
|
||||||
this.replicationQueuesClient =
|
this.replicationQueuesClient =
|
||||||
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
|
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
|
||||||
this.replicationQueuesClient.init();
|
this.replicationQueuesClient.init();
|
||||||
|
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
|
||||||
|
this.replicationQueuesClient, this.connection);
|
||||||
|
this.replicationPeers.init();
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
if (zkw != null) {
|
if (zkw != null) {
|
||||||
zkw.close();
|
zkw.close();
|
||||||
|
@ -190,7 +191,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
this.replicationPeers.addPeer(id,
|
this.replicationPeers.addPeer(id,
|
||||||
new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
|
new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a new remote slave cluster for replication.
|
* Add a new remote slave cluster for replication.
|
||||||
* @param id a short name that identifies the cluster
|
* @param id a short name that identifies the cluster
|
||||||
|
|
|
@ -42,7 +42,12 @@ public class ReplicationFactory {
|
||||||
|
|
||||||
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
|
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
|
||||||
Abortable abortable) {
|
Abortable abortable) {
|
||||||
return new ReplicationPeersZKImpl(zk, conf, abortable);
|
return getReplicationPeers(zk, conf, null, abortable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
|
||||||
|
final ReplicationQueuesClient queuesClient, Abortable abortable) {
|
||||||
|
return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
|
public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
|
||||||
|
|
|
@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
// Map of peer clusters keyed by their id
|
// Map of peer clusters keyed by their id
|
||||||
private Map<String, ReplicationPeerZKImpl> peerClusters;
|
private Map<String, ReplicationPeerZKImpl> peerClusters;
|
||||||
private final String tableCFsNodeName;
|
private final String tableCFsNodeName;
|
||||||
|
private final ReplicationQueuesClient queuesClient;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
|
||||||
|
|
||||||
public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
|
public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
|
||||||
Abortable abortable) {
|
final ReplicationQueuesClient queuesClient, Abortable abortable) {
|
||||||
super(zk, conf, abortable);
|
super(zk, conf, abortable);
|
||||||
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
|
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
|
||||||
this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
|
this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
|
||||||
|
this.queuesClient = queuesClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
throw new IllegalArgumentException("Found invalid peer name:" + id);
|
throw new IllegalArgumentException("Found invalid peer name:" + id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkQueuesDeleted(id);
|
||||||
|
|
||||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||||
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
|
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
|
||||||
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
|
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
|
||||||
|
@ -561,5 +565,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
return ProtobufUtil.prependPBMagic(bytes);
|
return ProtobufUtil.prependPBMagic(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkQueuesDeleted(String peerId) throws ReplicationException {
|
||||||
|
if (queuesClient == null) return;
|
||||||
|
try {
|
||||||
|
List<String> replicators = queuesClient.getListOfReplicators();
|
||||||
|
for (String replicator : replicators) {
|
||||||
|
List<String> queueIds = queuesClient.getAllQueues(replicator);
|
||||||
|
for (String queueId : queueIds) {
|
||||||
|
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||||
|
if (queueInfo.getPeerId().equals(peerId)) {
|
||||||
|
throw new ReplicationException("undeleted queue for peerId: " + peerId
|
||||||
|
+ ", replicator: " + replicator + ", queueId: " + queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
||||||
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
|
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
|
||||||
|
import org.apache.hadoop.hbase.util.hbck.ReplicationChecker;
|
||||||
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
|
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
|
||||||
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
|
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
|
||||||
import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
|
import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
|
||||||
|
@ -251,6 +252,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
|
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
|
||||||
private boolean fixTableLocks = false; // fix table locks which are expired
|
private boolean fixTableLocks = false; // fix table locks which are expired
|
||||||
private boolean fixTableZNodes = false; // fix table Znodes which are orphaned
|
private boolean fixTableZNodes = false; // fix table Znodes which are orphaned
|
||||||
|
private boolean fixReplication = false; // fix undeleted replication queues for removed peer
|
||||||
private boolean fixAny = false; // Set to true if any of the fix is required.
|
private boolean fixAny = false; // Set to true if any of the fix is required.
|
||||||
|
|
||||||
// limit checking/fixes to listed tables, if empty attempt to check/fix all
|
// limit checking/fixes to listed tables, if empty attempt to check/fix all
|
||||||
|
@ -715,6 +717,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
// Check (and fix if requested) orphaned table ZNodes
|
// Check (and fix if requested) orphaned table ZNodes
|
||||||
checkAndFixOrphanedTableZNodes();
|
checkAndFixOrphanedTableZNodes();
|
||||||
|
|
||||||
|
checkAndFixReplication();
|
||||||
|
|
||||||
// Remove the hbck lock
|
// Remove the hbck lock
|
||||||
unlockHbck();
|
unlockHbck();
|
||||||
|
|
||||||
|
@ -3231,6 +3235,21 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAndFixReplication() throws IOException {
|
||||||
|
ZooKeeperWatcher zkw = createZooKeeperWatcher();
|
||||||
|
try {
|
||||||
|
ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
|
||||||
|
checker.checkUnDeletedQueues();
|
||||||
|
|
||||||
|
if (checker.hasUnDeletedQueues() && this.fixReplication) {
|
||||||
|
checker.fixUnDeletedQueues();
|
||||||
|
setShouldRerun();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
zkw.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether a orphaned table ZNode exists and fix it if requested.
|
* Check whether a orphaned table ZNode exists and fix it if requested.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -3817,7 +3836,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
|
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
|
||||||
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
|
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
|
||||||
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
|
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
|
||||||
WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, ORPHANED_ZK_TABLE_ENTRY, BOUNDARIES_ERROR
|
WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, ORPHANED_ZK_TABLE_ENTRY, BOUNDARIES_ERROR,
|
||||||
|
UNDELETED_REPLICATION_QUEUE
|
||||||
}
|
}
|
||||||
void clear();
|
void clear();
|
||||||
void report(String message);
|
void report(String message);
|
||||||
|
@ -4218,6 +4238,14 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
fixTableLocks = shouldFix;
|
fixTableLocks = shouldFix;
|
||||||
fixAny |= shouldFix;
|
fixAny |= shouldFix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set replication fix mode.
|
||||||
|
*/
|
||||||
|
public void setFixReplication(boolean shouldFix) {
|
||||||
|
fixReplication = shouldFix;
|
||||||
|
fixAny |= shouldFix;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set orphaned table ZNodes fix mode.
|
* Set orphaned table ZNodes fix mode.
|
||||||
|
@ -4492,6 +4520,10 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
out.println(" Table Znode options");
|
out.println(" Table Znode options");
|
||||||
out.println(" -fixOrphanedTableZnodes Set table state in ZNode to disabled if table does not exists");
|
out.println(" -fixOrphanedTableZnodes Set table state in ZNode to disabled if table does not exists");
|
||||||
|
|
||||||
|
out.println("");
|
||||||
|
out.println(" Replication options");
|
||||||
|
out.println(" -fixReplication Deletes replication queues for removed peers");
|
||||||
|
|
||||||
out.flush();
|
out.flush();
|
||||||
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
|
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
|
||||||
|
|
||||||
|
@ -4678,6 +4710,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
setRegionBoundariesCheck();
|
setRegionBoundariesCheck();
|
||||||
} else if (cmd.equals("-fixTableLocks")) {
|
} else if (cmd.equals("-fixTableLocks")) {
|
||||||
setFixTableLocks(true);
|
setFixTableLocks(true);
|
||||||
|
} else if (cmd.equals("-fixReplication")) {
|
||||||
|
setFixReplication(true);
|
||||||
} else if (cmd.equals("-fixOrphanedTableZnodes")) {
|
} else if (cmd.equals("-fixOrphanedTableZnodes")) {
|
||||||
setFixTableZNodes(true);
|
setFixTableZNodes(true);
|
||||||
} else if (cmd.startsWith("-")) {
|
} else if (cmd.startsWith("-")) {
|
||||||
|
|
|
@ -0,0 +1,134 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util.hbck;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||||
|
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||||
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check and fix undeleted replication queues for removed peerId.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ReplicationChecker {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
|
||||||
|
private ErrorReporter errorReporter;
|
||||||
|
private ReplicationQueuesClient queuesClient;
|
||||||
|
private ReplicationPeers replicationPeers;
|
||||||
|
private ReplicationQueueDeletor queueDeletor;
|
||||||
|
// replicator with its queueIds for removed peers
|
||||||
|
private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
|
||||||
|
|
||||||
|
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
|
||||||
|
ErrorReporter errorReporter) throws IOException {
|
||||||
|
try {
|
||||||
|
this.errorReporter = errorReporter;
|
||||||
|
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
|
||||||
|
this.queuesClient.init();
|
||||||
|
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
|
||||||
|
connection);
|
||||||
|
this.replicationPeers.init();
|
||||||
|
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
throw new IOException("failed to construct ReplicationChecker", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasUnDeletedQueues() {
|
||||||
|
return errorReporter.getErrorList()
|
||||||
|
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkUnDeletedQueues() throws IOException {
|
||||||
|
Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
|
||||||
|
try {
|
||||||
|
List<String> replicators = this.queuesClient.getListOfReplicators();
|
||||||
|
for (String replicator : replicators) {
|
||||||
|
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
|
||||||
|
for (String queueId : queueIds) {
|
||||||
|
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||||
|
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||||
|
if (!undeletedQueueIds.containsKey(replicator)) {
|
||||||
|
undeletedQueueIds.put(replicator, new ArrayList<String>());
|
||||||
|
}
|
||||||
|
undeletedQueueIds.get(replicator).add(queueId);
|
||||||
|
|
||||||
|
String msg = "Undeleted replication queue for removed peer found: "
|
||||||
|
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
||||||
|
queueInfo.getPeerId(), replicator, queueId);
|
||||||
|
errorReporter.reportError(
|
||||||
|
HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
throw new IOException(ke);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||||
|
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
|
||||||
|
super(zk, conf, abortable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeQueue(String replicator, String queueId) throws IOException {
|
||||||
|
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
|
||||||
|
queueId);
|
||||||
|
try {
|
||||||
|
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
||||||
|
LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
|
||||||
|
+ queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void fixUnDeletedQueues() throws IOException {
|
||||||
|
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
|
||||||
|
String replicator = replicatorAndQueueIds.getKey();
|
||||||
|
for (String queueId : replicatorAndQueueIds.getValue()) {
|
||||||
|
queueDeletor.removeQueue(replicator, queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -113,6 +116,37 @@ public class TestReplicationAdmin {
|
||||||
admin.removePeer(ID_SECOND);
|
admin.removePeer(ID_SECOND);
|
||||||
assertEquals(0, admin.getPeersCount());
|
assertEquals(0, admin.getPeersCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddPeerWithUnDeletedQueues() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
|
||||||
|
ReplicationQueues repQueues =
|
||||||
|
ReplicationFactory.getReplicationQueues(zkw, conf, null);
|
||||||
|
repQueues.init("server1");
|
||||||
|
|
||||||
|
// add queue for ID_ONE
|
||||||
|
repQueues.addLog(ID_ONE, "file1");
|
||||||
|
try {
|
||||||
|
admin.addPeer(ID_ONE, KEY_ONE);
|
||||||
|
fail();
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
// OK!
|
||||||
|
}
|
||||||
|
repQueues.removeQueue(ID_ONE);
|
||||||
|
assertEquals(0, repQueues.getAllQueues().size());
|
||||||
|
|
||||||
|
// add recovered queue for ID_ONE
|
||||||
|
repQueues.addLog(ID_ONE + "-server2", "file1");
|
||||||
|
try {
|
||||||
|
admin.addPeer(ID_ONE, KEY_ONE);
|
||||||
|
fail();
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
// OK!
|
||||||
|
}
|
||||||
|
repQueues.removeAllQueues();
|
||||||
|
zkw.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* basic checks that when we add a peer that it is enabled, and that we can disable
|
* basic checks that when we add a peer that it is enabled, and that we can disable
|
||||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
|
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||||
|
@ -111,6 +112,8 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl;
|
import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
|
import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
||||||
|
@ -120,6 +123,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck.TableInfo;
|
||||||
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
|
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
|
||||||
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
|
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -1690,7 +1694,7 @@ public class TestHBaseFsck {
|
||||||
// for some time until children references are deleted. HBCK erroneously sees this as
|
// for some time until children references are deleted. HBCK erroneously sees this as
|
||||||
// overlapping regions
|
// overlapping regions
|
||||||
HBaseFsck hbck = doFsck(
|
HBaseFsck hbck = doFsck(
|
||||||
conf, true, true, false, false, false, true, true, true, false, false, false, null);
|
conf, true, true, false, false, false, true, true, true, false, false, false, false, null);
|
||||||
assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
|
assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
|
||||||
|
|
||||||
// assert that the split hbase:meta entry is still there.
|
// assert that the split hbase:meta entry is still there.
|
||||||
|
@ -1759,7 +1763,7 @@ public class TestHBaseFsck {
|
||||||
|
|
||||||
// now fix it. The fix should not revert the region split, but add daughters to META
|
// now fix it. The fix should not revert the region split, but add daughters to META
|
||||||
hbck = doFsck(
|
hbck = doFsck(
|
||||||
conf, true, true, false, false, false, false, false, false, false, false, false, null);
|
conf, true, true, false, false, false, false, false, false, false, false, false,false,null);
|
||||||
assertErrors(hbck,
|
assertErrors(hbck,
|
||||||
new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
|
new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
|
||||||
ERROR_CODE.HOLE_IN_REGION_CHAIN });
|
ERROR_CODE.HOLE_IN_REGION_CHAIN });
|
||||||
|
@ -2201,6 +2205,55 @@ public class TestHBaseFsck {
|
||||||
doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
|
doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
|
||||||
hbck.close();
|
hbck.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testCheckReplication() throws Exception {
|
||||||
|
// check no errors
|
||||||
|
HBaseFsck hbck = doFsck(conf, false);
|
||||||
|
assertNoErrors(hbck);
|
||||||
|
|
||||||
|
// create peer
|
||||||
|
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
|
||||||
|
Assert.assertEquals(0, replicationAdmin.getPeersCount());
|
||||||
|
String zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
|
||||||
|
replicationAdmin.addPeer("1", "127.0.0.1:2181" + zkPort + ":/hbase");
|
||||||
|
replicationAdmin.getPeersCount();
|
||||||
|
Assert.assertEquals(1, replicationAdmin.getPeersCount());
|
||||||
|
|
||||||
|
// create replicator
|
||||||
|
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
|
||||||
|
ReplicationQueues repQueues =
|
||||||
|
ReplicationFactory.getReplicationQueues(zkw, conf, connection);
|
||||||
|
repQueues.init("server1");
|
||||||
|
// queues for current peer, no errors
|
||||||
|
repQueues.addLog("1", "file1");
|
||||||
|
repQueues.addLog("1-server2", "file1");
|
||||||
|
Assert.assertEquals(2, repQueues.getAllQueues().size());
|
||||||
|
hbck = doFsck(conf, false);
|
||||||
|
assertNoErrors(hbck);
|
||||||
|
|
||||||
|
// queues for removed peer
|
||||||
|
repQueues.addLog("2", "file1");
|
||||||
|
repQueues.addLog("2-server2", "file1");
|
||||||
|
Assert.assertEquals(4, repQueues.getAllQueues().size());
|
||||||
|
hbck = doFsck(conf, false);
|
||||||
|
assertErrors(hbck, new ERROR_CODE[] { ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
|
||||||
|
ERROR_CODE.UNDELETED_REPLICATION_QUEUE });
|
||||||
|
|
||||||
|
// fix the case
|
||||||
|
hbck = doFsck(conf, true);
|
||||||
|
hbck = doFsck(conf, false);
|
||||||
|
assertNoErrors(hbck);
|
||||||
|
// ensure only "2" is deleted
|
||||||
|
Assert.assertEquals(2, repQueues.getAllQueues().size());
|
||||||
|
Assert.assertNull(repQueues.getLogsInQueue("2"));
|
||||||
|
Assert.assertNull(repQueues.getLogsInQueue("2-sever2"));
|
||||||
|
|
||||||
|
replicationAdmin.removePeer("1");
|
||||||
|
repQueues.removeAllQueues();
|
||||||
|
zkw.close();
|
||||||
|
replicationAdmin.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This creates a table and simulates the race situation where a concurrent compaction or split
|
* This creates a table and simulates the race situation where a concurrent compaction or split
|
||||||
|
@ -2737,7 +2790,8 @@ public class TestHBaseFsck {
|
||||||
// fix hole
|
// fix hole
|
||||||
assertErrors(
|
assertErrors(
|
||||||
doFsck(
|
doFsck(
|
||||||
conf, false, true, false, false, false, false, false, false, false, false, false, null),
|
conf, false, true, false, false, false, false, false, false, false, false, false,
|
||||||
|
false, null),
|
||||||
new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
|
new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
|
||||||
ERROR_CODE.NOT_IN_META_OR_DEPLOYED });
|
ERROR_CODE.NOT_IN_META_OR_DEPLOYED });
|
||||||
|
|
||||||
|
|
|
@ -40,14 +40,14 @@ public class HbckTestingUtil {
|
||||||
|
|
||||||
public static HBaseFsck doFsck(
|
public static HBaseFsck doFsck(
|
||||||
Configuration conf, boolean fix, TableName table) throws Exception {
|
Configuration conf, boolean fix, TableName table) throws Exception {
|
||||||
return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
|
return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
|
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
|
||||||
boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
|
boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
|
||||||
boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
|
boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
|
||||||
boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
|
boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
|
||||||
boolean fixTableZnodes,
|
boolean fixTableZnodes, Boolean fixReplication,
|
||||||
TableName table) throws Exception {
|
TableName table) throws Exception {
|
||||||
HBaseFsck fsck = new HBaseFsck(conf, exec);
|
HBaseFsck fsck = new HBaseFsck(conf, exec);
|
||||||
fsck.setDisplayFullReport(); // i.e. -details
|
fsck.setDisplayFullReport(); // i.e. -details
|
||||||
|
@ -62,6 +62,7 @@ public class HbckTestingUtil {
|
||||||
fsck.setFixReferenceFiles(fixReferenceFiles);
|
fsck.setFixReferenceFiles(fixReferenceFiles);
|
||||||
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
|
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
|
||||||
fsck.setFixTableLocks(fixTableLocks);
|
fsck.setFixTableLocks(fixTableLocks);
|
||||||
|
fsck.setFixReplication(fixReplication);
|
||||||
fsck.setFixTableZNodes(fixTableZnodes);
|
fsck.setFixTableZNodes(fixTableZnodes);
|
||||||
fsck.connect();
|
fsck.connect();
|
||||||
if (table != null) {
|
if (table != null) {
|
||||||
|
|
Loading…
Reference in New Issue