diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index d582f28d3da..0833bca42ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -25,6 +25,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -233,7 +234,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R * @param znode the server names of the other server * @return true if the lock was acquired, false in every other cases */ - private boolean lockOtherRS(String znode) { + @VisibleForTesting + public boolean lockOtherRS(String znode) { try { String parent = ZKUtil.joinZNode(this.queuesZNode, znode); if (parent.equals(this.myQueuesZnode)) { @@ -260,6 +262,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R return true; } + public String getLockZNode(String znode) { + return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE; + } + + @VisibleForTesting + public boolean checkLockExists(String znode) throws KeeperException { + return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0; + } + /** * Delete all the replication queues for a given region server. * @param regionserverZnode The znode of the region server to delete. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 19ca6e83620..7faebdd4ea8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; +import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; @@ -308,6 +309,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private PeriodicDoMetrics periodicDoMetricsChore = null; CatalogJanitor catalogJanitorChore; + private ReplicationZKLockCleanerChore replicationZKLockCleanerChore; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; @@ -1139,6 +1141,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } + if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { + try { + replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this, + cleanerInterval, this.getZooKeeper(), this.conf); + getChoreService().scheduleChore(replicationZKLockCleanerChore); + } catch (Exception e) { + LOG.error("start replicationZKLockCleanerChore failed", e); + } + } } @Override @@ -1172,6 +1183,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // Clean up and close up shop if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); + if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java new file mode 100644 index 00000000000..dc5338eca76 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java @@ -0,0 +1,112 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + + +/** + * A cleaner that cleans replication locks on zk which is locked by dead region servers + */ +@InterfaceAudience.Private +public class ReplicationZKLockCleanerChore extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class); + private ZooKeeperWatcher zk; + private ReplicationTracker tracker; + private long ttl; + private ReplicationQueuesZKImpl queues; + + // Wait some times before delete lock to prevent a session expired RS not dead fully. + private static final long DEFAULT_TTL = 60 * 10 * 1000;//10 min + + @VisibleForTesting + public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl"; + + public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period, + ZooKeeperWatcher zk, Configuration conf) throws Exception { + super("ReplicationZKLockCleanerChore", stopper, period); + + this.zk = zk; + this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL); + tracker = ReplicationFactory.getReplicationTracker(zk, + ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper); + queues = new ReplicationQueuesZKImpl(zk, conf, abortable); + } + + @Override protected void chore() { + try { + List regionServers = tracker.getListOfRegionServers(); + if (regionServers == null) { + return; + } + Set rsSet = new HashSet(regionServers); + List replicators = queues.getListOfReplicators(); + + for (String replicator: replicators) { + try { + String lockNode = queues.getLockZNode(replicator); + byte[] data = ZKUtil.getData(zk, lockNode); + if (data == null) { + continue; + } + String rsServerNameZnode = Bytes.toString(data); + String[] array = rsServerNameZnode.split("/"); + String znode = array[array.length - 1]; + if (!rsSet.contains(znode)) { + Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false); + if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) { + // server is dead, but lock is still there, we have to delete the lock. + ZKUtil.deleteNode(zk, lockNode); + LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode); + } + continue; + } + LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode); + + } catch (KeeperException.NoNodeException ignore) { + } catch (InterruptedException e) { + LOG.warn("zk operation interrupted", e); + Thread.currentThread().interrupt(); + } + } + } catch (KeeperException e) { + LOG.warn("zk operation interrupted", e); + } + + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index ae38ec66cde..4480dd259ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; @@ -43,12 +46,15 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -94,10 +100,16 @@ public class TestMultiSlaveReplication { conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); + conf1.setBoolean(HConstants.ZOOKEEPER_USEMULTI , false);// for testZKLockCleaner + conf1.setInt("hbase.master.cleaner.interval", 5 * 1000); + conf1.setClass("hbase.region.replica.replication.replicationQueues.class", + ReplicationQueuesZKImpl.class, ReplicationQueues.class); + conf1.setLong(ReplicationZKLockCleanerChore.TTL_CONFIG_KEY, 0L); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + utility1.setZkCluster(miniZK); new ZooKeeperWatcher(conf1, "cluster1", null, true); conf2 = new Configuration(conf1); @@ -196,6 +208,39 @@ public class TestMultiSlaveReplication { utility1.shutdownMiniCluster(); } + @Test + public void testZKLockCleaner() throws Exception { + MiniHBaseCluster cluster = utility1.startMiniCluster(1, 2); + HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("zk"))); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + new HBaseAdmin(conf1).createTable(table); + ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + replicationAdmin.addPeer("cluster2", rpc, null); + HRegionServer rs = cluster.getRegionServer(0); + ReplicationQueuesZKImpl zk = new ReplicationQueuesZKImpl(rs.getZooKeeper(), conf1, rs); + zk.init(rs.getServerName().toString()); + List replicators = zk.getListOfReplicators(); + assertEquals(2, replicators.size()); + String zNode = cluster.getRegionServer(1).getServerName().toString(); + + assertTrue(zk.lockOtherRS(zNode)); + assertTrue(zk.checkLockExists(zNode)); + Thread.sleep(10000); + assertTrue(zk.checkLockExists(zNode)); + cluster.abortRegionServer(0); + Thread.sleep(10000); + HRegionServer rs1 = cluster.getRegionServer(1); + zk = new ReplicationQueuesZKImpl(rs1.getZooKeeper(), conf1, rs1); + zk.init(rs1.getServerName().toString()); + assertFalse(zk.checkLockExists(zNode)); + + utility1.shutdownMiniCluster(); + } + private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, final byte[] row) throws IOException { final Admin admin = utility.getHBaseAdmin();