HBASE-18645 Loads of tests timing out.... Revert "HBASE-14498 Master stuck in infinite loop when all Zookeeper servers are unreachable"
This reverts commit 1ab6882f62
.
This commit is contained in:
parent
23ddf69c00
commit
05f07f692b
|
@ -26,9 +26,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -42,7 +39,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.security.Superusers;
|
import org.apache.hadoop.hbase.security.Superusers;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
@ -80,7 +76,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
private final RecoverableZooKeeper recoverableZooKeeper;
|
private final RecoverableZooKeeper recoverableZooKeeper;
|
||||||
|
|
||||||
// abortable in case of zk failure
|
// abortable in case of zk failure
|
||||||
protected final Abortable abortable;
|
protected Abortable abortable;
|
||||||
// Used if abortable is null
|
// Used if abortable is null
|
||||||
private boolean aborted = false;
|
private boolean aborted = false;
|
||||||
|
|
||||||
|
@ -93,13 +89,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
// negotiation to complete
|
// negotiation to complete
|
||||||
public CountDownLatch saslLatch = new CountDownLatch(1);
|
public CountDownLatch saslLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
// Connection timeout on disconnect event
|
|
||||||
private long connWaitTimeOut;
|
|
||||||
private AtomicBoolean connected = new AtomicBoolean(false);
|
|
||||||
private boolean forceAbortOnZKDisconnect;
|
|
||||||
|
|
||||||
// Execute service for zookeeper disconnect event watcher
|
|
||||||
private ExecutorService zkEventWatcherExecService = null;
|
|
||||||
|
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
@ -133,24 +122,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
public ZooKeeperWatcher(Configuration conf, String identifier,
|
public ZooKeeperWatcher(Configuration conf, String identifier,
|
||||||
Abortable abortable, boolean canCreateBaseZNode)
|
Abortable abortable, boolean canCreateBaseZNode)
|
||||||
throws IOException, ZooKeeperConnectionException {
|
throws IOException, ZooKeeperConnectionException {
|
||||||
this(conf, identifier, abortable, canCreateBaseZNode, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Instantiate a ZooKeeper connection and watcher.
|
|
||||||
* @param conf Configuration
|
|
||||||
* @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
|
|
||||||
* this instance. Use null for default.
|
|
||||||
* @param abortable Can be null if there is on error there is no host to abort: e.g. client
|
|
||||||
* context.
|
|
||||||
* @param canCreateBaseZNode whether create base node.
|
|
||||||
* @param forceAbortOnZKDisconnect abort the watcher if true.
|
|
||||||
* @throws IOException when any IO exception
|
|
||||||
* @throws ZooKeeperConnectionException when any zookeeper connection exception
|
|
||||||
*/
|
|
||||||
public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable,
|
|
||||||
boolean canCreateBaseZNode, boolean forceAbortOnZKDisconnect) throws IOException,
|
|
||||||
ZooKeeperConnectionException {
|
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.quorum = ZKConfig.getZKQuorumServersString(conf);
|
this.quorum = ZKConfig.getZKQuorumServersString(conf);
|
||||||
this.prefix = identifier;
|
this.prefix = identifier;
|
||||||
|
@ -159,9 +130,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
this.identifier = identifier + "0x0";
|
this.identifier = identifier + "0x0";
|
||||||
this.abortable = abortable;
|
this.abortable = abortable;
|
||||||
this.znodePaths = new ZNodePaths(conf);
|
this.znodePaths = new ZNodePaths(conf);
|
||||||
// On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout),
|
|
||||||
// it will abort the process if no SyncConnected event reported by the time.
|
|
||||||
connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3;
|
|
||||||
PendingWatcher pendingWatcher = new PendingWatcher();
|
PendingWatcher pendingWatcher = new PendingWatcher();
|
||||||
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
|
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
|
||||||
pendingWatcher.prepare(this);
|
pendingWatcher.prepare(this);
|
||||||
|
@ -178,10 +146,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
throw zce;
|
throw zce;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.forceAbortOnZKDisconnect = forceAbortOnZKDisconnect;
|
|
||||||
if (this.forceAbortOnZKDisconnect) {
|
|
||||||
this.zkEventWatcherExecService = Executors.newSingleThreadExecutor();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createBaseZNodes() throws ZooKeeperConnectionException {
|
private void createBaseZNodes() throws ZooKeeperConnectionException {
|
||||||
|
@ -562,19 +526,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
Long.toHexString(this.recoverableZooKeeper.getSessionId());
|
Long.toHexString(this.recoverableZooKeeper.getSessionId());
|
||||||
// Update our identifier. Otherwise ignore.
|
// Update our identifier. Otherwise ignore.
|
||||||
LOG.debug(this.identifier + " connected");
|
LOG.debug(this.identifier + " connected");
|
||||||
connected.set(true);
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// Abort the server if Disconnected or Expired
|
// Abort the server if Disconnected or Expired
|
||||||
case Disconnected:
|
case Disconnected:
|
||||||
LOG.debug(prefix("Received Disconnected from ZooKeeper."));
|
LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
|
||||||
if (forceAbortOnZKDisconnect) {
|
|
||||||
connected.set(false);
|
|
||||||
ZKDisconnectEventWatcher task = new ZKDisconnectEventWatcher();
|
|
||||||
zkEventWatcherExecService.execute(task);
|
|
||||||
} else {
|
|
||||||
LOG.debug("Received Disconnected from ZooKeeper, ignoring.");
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Expired:
|
case Expired:
|
||||||
|
@ -597,39 +553,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Task to watch zookeper disconnect event.
|
|
||||||
*/
|
|
||||||
class ZKDisconnectEventWatcher implements Runnable {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
if (connected.get()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
while (EnvironmentEdgeManager.currentTime() - startTime < connWaitTimeOut) {
|
|
||||||
if (connected.get()) {
|
|
||||||
LOG.debug("Client got reconnected to zookeeper.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(100);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!connected.get() && abortable != null) {
|
|
||||||
String msg =
|
|
||||||
prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut
|
|
||||||
+ " ms, aborting");
|
|
||||||
abortable.abort(msg, new KeeperException.ConnectionLossException());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forces a synchronization of this ZooKeeper client connection.
|
* Forces a synchronization of this ZooKeeper client connection.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -694,10 +617,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
recoverableZooKeeper.close();
|
recoverableZooKeeper.close();
|
||||||
if (zkEventWatcherExecService != null) {
|
|
||||||
zkEventWatcherExecService.shutdown();
|
|
||||||
zkEventWatcherExecService = null;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,22 +23,14 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
|
||||||
import org.apache.zookeeper.Watcher;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ SmallTests.class })
|
@Category({ SmallTests.class })
|
||||||
public class TestZooKeeperWatcher {
|
public class TestZooKeeperWatcher {
|
||||||
private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class);
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
|
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
|
||||||
|
@ -65,43 +57,4 @@ public class TestZooKeeperWatcher {
|
||||||
watcher.close();
|
watcher.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testConnectionEvent() throws ZooKeeperConnectionException, IOException {
|
|
||||||
long zkSessionTimeout = 15000l;
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
conf.set("zookeeper.session.timeout", "15000");
|
|
||||||
|
|
||||||
Abortable abortable = new Abortable() {
|
|
||||||
boolean aborted = false;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
aborted = true;
|
|
||||||
LOG.error(why, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return aborted;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false, true);
|
|
||||||
|
|
||||||
WatchedEvent event =
|
|
||||||
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
|
|
||||||
|
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
while (!abortable.isAborted()
|
|
||||||
&& (EnvironmentEdgeManager.currentTime() - startTime < zkSessionTimeout)) {
|
|
||||||
watcher.process(event);
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(abortable.isAborted());
|
|
||||||
watcher.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -642,7 +642,7 @@ public class HRegionServer extends HasThread implements
|
||||||
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
|
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
|
||||||
// Open connection to zookeeper and set primary watcher
|
// Open connection to zookeeper and set primary watcher
|
||||||
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
|
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
|
||||||
rpcServices.isa.getPort(), this, canCreateBaseZNode(), true);
|
rpcServices.isa.getPort(), this, canCreateBaseZNode());
|
||||||
|
|
||||||
this.csm = (BaseCoordinatedStateManager) csm;
|
this.csm = (BaseCoordinatedStateManager) csm;
|
||||||
this.csm.initialize(this);
|
this.csm.initialize(this);
|
||||||
|
|
Loading…
Reference in New Issue