Revert "Replication queue's lock will live forever if RS acquiring the lock has died prematurely"
This reverts commit a8dfd8c436
.
This commit is contained in:
parent
eb43f0a5d0
commit
a0aa017ae9
|
@ -31,12 +31,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationFactory {
|
public class ReplicationFactory {
|
||||||
|
|
||||||
public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
|
|
||||||
|
|
||||||
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
|
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
|
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
|
||||||
"replication.replicationQueues.class", defaultReplicationQueueClass);
|
"replication.replicationQueues.class", ReplicationQueuesZKImpl.class);
|
||||||
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
|
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -239,8 +238,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
||||||
* @param znode the server names of the other server
|
* @param znode the server names of the other server
|
||||||
* @return true if the lock was acquired, false in every other cases
|
* @return true if the lock was acquired, false in every other cases
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
private boolean lockOtherRS(String znode) {
|
||||||
public boolean lockOtherRS(String znode) {
|
|
||||||
try {
|
try {
|
||||||
String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
|
String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
|
||||||
if (parent.equals(this.myQueuesZnode)) {
|
if (parent.equals(this.myQueuesZnode)) {
|
||||||
|
@ -267,15 +265,6 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLockZNode(String znode) {
|
|
||||||
return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public boolean checkLockExists(String znode) throws KeeperException {
|
|
||||||
return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete all the replication queues for a given region server.
|
* Delete all the replication queues for a given region server.
|
||||||
* @param regionserverZnode The znode of the region server to delete.
|
* @param regionserverZnode The znode of the region server to delete.
|
||||||
|
|
|
@ -97,7 +97,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
|
||||||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
|
|
||||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
||||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||||
|
@ -138,8 +137,6 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
|
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
|
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
|
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
|
||||||
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
|
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
@ -304,7 +301,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
private PeriodicDoMetrics periodicDoMetricsChore = null;
|
private PeriodicDoMetrics periodicDoMetricsChore = null;
|
||||||
|
|
||||||
CatalogJanitor catalogJanitorChore;
|
CatalogJanitor catalogJanitorChore;
|
||||||
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
|
|
||||||
private LogCleaner logCleaner;
|
private LogCleaner logCleaner;
|
||||||
private HFileCleaner hfileCleaner;
|
private HFileCleaner hfileCleaner;
|
||||||
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
||||||
|
@ -966,17 +962,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Started service threads");
|
LOG.trace("Started service threads");
|
||||||
}
|
}
|
||||||
if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
|
|
||||||
ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class && !conf
|
|
||||||
.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
|
|
||||||
try {
|
|
||||||
replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
|
|
||||||
cleanerInterval, this.getZooKeeper(), this.conf);
|
|
||||||
getChoreService().scheduleChore(replicationZKLockCleanerChore);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("start replicationZKLockCleanerChore failed", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1010,7 +995,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// Clean up and close up shop
|
// Clean up and close up shop
|
||||||
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
||||||
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
||||||
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
|
|
||||||
if (this.quotaManager != null) this.quotaManager.stop();
|
if (this.quotaManager != null) this.quotaManager.stop();
|
||||||
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
||||||
if (this.serverManager != null) this.serverManager.stop();
|
if (this.serverManager != null) this.serverManager.stop();
|
||||||
|
|
|
@ -1,112 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.master.cleaner;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.ScheduledChore;
|
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A cleaner that cleans replication locks on zk which is locked by dead region servers
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class ReplicationZKLockCleanerChore extends ScheduledChore {
|
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
|
|
||||||
private ZooKeeperWatcher zk;
|
|
||||||
private ReplicationTracker tracker;
|
|
||||||
private long ttl;
|
|
||||||
private ReplicationQueuesZKImpl queues;
|
|
||||||
|
|
||||||
// Wait some times before delete lock to prevent a session expired RS not dead fully.
|
|
||||||
private static final long DEFAULT_TTL = 60 * 10 * 1000;//10 min
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";
|
|
||||||
|
|
||||||
public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
|
|
||||||
ZooKeeperWatcher zk, Configuration conf) throws Exception {
|
|
||||||
super("ReplicationZKLockCleanerChore", stopper, period);
|
|
||||||
|
|
||||||
this.zk = zk;
|
|
||||||
this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
|
|
||||||
tracker = ReplicationFactory.getReplicationTracker(zk,
|
|
||||||
ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
|
|
||||||
queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override protected void chore() {
|
|
||||||
try {
|
|
||||||
List<String> regionServers = tracker.getListOfRegionServers();
|
|
||||||
if (regionServers == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Set<String> rsSet = new HashSet<String>(regionServers);
|
|
||||||
List<String> replicators = queues.getListOfReplicators();
|
|
||||||
|
|
||||||
for (String replicator: replicators) {
|
|
||||||
try {
|
|
||||||
String lockNode = queues.getLockZNode(replicator);
|
|
||||||
byte[] data = ZKUtil.getData(zk, lockNode);
|
|
||||||
if (data == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String rsServerNameZnode = Bytes.toString(data);
|
|
||||||
String[] array = rsServerNameZnode.split("/");
|
|
||||||
String znode = array[array.length - 1];
|
|
||||||
if (!rsSet.contains(znode)) {
|
|
||||||
Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
|
|
||||||
if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) {
|
|
||||||
// server is dead, but lock is still there, we have to delete the lock.
|
|
||||||
ZKUtil.deleteNode(zk, lockNode);
|
|
||||||
LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode);
|
|
||||||
|
|
||||||
} catch (KeeperException.NoNodeException ignore) {
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("zk operation interrupted", e);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
LOG.warn("zk operation interrupted", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,14 +21,11 @@ package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -39,15 +36,12 @@ import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
@ -98,17 +92,10 @@ public class TestMultiSlaveReplication {
|
||||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||||
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
|
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
|
||||||
conf1.setBoolean(HConstants.ZOOKEEPER_USEMULTI , false);// for testZKLockCleaner
|
|
||||||
conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
|
|
||||||
conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
|
|
||||||
ReplicationQueuesZKImpl.class, ReplicationQueues.class);
|
|
||||||
conf1.setLong(ReplicationZKLockCleanerChore.TTL_CONFIG_KEY, 0L);
|
|
||||||
|
|
||||||
|
|
||||||
utility1 = new HBaseTestingUtility(conf1);
|
utility1 = new HBaseTestingUtility(conf1);
|
||||||
utility1.startMiniZKCluster();
|
utility1.startMiniZKCluster();
|
||||||
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
|
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
|
||||||
utility1.setZkCluster(miniZK);
|
|
||||||
new ZooKeeperWatcher(conf1, "cluster1", null, true);
|
new ZooKeeperWatcher(conf1, "cluster1", null, true);
|
||||||
|
|
||||||
conf2 = new Configuration(conf1);
|
conf2 = new Configuration(conf1);
|
||||||
|
@ -211,40 +198,6 @@ public class TestMultiSlaveReplication {
|
||||||
utility1.shutdownMiniCluster();
|
utility1.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testZKLockCleaner() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = utility1.startMiniCluster(1, 2);
|
|
||||||
HBaseAdmin admin = utility1.getHBaseAdmin();
|
|
||||||
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("zk")));
|
|
||||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
|
||||||
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
|
||||||
table.addFamily(fam);
|
|
||||||
admin.createTable(table);
|
|
||||||
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
|
|
||||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
|
||||||
rpc.setClusterKey(utility2.getClusterKey());
|
|
||||||
replicationAdmin.addPeer("cluster2", rpc, null);
|
|
||||||
HRegionServer rs = cluster.getRegionServer(0);
|
|
||||||
ReplicationQueuesZKImpl zk = new ReplicationQueuesZKImpl(rs.getZooKeeper(), conf1, rs);
|
|
||||||
zk.init(rs.getServerName().toString());
|
|
||||||
List<String> replicators = zk.getListOfReplicators();
|
|
||||||
assertEquals(2, replicators.size());
|
|
||||||
String zNode = cluster.getRegionServer(1).getServerName().toString();
|
|
||||||
|
|
||||||
assertTrue(zk.lockOtherRS(zNode));
|
|
||||||
assertTrue(zk.checkLockExists(zNode));
|
|
||||||
Thread.sleep(10000);
|
|
||||||
assertTrue(zk.checkLockExists(zNode));
|
|
||||||
cluster.abortRegionServer(0);
|
|
||||||
Thread.sleep(10000);
|
|
||||||
HRegionServer rs1 = cluster.getRegionServer(1);
|
|
||||||
zk = new ReplicationQueuesZKImpl(rs1.getZooKeeper(), conf1, rs1);
|
|
||||||
zk.init(rs1.getServerName().toString());
|
|
||||||
assertFalse(zk.checkLockExists(zNode));
|
|
||||||
|
|
||||||
utility1.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
|
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
|
||||||
final byte[] row) throws IOException {
|
final byte[] row) throws IOException {
|
||||||
final Admin admin = utility.getHBaseAdmin();
|
final Admin admin = utility.getHBaseAdmin();
|
||||||
|
|
Loading…
Reference in New Issue