HBASE-16144 Replication queue's lock will live forever if RS acquiring the lock has died prematurely

This commit is contained in:
Phil Yang 2016-07-06 17:38:07 +08:00
parent 40204eb0c1
commit 1e3bce943b
4 changed files with 181 additions and 1 deletions

View File

@ -25,6 +25,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -233,7 +234,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
* @param znode the server names of the other server
* @return true if the lock was acquired, false in every other cases
*/
private boolean lockOtherRS(String znode) {
@VisibleForTesting
public boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
if (parent.equals(this.myQueuesZnode)) {
@ -260,6 +262,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
return true;
}
public String getLockZNode(String znode) {
return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
}
@VisibleForTesting
public boolean checkLockExists(String znode) throws KeeperException {
return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
}
/**
* Delete all the replication queues for a given region server.
* @param regionserverZnode The znode of the region server to delete.

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
@ -308,6 +309,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private PeriodicDoMetrics periodicDoMetricsChore = null;
CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
@ -1139,6 +1141,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
try {
replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
cleanerInterval, this.getZooKeeper(), this.conf);
getChoreService().scheduleChore(replicationZKLockCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKLockCleanerChore failed", e);
}
}
}
@Override
@ -1172,6 +1183,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();

View File

@ -0,0 +1,112 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/**
* A cleaner that cleans replication locks on zk which is locked by dead region servers
*/
@InterfaceAudience.Private
public class ReplicationZKLockCleanerChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
private ZooKeeperWatcher zk;
private ReplicationTracker tracker;
private long ttl;
private ReplicationQueuesZKImpl queues;
// Wait some times before delete lock to prevent a session expired RS not dead fully.
private static final long DEFAULT_TTL = 60 * 10 * 1000;//10 min
@VisibleForTesting
public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";
public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
ZooKeeperWatcher zk, Configuration conf) throws Exception {
super("ReplicationZKLockCleanerChore", stopper, period);
this.zk = zk;
this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
tracker = ReplicationFactory.getReplicationTracker(zk,
ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
}
@Override protected void chore() {
try {
List<String> regionServers = tracker.getListOfRegionServers();
if (regionServers == null) {
return;
}
Set<String> rsSet = new HashSet<String>(regionServers);
List<String> replicators = queues.getListOfReplicators();
for (String replicator: replicators) {
try {
String lockNode = queues.getLockZNode(replicator);
byte[] data = ZKUtil.getData(zk, lockNode);
if (data == null) {
continue;
}
String rsServerNameZnode = Bytes.toString(data);
String[] array = rsServerNameZnode.split("/");
String znode = array[array.length - 1];
if (!rsSet.contains(znode)) {
Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) {
// server is dead, but lock is still there, we have to delete the lock.
ZKUtil.deleteNode(zk, lockNode);
LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode);
}
continue;
}
LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode);
} catch (KeeperException.NoNodeException ignore) {
} catch (InterruptedException e) {
LOG.warn("zk operation interrupted", e);
Thread.currentThread().interrupt();
}
}
} catch (KeeperException e) {
LOG.warn("zk operation interrupted", e);
}
}
}

View File

@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@ -43,12 +46,15 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -94,10 +100,16 @@ public class TestMultiSlaveReplication {
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
conf1.setBoolean(HConstants.ZOOKEEPER_USEMULTI , false);// for testZKLockCleaner
conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
ReplicationQueuesZKImpl.class, ReplicationQueues.class);
conf1.setLong(ReplicationZKLockCleanerChore.TTL_CONFIG_KEY, 0L);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
utility1.setZkCluster(miniZK);
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
@ -196,6 +208,39 @@ public class TestMultiSlaveReplication {
utility1.shutdownMiniCluster();
}
@Test
public void testZKLockCleaner() throws Exception {
MiniHBaseCluster cluster = utility1.startMiniCluster(1, 2);
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("zk")));
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
new HBaseAdmin(conf1).createTable(table);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
replicationAdmin.addPeer("cluster2", rpc, null);
HRegionServer rs = cluster.getRegionServer(0);
ReplicationQueuesZKImpl zk = new ReplicationQueuesZKImpl(rs.getZooKeeper(), conf1, rs);
zk.init(rs.getServerName().toString());
List<String> replicators = zk.getListOfReplicators();
assertEquals(2, replicators.size());
String zNode = cluster.getRegionServer(1).getServerName().toString();
assertTrue(zk.lockOtherRS(zNode));
assertTrue(zk.checkLockExists(zNode));
Thread.sleep(10000);
assertTrue(zk.checkLockExists(zNode));
cluster.abortRegionServer(0);
Thread.sleep(10000);
HRegionServer rs1 = cluster.getRegionServer(1);
zk = new ReplicationQueuesZKImpl(rs1.getZooKeeper(), conf1, rs1);
zk.init(rs1.getServerName().toString());
assertFalse(zk.checkLockExists(zNode));
utility1.shutdownMiniCluster();
}
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
final byte[] row) throws IOException {
final Admin admin = utility.getHBaseAdmin();