HBASE-14498 Revert for on-going review

This commit is contained in:
tedyu 2015-11-12 12:57:23 -08:00
parent 290ecbe829
commit 789f8a5a70
2 changed files with 3 additions and 88 deletions

View File

@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
@ -76,7 +74,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
private RecoverableZooKeeper recoverableZooKeeper; private RecoverableZooKeeper recoverableZooKeeper;
// abortable in case of zk failure // abortable in case of zk failure
protected final Abortable abortable; protected Abortable abortable;
// Used if abortable is null // Used if abortable is null
private boolean aborted = false; private boolean aborted = false;
@ -88,10 +86,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// negotiation to complete // negotiation to complete
public CountDownLatch saslLatch = new CountDownLatch(1); public CountDownLatch saslLatch = new CountDownLatch(1);
// Connection timeout on disconnect event
private long connWaitTimeOut;
private AtomicBoolean isConnected = new AtomicBoolean(false);
// node names // node names
// base znode for this cluster // base znode for this cluster
@ -173,9 +167,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
this.identifier = identifier + "0x0"; this.identifier = identifier + "0x0";
this.abortable = abortable; this.abortable = abortable;
setNodeNames(conf); setNodeNames(conf);
// On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout),
// it will abort the process if no SyncConnected event reported by the time.
connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3;
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
if (canCreateBaseZNode) { if (canCreateBaseZNode) {
createBaseZNodes(); createBaseZNodes();
@ -605,7 +596,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
private void connectionEvent(WatchedEvent event) { private void connectionEvent(WatchedEvent event) {
switch(event.getState()) { switch(event.getState()) {
case SyncConnected: case SyncConnected:
isConnected.set(true);
// Now, this callback can be invoked before the this.zookeeper is set. // Now, this callback can be invoked before the this.zookeeper is set.
// Wait a little while. // Wait a little while.
long finished = System.currentTimeMillis() + long finished = System.currentTimeMillis() +
@ -635,35 +625,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// Abort the server if Disconnected or Expired // Abort the server if Disconnected or Expired
case Disconnected: case Disconnected:
LOG.debug("Received Disconnected from ZooKeeper."); LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
isConnected.set(false);
Thread t = new Thread() {
public void run() {
long startTime = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - startTime < connWaitTimeOut) {
if (isConnected.get()) {
LOG.debug("Client got reconnected to zookeeper.");
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
if (!isConnected.get() && abortable != null) {
String msg =
prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut
+ " ms, aborting");
abortable.abort(msg, new KeeperException.ConnectionLossException());
}
};
};
t.setDaemon(true);
t.start();
break; break;
case Expired: case Expired:

View File

@ -22,22 +22,14 @@ import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category({SmallTests.class}) @Category({SmallTests.class})
public class TestZooKeeperWatcher { public class TestZooKeeperWatcher {
private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class);
@Test @Test
public void testIsClientReadable() throws ZooKeeperConnectionException, IOException { public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
@ -65,43 +57,4 @@ public class TestZooKeeperWatcher {
watcher.close(); watcher.close();
} }
@Test
public void testConnectionEvent() throws ZooKeeperConnectionException, IOException {
long zkSessionTimeout = 15000l;
Configuration conf = HBaseConfiguration.create();
conf.set("zookeeper.session.timeout", "15000");
Abortable abortable = new Abortable() {
boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
aborted = true;
LOG.error(why, e);
}
@Override
public boolean isAborted() {
return aborted;
}
};
ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false);
WatchedEvent event =
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
long startTime = EnvironmentEdgeManager.currentTime();
while (!abortable.isAborted()
&& (EnvironmentEdgeManager.currentTime() - startTime < zkSessionTimeout)) {
watcher.process(event);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
assertTrue(abortable.isAborted());
watcher.close();
}
} }