HBASE-14498 Master stuck in infinite loop when all Zookeeper servers are unreachable

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Pankaj Kumar 2017-08-17 17:06:50 +08:00 committed by Michael Stack
parent c606a565c1
commit 12a7d2bace
3 changed files with 131 additions and 3 deletions

View File

@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@ -76,7 +80,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
private final RecoverableZooKeeper recoverableZooKeeper;
// abortable in case of zk failure
protected Abortable abortable;
protected final Abortable abortable;
// Used if abortable is null
private boolean aborted = false;
@ -89,6 +93,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// negotiation to complete
public CountDownLatch saslLatch = new CountDownLatch(1);
// Connection timeout on disconnect event
private long connWaitTimeOut;
private AtomicBoolean connected = new AtomicBoolean(false);
private boolean forceAbortOnZKDisconnect;
// Execute service for zookeeper disconnect event watcher
private ExecutorService zkEventWatcherExecService = null;
private final Configuration conf;
@ -122,6 +133,24 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public ZooKeeperWatcher(Configuration conf, String identifier,
Abortable abortable, boolean canCreateBaseZNode)
throws IOException, ZooKeeperConnectionException {
this(conf, identifier, abortable, canCreateBaseZNode, false);
}
/**
* Instantiate a ZooKeeper connection and watcher.
* @param conf Configuration
* @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
* this instance. Use null for default.
* @param abortable Can be null if there is on error there is no host to abort: e.g. client
* context.
* @param canCreateBaseZNode whether create base node.
* @param forceAbortOnZKDisconnect abort the watcher if true.
* @throws IOException when any IO exception
* @throws ZooKeeperConnectionException when any zookeeper connection exception
*/
public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable,
boolean canCreateBaseZNode, boolean forceAbortOnZKDisconnect) throws IOException,
ZooKeeperConnectionException {
this.conf = conf;
this.quorum = ZKConfig.getZKQuorumServersString(conf);
this.prefix = identifier;
@ -130,6 +159,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
this.identifier = identifier + "0x0";
this.abortable = abortable;
this.znodePaths = new ZNodePaths(conf);
// On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout),
// it will abort the process if no SyncConnected event reported by the time.
connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3;
PendingWatcher pendingWatcher = new PendingWatcher();
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
pendingWatcher.prepare(this);
@ -146,6 +178,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
throw zce;
}
}
this.forceAbortOnZKDisconnect = forceAbortOnZKDisconnect;
if (this.forceAbortOnZKDisconnect) {
this.zkEventWatcherExecService = Executors.newSingleThreadExecutor();
}
}
private void createBaseZNodes() throws ZooKeeperConnectionException {
@ -526,11 +562,19 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
Long.toHexString(this.recoverableZooKeeper.getSessionId());
// Update our identifier. Otherwise ignore.
LOG.debug(this.identifier + " connected");
connected.set(true);
break;
// Abort the server if Disconnected or Expired
case Disconnected:
LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
LOG.debug(prefix("Received Disconnected from ZooKeeper."));
if (forceAbortOnZKDisconnect) {
connected.set(false);
ZKDisconnectEventWatcher task = new ZKDisconnectEventWatcher();
zkEventWatcherExecService.execute(task);
} else {
LOG.debug("Received Disconnected from ZooKeeper, ignoring.");
}
break;
case Expired:
@ -553,6 +597,39 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
}
}
/*
* Task to watch zookeper disconnect event.
*/
class ZKDisconnectEventWatcher implements Runnable {
@Override
public void run() {
if (connected.get()) {
return;
}
long startTime = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - startTime < connWaitTimeOut) {
if (connected.get()) {
LOG.debug("Client got reconnected to zookeeper.");
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
if (!connected.get() && abortable != null) {
String msg =
prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut
+ " ms, aborting");
abortable.abort(msg, new KeeperException.ConnectionLossException());
}
}
}
/**
* Forces a synchronization of this ZooKeeper client connection.
* <p>
@ -617,6 +694,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public void close() {
try {
recoverableZooKeeper.close();
if (zkEventWatcherExecService != null) {
zkEventWatcherExecService.shutdown();
zkEventWatcherExecService = null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

View File

@ -23,14 +23,22 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class })
public class TestZooKeeperWatcher {
private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class);
@Test
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
@ -57,4 +65,43 @@ public class TestZooKeeperWatcher {
watcher.close();
}
@Test
public void testConnectionEvent() throws ZooKeeperConnectionException, IOException {
long zkSessionTimeout = 15000l;
Configuration conf = HBaseConfiguration.create();
conf.set("zookeeper.session.timeout", "15000");
Abortable abortable = new Abortable() {
boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
aborted = true;
LOG.error(why, e);
}
@Override
public boolean isAborted() {
return aborted;
}
};
ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false, true);
WatchedEvent event =
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
long startTime = EnvironmentEdgeManager.currentTime();
while (!abortable.isAborted()
&& (EnvironmentEdgeManager.currentTime() - startTime < zkSessionTimeout)) {
watcher.process(event);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
assertTrue(abortable.isAborted());
watcher.close();
}
}

View File

@ -642,7 +642,7 @@ public class HRegionServer extends HasThread implements
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
// Open connection to zookeeper and set primary watcher
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, canCreateBaseZNode());
rpcServices.isa.getPort(), this, canCreateBaseZNode(), true);
this.csm = (BaseCoordinatedStateManager) csm;
this.csm.initialize(this);