HBASE-5549 HBASE-5572 Master can fail if ZooKeeper session expires (N Keywal)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1301775 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0ad4e32993
commit
1d24d71821
|
@ -130,91 +130,90 @@ class ActiveMasterManager extends ZooKeeperListener {
|
||||||
* set on this Master)
|
* set on this Master)
|
||||||
*/
|
*/
|
||||||
boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus,
|
boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus,
|
||||||
ClusterStatusTracker clusterStatusTracker) {
|
ClusterStatusTracker clusterStatusTracker) {
|
||||||
startupStatus.setStatus("Trying to register in ZK as active master");
|
while (true) {
|
||||||
boolean cleanSetOfActiveMaster = true;
|
startupStatus.setStatus("Trying to register in ZK as active master");
|
||||||
// Try to become the active master, watch if there is another master.
|
// Try to become the active master, watch if there is another master.
|
||||||
// Write out our ServerName as versioned bytes.
|
// Write out our ServerName as versioned bytes.
|
||||||
try {
|
try {
|
||||||
String backupZNode = ZKUtil.joinZNode(
|
String backupZNode = ZKUtil.joinZNode(
|
||||||
this.watcher.backupMasterAddressesZNode, this.sn.toString());
|
this.watcher.backupMasterAddressesZNode, this.sn.toString());
|
||||||
if (ZKUtil.createEphemeralNodeAndWatch(this.watcher,
|
if (ZKUtil.createEphemeralNodeAndWatch(this.watcher,
|
||||||
this.watcher.masterAddressZNode, this.sn.getVersionedBytes())) {
|
this.watcher.masterAddressZNode, this.sn.getVersionedBytes())) {
|
||||||
// If we were a backup master before, delete our ZNode from the backup
|
// If we were a backup master before, delete our ZNode from the backup
|
||||||
// master directory since we are the active now
|
// master directory since we are the active now
|
||||||
LOG.info("Deleting ZNode for " + backupZNode +
|
LOG.info("Deleting ZNode for " + backupZNode +
|
||||||
" from backup master directory");
|
" from backup master directory");
|
||||||
ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
|
ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
|
||||||
|
|
||||||
// We are the master, return
|
// We are the master, return
|
||||||
startupStatus.setStatus("Successfully registered as active master.");
|
startupStatus.setStatus("Successfully registered as active master.");
|
||||||
|
this.clusterHasActiveMaster.set(true);
|
||||||
|
LOG.info("Master=" + this.sn);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// There is another active master running elsewhere or this is a restart
|
||||||
|
// and the master ephemeral node has not expired yet.
|
||||||
this.clusterHasActiveMaster.set(true);
|
this.clusterHasActiveMaster.set(true);
|
||||||
LOG.info("Master=" + this.sn);
|
|
||||||
return cleanSetOfActiveMaster;
|
|
||||||
}
|
|
||||||
cleanSetOfActiveMaster = false;
|
|
||||||
|
|
||||||
// There is another active master running elsewhere or this is a restart
|
/*
|
||||||
// and the master ephemeral node has not expired yet.
|
* Add a ZNode for ourselves in the backup master directory since we are
|
||||||
this.clusterHasActiveMaster.set(true);
|
* not the active master.
|
||||||
|
*
|
||||||
/*
|
* If we become the active master later, ActiveMasterManager will delete
|
||||||
* Add a ZNode for ourselves in the backup master directory since we are
|
* this node explicitly. If we crash before then, ZooKeeper will delete
|
||||||
* not the active master.
|
* this node for us since it is ephemeral.
|
||||||
*
|
*/
|
||||||
* If we become the active master later, ActiveMasterManager will delete
|
LOG.info("Adding ZNode for " + backupZNode +
|
||||||
* this node explicitly. If we crash before then, ZooKeeper will delete
|
" in backup master directory");
|
||||||
* this node for us since it is ephemeral.
|
ZKUtil.createEphemeralNodeAndWatch(this.watcher, backupZNode,
|
||||||
*/
|
|
||||||
LOG.info("Adding ZNode for " + backupZNode +
|
|
||||||
" in backup master directory");
|
|
||||||
ZKUtil.createEphemeralNodeAndWatch(this.watcher, backupZNode,
|
|
||||||
HConstants.EMPTY_BYTE_ARRAY);
|
HConstants.EMPTY_BYTE_ARRAY);
|
||||||
|
|
||||||
String msg;
|
String msg;
|
||||||
byte [] bytes =
|
byte[] bytes =
|
||||||
ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode);
|
ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode);
|
||||||
if (bytes == null) {
|
if (bytes == null) {
|
||||||
msg = ("A master was detected, but went down before its address " +
|
msg = ("A master was detected, but went down before its address " +
|
||||||
"could be read. Attempting to become the next active master");
|
"could be read. Attempting to become the next active master");
|
||||||
} else {
|
|
||||||
ServerName currentMaster = ServerName.parseVersionedServerName(bytes);
|
|
||||||
if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
|
|
||||||
msg = ("Current master has this master's address, " +
|
|
||||||
currentMaster + "; master was restarted? Waiting on znode " +
|
|
||||||
"to expire...");
|
|
||||||
// Hurry along the expiration of the znode.
|
|
||||||
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
|
|
||||||
} else {
|
} else {
|
||||||
msg = "Another master is the active master, " + currentMaster +
|
ServerName currentMaster = ServerName.parseVersionedServerName(bytes);
|
||||||
"; waiting to become the next active master";
|
if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
|
||||||
|
msg = ("Current master has this master's address, " +
|
||||||
|
currentMaster + "; master was restarted? Deleting node.");
|
||||||
|
// Hurry along the expiration of the znode.
|
||||||
|
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
|
||||||
|
} else {
|
||||||
|
msg = "Another master is the active master, " + currentMaster +
|
||||||
|
"; waiting to become the next active master";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info(msg);
|
||||||
|
startupStatus.setStatus(msg);
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
master.abort("Received an unexpected KeeperException, aborting", ke);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
LOG.info(msg);
|
synchronized (this.clusterHasActiveMaster) {
|
||||||
startupStatus.setStatus(msg);
|
while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
|
||||||
} catch (KeeperException ke) {
|
try {
|
||||||
master.abort("Received an unexpected KeeperException, aborting", ke);
|
this.clusterHasActiveMaster.wait();
|
||||||
return false;
|
} catch (InterruptedException e) {
|
||||||
}
|
// We expect to be interrupted when a master dies,
|
||||||
synchronized (this.clusterHasActiveMaster) {
|
// will fall out if so
|
||||||
while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
|
LOG.debug("Interrupted waiting for master to die", e);
|
||||||
try {
|
}
|
||||||
this.clusterHasActiveMaster.wait();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// We expect to be interrupted when a master dies, will fall out if so
|
|
||||||
LOG.debug("Interrupted waiting for master to die", e);
|
|
||||||
}
|
}
|
||||||
|
if (!clusterStatusTracker.isClusterUp()) {
|
||||||
|
this.master.stop(
|
||||||
|
"Cluster went down before this master became active");
|
||||||
|
}
|
||||||
|
if (this.master.isStopped()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// there is no active master so we can try to become active master again
|
||||||
}
|
}
|
||||||
if (!clusterStatusTracker.isClusterUp()) {
|
|
||||||
this.master.stop("Cluster went down before this master became active");
|
|
||||||
}
|
|
||||||
if (this.master.isStopped()) {
|
|
||||||
return cleanSetOfActiveMaster;
|
|
||||||
}
|
|
||||||
// Try to become active master again now that there is no active master
|
|
||||||
blockUntilBecomingActiveMaster(startupStatus,clusterStatusTracker);
|
|
||||||
}
|
}
|
||||||
return cleanSetOfActiveMaster;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1527,8 +1527,7 @@ Server {
|
||||||
private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
|
private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
|
||||||
IOException, KeeperException, ExecutionException {
|
IOException, KeeperException, ExecutionException {
|
||||||
|
|
||||||
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":"
|
this.zooKeeper.reconnectAfterExpiration();
|
||||||
+ this.serverName.getPort(), this, true);
|
|
||||||
|
|
||||||
Callable<Boolean> callable = new Callable<Boolean> () {
|
Callable<Boolean> callable = new Callable<Boolean> () {
|
||||||
public Boolean call() throws InterruptedException,
|
public Boolean call() throws InterruptedException,
|
||||||
|
|
|
@ -73,25 +73,41 @@ public class RecoverableZooKeeper {
|
||||||
// An identifier of this process in the cluster
|
// An identifier of this process in the cluster
|
||||||
private final String identifier;
|
private final String identifier;
|
||||||
private final byte[] id;
|
private final byte[] id;
|
||||||
private int retryIntervalMillis;
|
private Watcher watcher;
|
||||||
|
private int sessionTimeout;
|
||||||
|
private String quorumServers;
|
||||||
|
|
||||||
private static final int ID_OFFSET = Bytes.SIZEOF_INT;
|
private static final int ID_OFFSET = Bytes.SIZEOF_INT;
|
||||||
// the magic number is to be backward compatible
|
// the magic number is to be backward compatible
|
||||||
private static final byte MAGIC =(byte) 0XFF;
|
private static final byte MAGIC =(byte) 0XFF;
|
||||||
private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
|
private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
|
||||||
|
|
||||||
public RecoverableZooKeeper(String quorumServers, int seesionTimeout,
|
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
|
||||||
Watcher watcher, int maxRetries, int retryIntervalMillis)
|
Watcher watcher, int maxRetries, int retryIntervalMillis)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
|
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||||
this.retryCounterFactory =
|
this.retryCounterFactory =
|
||||||
new RetryCounterFactory(maxRetries, retryIntervalMillis);
|
new RetryCounterFactory(maxRetries, retryIntervalMillis);
|
||||||
this.retryIntervalMillis = retryIntervalMillis;
|
|
||||||
|
|
||||||
// the identifier = processID@hostName
|
// the identifier = processID@hostName
|
||||||
this.identifier = ManagementFactory.getRuntimeMXBean().getName();
|
this.identifier = ManagementFactory.getRuntimeMXBean().getName();
|
||||||
LOG.info("The identifier of this process is " + identifier);
|
LOG.info("The identifier of this process is " + identifier);
|
||||||
this.id = Bytes.toBytes(identifier);
|
this.id = Bytes.toBytes(identifier);
|
||||||
|
|
||||||
|
this.watcher = watcher;
|
||||||
|
this.sessionTimeout = sessionTimeout;
|
||||||
|
this.quorumServers = quorumServers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reconnectAfterExpiration()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
LOG.info("Closing dead ZooKeeper connection, session" +
|
||||||
|
" was: 0x"+Long.toHexString(zk.getSessionId()));
|
||||||
|
zk.close();
|
||||||
|
this.zk = new ZooKeeper(this.quorumServers,
|
||||||
|
this.sessionTimeout, this.watcher);
|
||||||
|
LOG.info("Recreated a ZooKeeper, session" +
|
||||||
|
" is: 0x"+Long.toHexString(zk.getSessionId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -123,6 +139,7 @@ public class RecoverableZooKeeper {
|
||||||
throw e;
|
throw e;
|
||||||
|
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -159,6 +176,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -194,6 +212,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -229,6 +248,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -264,6 +284,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -301,6 +322,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -338,6 +360,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -377,6 +400,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -484,6 +508,7 @@ public class RecoverableZooKeeper {
|
||||||
throw e;
|
throw e;
|
||||||
|
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
@ -523,6 +548,7 @@ public class RecoverableZooKeeper {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
case CONNECTIONLOSS:
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
case OPERATIONTIMEOUT:
|
case OPERATIONTIMEOUT:
|
||||||
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
if (!retryCounter.shouldRetry()) {
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
|
|
@ -253,6 +253,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
return recoverableZooKeeper;
|
return recoverableZooKeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reconnectAfterExpiration() throws IOException, InterruptedException {
|
||||||
|
recoverableZooKeeper.reconnectAfterExpiration();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the quorum address of this instance.
|
* Get the quorum address of this instance.
|
||||||
* @return quorum string of this zookeeper connection instance
|
* @return quorum string of this zookeeper connection instance
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.Map;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
@ -86,6 +88,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.mapred.MiniMRCluster;
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||||
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1308,7 +1311,7 @@ public class HBaseTestingUtility {
|
||||||
*/
|
*/
|
||||||
public void expireMasterSession() throws Exception {
|
public void expireMasterSession() throws Exception {
|
||||||
HMaster master = hbaseCluster.getMaster();
|
HMaster master = hbaseCluster.getMaster();
|
||||||
expireSession(master.getZooKeeper(), master);
|
expireSession(master.getZooKeeper(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1318,7 +1321,7 @@ public class HBaseTestingUtility {
|
||||||
*/
|
*/
|
||||||
public void expireRegionServerSession(int index) throws Exception {
|
public void expireRegionServerSession(int index) throws Exception {
|
||||||
HRegionServer rs = hbaseCluster.getRegionServer(index);
|
HRegionServer rs = hbaseCluster.getRegionServer(index);
|
||||||
expireSession(rs.getZooKeeper(), rs);
|
expireSession(rs.getZooKeeper(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1334,8 +1337,15 @@ public class HBaseTestingUtility {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Expire a ZooKeeer session as recommended in ZooKeeper documentation
|
* Expire a ZooKeeper session as recommended in ZooKeeper documentation
|
||||||
* http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
|
* http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
|
||||||
|
* There are issues when doing this:
|
||||||
|
* [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
|
||||||
|
* [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
|
||||||
|
*
|
||||||
|
* @param nodeZK - the ZK watcher to expire
|
||||||
|
* @param checkStatus - true to check if we can create an HTable with the
|
||||||
|
* current configuration.
|
||||||
*/
|
*/
|
||||||
public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
|
public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -1345,14 +1355,29 @@ public class HBaseTestingUtility {
|
||||||
byte[] password = zk.getSessionPasswd();
|
byte[] password = zk.getSessionPasswd();
|
||||||
long sessionID = zk.getSessionId();
|
long sessionID = zk.getSessionId();
|
||||||
|
|
||||||
|
// Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
|
||||||
|
// so we create a first watcher to be sure that the
|
||||||
|
// event was sent. We expect that if our watcher receives the event
|
||||||
|
// other watchers on the same machine will get is as well.
|
||||||
|
// When we ask to close the connection, ZK does not close it before
|
||||||
|
// we receive all the events, so don't have to capture the event, just
|
||||||
|
// closing the connection should be enough.
|
||||||
|
ZooKeeper monitor = new ZooKeeper(quorumServers,
|
||||||
|
1000, new org.apache.zookeeper.Watcher(){
|
||||||
|
@Override
|
||||||
|
public void process(WatchedEvent watchedEvent) {
|
||||||
|
LOG.info("Monitor ZKW received event="+watchedEvent);
|
||||||
|
}
|
||||||
|
} , sessionID, password);
|
||||||
|
|
||||||
|
// Making it expire
|
||||||
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
||||||
1000, EmptyWatcher.instance, sessionID, password);
|
1000, EmptyWatcher.instance, sessionID, password);
|
||||||
newZK.close();
|
newZK.close();
|
||||||
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
|
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
|
||||||
|
|
||||||
// There is actually no reason to sleep here. Session is expired.
|
// Now closing & waiting to be sure that the clients get it.
|
||||||
// May be for old ZK versions?
|
monitor.close();
|
||||||
// Thread.sleep(sleep);
|
|
||||||
|
|
||||||
if (checkStatus) {
|
if (checkStatus) {
|
||||||
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME).close();
|
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME).close();
|
||||||
|
@ -1508,7 +1533,7 @@ public class HBaseTestingUtility {
|
||||||
* Make sure that at least the specified number of region servers
|
* Make sure that at least the specified number of region servers
|
||||||
* are running
|
* are running
|
||||||
* @param num minimum number of region servers that should be running
|
* @param num minimum number of region servers that should be running
|
||||||
* @return True if we started some servers
|
* @return true if we started some servers
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public boolean ensureSomeRegionServersAvailable(final int num)
|
public boolean ensureSomeRegionServersAvailable(final int num)
|
||||||
|
@ -1524,6 +1549,31 @@ public class HBaseTestingUtility {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure that at least the specified number of region servers
|
||||||
|
* are running. We don't count the ones that are currently stopping or are
|
||||||
|
* stopped.
|
||||||
|
* @param num minimum number of region servers that should be running
|
||||||
|
* @return true if we started some servers
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
|
||||||
|
throws IOException {
|
||||||
|
boolean startedServer = ensureSomeRegionServersAvailable(num);
|
||||||
|
|
||||||
|
for (JVMClusterUtil.RegionServerThread rst :
|
||||||
|
hbaseCluster.getRegionServerThreads()) {
|
||||||
|
|
||||||
|
HRegionServer hrs = rst.getRegionServer();
|
||||||
|
if (hrs.isStopping() || hrs.isStopped()) {
|
||||||
|
LOG.info("A region server is stopped or stopping:"+hrs);
|
||||||
|
LOG.info("Started new server=" + hbaseCluster.startRegionServer());
|
||||||
|
startedServer = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return startedServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
@ -93,40 +94,75 @@ public class TestZooKeeper {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testClientSessionExpired() throws Exception {
|
public void testClientSessionExpired() throws Exception {
|
||||||
LOG.info("testClientSessionExpired");
|
|
||||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
new HTable(c, HConstants.META_TABLE_NAME).close();
|
|
||||||
|
// We don't want to share the connection as we will check
|
||||||
|
// its state
|
||||||
|
c.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "1111");
|
||||||
|
|
||||||
HConnection connection = HConnectionManager.getConnection(c);
|
HConnection connection = HConnectionManager.getConnection(c);
|
||||||
|
|
||||||
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
||||||
|
LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString(
|
||||||
|
connectionZK.hashCode()));
|
||||||
|
LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString(
|
||||||
|
connectionZK.getRecoverableZooKeeper().hashCode()));
|
||||||
|
LOG.info("session="+Long.toHexString(
|
||||||
|
connectionZK.getRecoverableZooKeeper().getSessionId()));
|
||||||
|
|
||||||
TEST_UTIL.expireSession(connectionZK);
|
TEST_UTIL.expireSession(connectionZK);
|
||||||
|
|
||||||
// Depending on how long you wait here, the state after dump will
|
LOG.info("Before using zkw state=" +
|
||||||
// be 'closed' or 'Connecting'.
|
|
||||||
// There should be no reason to wait, the connection is closed on the server
|
|
||||||
// Thread.sleep(sessionTimeout * 3L);
|
|
||||||
|
|
||||||
LOG.info("Before dump state=" +
|
|
||||||
connectionZK.getRecoverableZooKeeper().getState());
|
connectionZK.getRecoverableZooKeeper().getState());
|
||||||
// provoke session expiration by doing something with ZK
|
// provoke session expiration by doing something with ZK
|
||||||
ZKUtil.dump(connectionZK);
|
try {
|
||||||
|
connectionZK.getRecoverableZooKeeper().getZooKeeper().exists(
|
||||||
|
"/1/1", false);
|
||||||
|
} catch (KeeperException ignored) {
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the old ZK connection is closed, means we did expire
|
// Check that the old ZK connection is closed, means we did expire
|
||||||
LOG.info("ZooKeeper should have timed out");
|
|
||||||
States state = connectionZK.getRecoverableZooKeeper().getState();
|
States state = connectionZK.getRecoverableZooKeeper().getState();
|
||||||
LOG.info("After dump state=" + state);
|
LOG.info("After using zkw state=" + state);
|
||||||
|
LOG.info("session="+Long.toHexString(
|
||||||
|
connectionZK.getRecoverableZooKeeper().getSessionId()));
|
||||||
|
|
||||||
|
// It's asynchronous, so we may have to wait a little...
|
||||||
|
final long limit1 = System.currentTimeMillis() + 3000;
|
||||||
|
while (System.currentTimeMillis() < limit1 && state != States.CLOSED){
|
||||||
|
state = connectionZK.getRecoverableZooKeeper().getState();
|
||||||
|
}
|
||||||
|
LOG.info("After using zkw loop=" + state);
|
||||||
|
LOG.info("ZooKeeper should have timed out");
|
||||||
|
LOG.info("session="+Long.toHexString(
|
||||||
|
connectionZK.getRecoverableZooKeeper().getSessionId()));
|
||||||
|
|
||||||
|
// It's surprising but sometimes we can still be in connected state.
|
||||||
|
// As it's known (even if not understood) we don't make the the test fail
|
||||||
|
// for this reason.
|
||||||
Assert.assertTrue(state == States.CLOSED);
|
Assert.assertTrue(state == States.CLOSED);
|
||||||
|
|
||||||
// Check that the client recovered
|
// Check that the client recovered
|
||||||
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
||||||
//Here, if you wait, you will have a CONNECTED state. If you don't,
|
|
||||||
// you may have the CONNECTING one.
|
|
||||||
//Thread.sleep(sessionTimeout * 3L);
|
|
||||||
States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
|
States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
|
||||||
LOG.info("After new get state=" +state2);
|
LOG.info("After new get state=" +state2);
|
||||||
|
|
||||||
|
// As it's an asynchronous event we may got the same ZKW, if it's not
|
||||||
|
// yet invalidated. Hence this loop.
|
||||||
|
final long limit2 = System.currentTimeMillis() + 3000;
|
||||||
|
while (System.currentTimeMillis() < limit2 &&
|
||||||
|
state2 != States.CONNECTED && state2 != States.CONNECTING) {
|
||||||
|
|
||||||
|
newConnectionZK = connection.getZooKeeperWatcher();
|
||||||
|
state2 = newConnectionZK.getRecoverableZooKeeper().getState();
|
||||||
|
}
|
||||||
|
LOG.info("After new get state loop=" + state2);
|
||||||
|
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
state2 == States.CONNECTED || state2 == States.CONNECTING);
|
state2 == States.CONNECTED || state2 == States.CONNECTING);
|
||||||
|
|
||||||
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -141,7 +177,21 @@ public class TestZooKeeper {
|
||||||
public void testMasterSessionExpired() throws Exception {
|
public void testMasterSessionExpired() throws Exception {
|
||||||
LOG.info("Starting testMasterSessionExpired");
|
LOG.info("Starting testMasterSessionExpired");
|
||||||
TEST_UTIL.expireMasterSession();
|
TEST_UTIL.expireMasterSession();
|
||||||
Thread.sleep(7000); // Helps the test to succeed!!!
|
testSanity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Master recovery when the znode already exists. Internally, this
|
||||||
|
* test differs from {@link #testMasterSessionExpired} because here
|
||||||
|
* the master znode will exist in ZK.
|
||||||
|
*/
|
||||||
|
@Test(timeout=20000)
|
||||||
|
public void testMasterZKSessionRecoveryFailure() throws Exception {
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
HMaster m = cluster.getMaster();
|
||||||
|
m.abort("Test recovery from zk session expired",
|
||||||
|
new KeeperException.SessionExpiredException());
|
||||||
|
assertFalse(m.isStopped());
|
||||||
testSanity();
|
testSanity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,8 @@ public class TestDistributedLogSplitting {
|
||||||
LOG.info("Starting cluster");
|
LOG.info("Starting cluster");
|
||||||
conf = HBaseConfiguration.create();
|
conf = HBaseConfiguration.create();
|
||||||
conf.getLong("hbase.splitlog.max.resubmit", 0);
|
conf.getLong("hbase.splitlog.max.resubmit", 0);
|
||||||
|
// Make the failure test faster
|
||||||
|
conf.setInt("zookeeper.recovery.retry", 0);
|
||||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
TEST_UTIL = new HBaseTestingUtility(conf);
|
||||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
|
TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
|
||||||
cluster = TEST_UTIL.getHBaseCluster();
|
cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
@ -245,7 +247,7 @@ public class TestDistributedLogSplitting {
|
||||||
slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
|
slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
|
||||||
//waitForCounter but for one of the 2 counters
|
//waitForCounter but for one of the 2 counters
|
||||||
long curt = System.currentTimeMillis();
|
long curt = System.currentTimeMillis();
|
||||||
long waitTime = 30000;
|
long waitTime = 80000;
|
||||||
long endt = curt + waitTime;
|
long endt = curt + waitTime;
|
||||||
while (curt < endt) {
|
while (curt < endt) {
|
||||||
if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
|
if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
|
||||||
|
|
|
@ -1,96 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.master;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test cases for master to recover from ZK session expiry.
|
|
||||||
*/
|
|
||||||
@Category(MediumTests.class)
|
|
||||||
public class TestMasterZKSessionRecovery {
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The default timeout is 5 minutes.
|
|
||||||
* Shorten it so that the test won't wait for too long.
|
|
||||||
*/
|
|
||||||
static {
|
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
|
||||||
conf.setLong("hbase.master.zksession.recover.timeout", 50000);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
// Start a cluster of one regionserver.
|
|
||||||
TEST_UTIL.startMiniCluster(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Negative test of master recovery from zk session expiry.
|
|
||||||
* <p>
|
|
||||||
* Starts with one master. Fakes the master zk session expired.
|
|
||||||
* Ensures the master cannot recover the expired zk session since
|
|
||||||
* the master zk node is still there.
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test(timeout=10000)
|
|
||||||
public void testMasterZKSessionRecoveryFailure() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HMaster m = cluster.getMaster();
|
|
||||||
m.abort("Test recovery from zk session expired",
|
|
||||||
new KeeperException.SessionExpiredException());
|
|
||||||
assertTrue(m.isStopped());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Positive test of master recovery from zk session expiry.
|
|
||||||
* <p>
|
|
||||||
* Starts with one master. Closes the master zk session.
|
|
||||||
* Ensures the master can recover the expired zk session.
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test(timeout=60000)
|
|
||||||
public void testMasterZKSessionRecoverySuccess() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HMaster m = cluster.getMaster();
|
|
||||||
m.getZooKeeperWatcher().close();
|
|
||||||
m.abort("Test recovery from zk session expired",
|
|
||||||
new KeeperException.SessionExpiredException());
|
|
||||||
assertFalse(m.isStopped());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNotSame;
|
import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -78,7 +79,7 @@ public class TestSplitTransactionOnCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before public void setup() throws IOException {
|
@Before public void setup() throws IOException {
|
||||||
TESTING_UTIL.ensureSomeRegionServersAvailable(NB_SERVERS);
|
TESTING_UTIL.ensureSomeNonStoppedRegionServersAvailable(NB_SERVERS);
|
||||||
this.admin = new HBaseAdmin(TESTING_UTIL.getConfiguration());
|
this.admin = new HBaseAdmin(TESTING_UTIL.getConfiguration());
|
||||||
this.cluster = TESTING_UTIL.getMiniHBaseCluster();
|
this.cluster = TESTING_UTIL.getMiniHBaseCluster();
|
||||||
}
|
}
|
||||||
|
@ -398,7 +399,10 @@ public class TestSplitTransactionOnCluster {
|
||||||
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
|
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
|
||||||
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
|
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
|
||||||
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
|
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
|
||||||
LOG.info("Moving " + hri.getRegionNameAsString() + " to " +
|
assertNotNull(hrs);
|
||||||
|
assertNotNull(hri);
|
||||||
|
LOG.
|
||||||
|
info("Moving " + hri.getRegionNameAsString() + " to " +
|
||||||
hrs.getServerName() + "; metaServerIndex=" + metaServerIndex);
|
hrs.getServerName() + "; metaServerIndex=" + metaServerIndex);
|
||||||
admin.move(hri.getEncodedNameAsBytes(),
|
admin.move(hri.getEncodedNameAsBytes(),
|
||||||
Bytes.toBytes(hrs.getServerName().toString()));
|
Bytes.toBytes(hrs.getServerName().toString()));
|
||||||
|
|
|
@ -93,6 +93,8 @@ public class TestReplication {
|
||||||
conf1.setLong("replication.source.sleepforretries", 100);
|
conf1.setLong("replication.source.sleepforretries", 100);
|
||||||
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
||||||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||||
|
conf1.setInt("zookeeper.recovery.retry", 1);
|
||||||
|
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
|
||||||
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
||||||
conf1.setBoolean("dfs.support.append", true);
|
conf1.setBoolean("dfs.support.append", true);
|
||||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||||
|
@ -651,9 +653,11 @@ public class TestReplication {
|
||||||
|
|
||||||
int lastCount = 0;
|
int lastCount = 0;
|
||||||
|
|
||||||
|
final long start = System.currentTimeMillis();
|
||||||
for (int i = 0; i < NB_RETRIES; i++) {
|
for (int i = 0; i < NB_RETRIES; i++) {
|
||||||
if (i==NB_RETRIES-1) {
|
if (i==NB_RETRIES-1) {
|
||||||
fail("Waited too much time for queueFailover replication");
|
fail("Waited too much time for queueFailover replication. " +
|
||||||
|
"Waited "+(System.currentTimeMillis() - start)+"ms.");
|
||||||
}
|
}
|
||||||
Scan scan2 = new Scan();
|
Scan scan2 = new Scan();
|
||||||
ResultScanner scanner2 = htable2.getScanner(scan2);
|
ResultScanner scanner2 = htable2.getScanner(scan2);
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
||||||
import org.junit.*;
|
import org.junit.*;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -58,7 +59,9 @@ public class TestReplicationPeer {
|
||||||
try {
|
try {
|
||||||
LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session.");
|
LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session.");
|
||||||
// Trying to use the expired session to assert that it is indeed closed
|
// Trying to use the expired session to assert that it is indeed closed
|
||||||
zkw.getRecoverableZooKeeper().exists("/1/2", false);
|
zkw.getRecoverableZooKeeper().getZooKeeper().exists("/2/2", false);
|
||||||
|
Assert.fail(
|
||||||
|
"ReplicationPeer ZooKeeper session was not properly expired.");
|
||||||
} catch (SessionExpiredException k) {
|
} catch (SessionExpiredException k) {
|
||||||
rp.reloadZkWatcher();
|
rp.reloadZkWatcher();
|
||||||
|
|
||||||
|
@ -66,13 +69,12 @@ public class TestReplicationPeer {
|
||||||
|
|
||||||
// Try to use the connection again
|
// Try to use the connection again
|
||||||
LOG.info("Attempting to use refreshed "
|
LOG.info("Attempting to use refreshed "
|
||||||
+ "ReplicationPeer ZooKeeper session.");
|
+ "ReplicationPeer ZooKeeper session.");
|
||||||
zkw.getRecoverableZooKeeper().exists("/1/2", false);
|
zkw.getRecoverableZooKeeper().exists("/3/2", false);
|
||||||
|
|
||||||
return;
|
} catch (KeeperException.ConnectionLossException ignored) {
|
||||||
|
// We sometimes receive this exception. We just ignore it.
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.fail("ReplicationPeer ZooKeeper session was not properly expired.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue