HBASE-14498 Master stuck in infinite loop when all Zookeeper servers are unreachable (Pankaj Kumar)
This commit is contained in:
parent
c725356e4a
commit
c36b9489aa
|
@ -26,6 +26,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.security.Superusers;
|
import org.apache.hadoop.hbase.security.Superusers;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
@ -76,7 +78,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
private RecoverableZooKeeper recoverableZooKeeper;
|
private RecoverableZooKeeper recoverableZooKeeper;
|
||||||
|
|
||||||
// abortable in case of zk failure
|
// abortable in case of zk failure
|
||||||
protected Abortable abortable;
|
protected final Abortable abortable;
|
||||||
// Used if abortable is null
|
// Used if abortable is null
|
||||||
private boolean aborted = false;
|
private boolean aborted = false;
|
||||||
|
|
||||||
|
@ -88,6 +90,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
// negotiation to complete
|
// negotiation to complete
|
||||||
public CountDownLatch saslLatch = new CountDownLatch(1);
|
public CountDownLatch saslLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
// Connection timeout on disconnect event
|
||||||
|
private long connWaitTimeOut;
|
||||||
|
private AtomicBoolean isConnected = new AtomicBoolean(false);
|
||||||
|
|
||||||
// node names
|
// node names
|
||||||
|
|
||||||
// base znode for this cluster
|
// base znode for this cluster
|
||||||
|
@ -177,6 +183,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
this.identifier = identifier + "0x0";
|
this.identifier = identifier + "0x0";
|
||||||
this.abortable = abortable;
|
this.abortable = abortable;
|
||||||
setNodeNames(conf);
|
setNodeNames(conf);
|
||||||
|
// On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout),
|
||||||
|
// it will abort the process if no SyncConnected event reported by the time.
|
||||||
|
connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3;
|
||||||
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
|
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
|
||||||
if (canCreateBaseZNode) {
|
if (canCreateBaseZNode) {
|
||||||
createBaseZNodes();
|
createBaseZNodes();
|
||||||
|
@ -611,6 +620,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
private void connectionEvent(WatchedEvent event) {
|
private void connectionEvent(WatchedEvent event) {
|
||||||
switch(event.getState()) {
|
switch(event.getState()) {
|
||||||
case SyncConnected:
|
case SyncConnected:
|
||||||
|
isConnected.set(true);
|
||||||
// Now, this callback can be invoked before the this.zookeeper is set.
|
// Now, this callback can be invoked before the this.zookeeper is set.
|
||||||
// Wait a little while.
|
// Wait a little while.
|
||||||
long finished = System.currentTimeMillis() +
|
long finished = System.currentTimeMillis() +
|
||||||
|
@ -640,7 +650,35 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
|
|
||||||
// Abort the server if Disconnected or Expired
|
// Abort the server if Disconnected or Expired
|
||||||
case Disconnected:
|
case Disconnected:
|
||||||
LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
|
LOG.debug("Received Disconnected from ZooKeeper.");
|
||||||
|
isConnected.set(false);
|
||||||
|
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
long startTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
while (EnvironmentEdgeManager.currentTime() - startTime < connWaitTimeOut) {
|
||||||
|
if (isConnected.get()) {
|
||||||
|
LOG.debug("Client got reconnected to zookeeper.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isConnected.get() && abortable != null) {
|
||||||
|
String msg =
|
||||||
|
prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut
|
||||||
|
+ " ms, aborting");
|
||||||
|
abortable.abort(msg, new KeeperException.ConnectionLossException());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
t.setDaemon(true);
|
||||||
|
t.start();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Expired:
|
case Expired:
|
||||||
|
|
|
@ -22,14 +22,22 @@ import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
import org.apache.zookeeper.Watcher;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({SmallTests.class})
|
@Category({SmallTests.class})
|
||||||
public class TestZooKeeperWatcher {
|
public class TestZooKeeperWatcher {
|
||||||
|
private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
|
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
|
||||||
|
@ -57,4 +65,43 @@ public class TestZooKeeperWatcher {
|
||||||
watcher.close();
|
watcher.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionEvent() throws ZooKeeperConnectionException, IOException {
|
||||||
|
long zkSessionTimeout = 15000l;
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.set("zookeeper.session.timeout", "15000");
|
||||||
|
|
||||||
|
Abortable abortable = new Abortable() {
|
||||||
|
boolean aborted = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
aborted = true;
|
||||||
|
LOG.error(why, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return aborted;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false);
|
||||||
|
|
||||||
|
WatchedEvent event =
|
||||||
|
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
|
||||||
|
|
||||||
|
long startTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
while (!abortable.isAborted()
|
||||||
|
&& (EnvironmentEdgeManager.currentTime() - startTime < zkSessionTimeout)) {
|
||||||
|
watcher.process(event);
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(abortable.isAborted());
|
||||||
|
watcher.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue