Revert "HBASE-19772 ReadOnlyZKClient improvements"

Pushed by mistake. Reverting from master.

This reverts commit 70515f5311.
This commit is contained in:
Michael Stack 2018-01-11 14:27:23 -08:00
parent 09ae5abbe7
commit a4a4ce8eac
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
1 changed files with 23 additions and 35 deletions

View File

@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -117,8 +116,6 @@ public final class ReadOnlyZKClient implements Closeable {
private ZooKeeper zookeeper; private ZooKeeper zookeeper;
private int pendingRequests = 0;
private String getId() { private String getId() {
return String.format("0x%08x", System.identityHashCode(this)); return String.format("0x%08x", System.identityHashCode(this));
} }
@ -130,12 +127,12 @@ public final class ReadOnlyZKClient implements Closeable {
this.retryIntervalMs = this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
LOG.info( LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString +
"Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " + ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries +
"retry interval {} ms, keep alive {} ms", ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms");
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); Thread t = new Thread(this::run, "ReadOnlyZKClient");
Threads.setDaemonThreadRunning(new Thread(this::run), t.setDaemon(true);
"ReadOnlyZKClient-" + connectString + "@" + getId()); t.start();
} }
private abstract class ZKTask<T> extends Task { private abstract class ZKTask<T> extends Task {
@ -159,7 +156,6 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void exec(ZooKeeper alwaysNull) { public void exec(ZooKeeper alwaysNull) {
pendingRequests--;
Code code = Code.get(rc); Code code = Code.get(rc);
if (code == Code.OK) { if (code == Code.OK) {
future.complete(ret); future.complete(ret);
@ -173,19 +169,19 @@ public final class ReadOnlyZKClient implements Closeable {
future.completeExceptionally(KeeperException.create(code, path)); future.completeExceptionally(KeeperException.create(code, path));
} else { } else {
if (code == Code.SESSIONEXPIRED) { if (code == Code.SESSIONEXPIRED) {
LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); LOG.warn(getId() + " session expired, close and reconnect");
try { try {
zk.close(); zk.close();
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
} }
if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
connectString, operationType, path, code, ZKTask.this.retries); code + ", retries = " + ZKTask.this.retries);
tasks.add(ZKTask.this); tasks.add(ZKTask.this);
} else { } else {
LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
connectString, operationType, path, code, ZKTask.this.retries); code + ", retries = " + ZKTask.this.retries + ", give up");
future.completeExceptionally(KeeperException.create(code, path)); future.completeExceptionally(KeeperException.create(code, path));
} }
} }
@ -209,14 +205,6 @@ public final class ReadOnlyZKClient implements Closeable {
return true; return true;
} }
protected abstract void doExec(ZooKeeper zk);
@Override
public final void exec(ZooKeeper zk) {
pendingRequests++;
doExec(zk);
}
public boolean delay(long intervalMs, int maxRetries) { public boolean delay(long intervalMs, int maxRetries) {
if (retries >= maxRetries) { if (retries >= maxRetries) {
return false; return false;
@ -229,12 +217,14 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void connectFailed(IOException e) { public void connectFailed(IOException e) {
if (delay(retryIntervalMs, maxRetries)) { if (delay(retryIntervalMs, maxRetries)) {
LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
connectString, operationType, path, retries, e); ", retries = " + retries,
e);
tasks.add(this); tasks.add(this);
} else { } else {
LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
connectString, operationType, path, retries, e); ", retries = " + retries + ", give up",
e);
future.completeExceptionally(e); future.completeExceptionally(e);
} }
} }
@ -259,7 +249,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<byte[]>(path, future, "get") { tasks.add(new ZKTask<byte[]>(path, future, "get") {
@Override @Override
protected void doExec(ZooKeeper zk) { public void exec(ZooKeeper zk) {
zk.getData(path, false, zk.getData(path, false,
(rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
} }
@ -275,7 +265,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<Stat>(path, future, "exists") { tasks.add(new ZKTask<Stat>(path, future, "exists") {
@Override @Override
protected void doExec(ZooKeeper zk) { public void exec(ZooKeeper zk) {
zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
} }
}); });
@ -321,11 +311,9 @@ public final class ReadOnlyZKClient implements Closeable {
if (task == CLOSE) { if (task == CLOSE) {
break; break;
} }
if (task == null && pendingRequests == 0) { if (task == null) {
LOG.info( LOG.info(getId() + " no activities for " + keepAliveTimeMs +
"{} to {} no activities for {} ms, close active connection. " + " ms, close active connection. Will reconnect next time when there are new requests.");
"Will reconnect next time when there are new requests",
getId(), connectString, keepAliveTimeMs);
closeZk(); closeZk();
continue; continue;
} }
@ -351,7 +339,7 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void close() { public void close() {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
LOG.info("Close zookeeper connection {} to {}", getId(), connectString); LOG.info("Close zookeeper connection " + getId() + " to " + connectString);
tasks.add(CLOSE); tasks.add(CLOSE);
} }
} }