HBASE-19399 Purge curator dependency from hbase-client

This commit is contained in:
zhangduo 2017-12-03 08:30:30 +08:00
parent 8354a563f7
commit 7a5b078306
14 changed files with 689 additions and 136 deletions

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testclassification;
public interface ZKTests {
}

View File

@ -195,14 +195,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
@ -28,16 +26,10 @@ import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HRegionLocation;
@ -45,16 +37,14 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.data.Stat;
/**
* Fetch the registry data from zookeeper.
@ -64,53 +54,36 @@ class ZKAsyncRegistry implements AsyncRegistry {
private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class);
private final CuratorFramework zk;
private final ReadOnlyZKClient zk;
private final ZNodePaths znodePaths;
ZKAsyncRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
int zkRetry = conf.getInt("zookeeper.recovery.retry", 30);
int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
this.zk = CuratorFrameworkFactory.builder()
.connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
.threadFactory(
Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode())))
.build();
this.zk.start();
// TODO: temporary workaround for HBASE-19312, must be removed before 2.0.0 release!
try {
this.zk.blockUntilConnected(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
this.zk = new ReadOnlyZKClient(conf);
}
private interface CuratorEventProcessor<T> {
T process(CuratorEvent event) throws Exception;
private interface Converter<T> {
T convert(byte[] data) throws Exception;
}
private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
CuratorEventProcessor<T> processor) {
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
try {
opBuilder.inBackground((client, event) -> {
try {
future.complete(processor.process(event));
} catch (Exception e) {
future.completeExceptionally(e);
}
}).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path);
} catch (Exception e) {
future.completeExceptionally(e);
}
zk.get(path).whenComplete((data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
try {
future.complete(converter.convert(data));
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
private static String getClusterId(CuratorEvent event) throws DeserializationException {
byte[] data = event.getData();
private static String getClusterId(byte[] data) throws DeserializationException {
if (data == null || data.length == 0) {
return null;
}
@ -120,17 +93,15 @@ class ZKAsyncRegistry implements AsyncRegistry {
@Override
public CompletableFuture<String> getClusterId() {
return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
}
@VisibleForTesting
CuratorFramework getCuratorFramework() {
ReadOnlyZKClient getZKClient() {
return zk;
}
private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
throws IOException {
byte[] data = event.getData();
private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
@ -169,7 +140,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@ -184,13 +155,13 @@ class ZKAsyncRegistry implements AsyncRegistry {
new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return;
}
locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
locs[DEFAULT_REPLICA_ID] =
new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (future.isDone()) {
return;
}
@ -203,13 +174,13 @@ class ZKAsyncRegistry implements AsyncRegistry {
} else {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state "
+ stateAndServerName.getFirst());
LOG.warn("Meta region for replica " + replicaId + " is in state " +
stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] = new HRegionLocation(
getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
locs[replicaId] =
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);
@ -219,18 +190,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
return future;
}
private static int getCurrentNrHRS(CuratorEvent event) {
Stat stat = event.getStat();
return stat != null ? stat.getNumChildren() : 0;
}
@Override
public CompletableFuture<Integer> getCurrentNrHRS() {
return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS);
return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
}
private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException {
byte[] data = event.getData();
private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
@ -241,7 +206,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
@Override
public CompletableFuture<ServerName> getMasterAddress() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
@ -254,7 +219,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
@Override
public CompletableFuture<Integer> getMasterInfoPort() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
}

View File

@ -0,0 +1,347 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* A very simple read only zookeeper implementation without watcher support.
*/
@InterfaceAudience.Private
public final class ReadOnlyZKClient implements Closeable {
private static final Log LOG = LogFactory.getLog(ReadOnlyZKClient.class);
public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
private static final int DEFAULT_RECOVERY_RETRY = 30;
public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
"zookeeper.recovery.retry.intervalmill";
private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
private final String connectString;
private final int sessionTimeoutMs;
private final int maxRetries;
private final int retryIntervalMs;
private final int keepAliveTimeMs;
private static abstract class Task implements Delayed {
protected long time = System.nanoTime();
public boolean needZk() {
return false;
}
public void exec(ZooKeeper zk) {
}
public void connectFailed(IOException e) {
}
public void closed(IOException e) {
}
@Override
public int compareTo(Delayed o) {
Task that = (Task) o;
int c = Long.compare(time, that.time);
if (c != 0) {
return c;
}
return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
private static final Task CLOSE = new Task() {
};
private final DelayQueue<Task> tasks = new DelayQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private ZooKeeper zookeeper;
private String getId() {
return String.format("0x%08x", System.identityHashCode(this));
}
public ReadOnlyZKClient(Configuration conf) {
this.connectString = ZKConfig.getZKQuorumServersString(conf);
this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString +
", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries +
", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms");
Thread t = new Thread(this::run, "ReadOnlyZKClient");
t.setDaemon(true);
t.start();
}
private abstract class ZKTask<T> extends Task {
protected final String path;
private final CompletableFuture<T> future;
private final String operationType;
private int retries;
protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
this.path = path;
this.future = future;
this.operationType = operationType;
}
protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
tasks.add(new Task() {
@Override
public void exec(ZooKeeper alwaysNull) {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(ret);
} else if (code == Code.NONODE) {
if (errorIfNoNode) {
future.completeExceptionally(KeeperException.create(code, path));
} else {
future.complete(ret);
}
} else if (FAIL_FAST_CODES.contains(code)) {
future.completeExceptionally(KeeperException.create(code, path));
} else {
if (code == Code.SESSIONEXPIRED) {
LOG.warn(getId() + " session expired, close and reconnect");
try {
zk.close();
} catch (InterruptedException e) {
}
}
if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
code + ", retries = " + ZKTask.this.retries);
tasks.add(ZKTask.this);
} else {
LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
code + ", retries = " + ZKTask.this.retries + ", give up");
future.completeExceptionally(KeeperException.create(code, path));
}
}
}
@Override
public void closed(IOException e) {
// It may happen that a request is succeeded and the onComplete has been called and pushed
// us into the task queue, but before we get called a close is called and here we will
// fail the request, although it is succeeded actually.
// This is not a perfect solution but anyway, it is better than hang the requests for
// ever, and also acceptable as if you close the zk client before actually getting the
// response then a failure is always possible.
future.completeExceptionally(e);
}
});
}
@Override
public boolean needZk() {
return true;
}
public boolean delay(long intervalMs, int maxRetries) {
if (retries >= maxRetries) {
return false;
}
retries++;
time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
return true;
}
@Override
public void connectFailed(IOException e) {
if (delay(retryIntervalMs, maxRetries)) {
LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
", retries = " + retries,
e);
tasks.add(this);
} else {
LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
", retries = " + retries + ", give up",
e);
future.completeExceptionally(e);
}
}
@Override
public void closed(IOException e) {
future.completeExceptionally(e);
}
}
private static <T> CompletableFuture<T> failed(Throwable e) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
public CompletableFuture<byte[]> get(String path) {
if (closed.get()) {
return failed(new IOException("Client already closed"));
}
CompletableFuture<byte[]> future = new CompletableFuture<>();
tasks.add(new ZKTask<byte[]>(path, future, "get") {
@Override
public void exec(ZooKeeper zk) {
zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
null);
}
});
return future;
}
public CompletableFuture<Stat> exists(String path) {
if (closed.get()) {
return failed(new IOException("Client already closed"));
}
CompletableFuture<Stat> future = new CompletableFuture<>();
tasks.add(new ZKTask<Stat>(path, future, "exists") {
@Override
public void exec(ZooKeeper zk) {
zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
}
});
return future;
}
private void closeZk() {
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e) {
}
zookeeper = null;
}
}
private ZooKeeper getZk() throws IOException {
// may be closed when session expired
if (zookeeper == null || !zookeeper.getState().isAlive()) {
zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
});
}
return zookeeper;
}
private void run() {
for (;;) {
Task task;
try {
task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
continue;
}
if (task == CLOSE) {
break;
}
if (task == null) {
LOG.info(getId() + " no activities for " + keepAliveTimeMs +
" ms, close active connection. Will reconnect next time when there are new requests.");
closeZk();
continue;
}
if (!task.needZk()) {
task.exec(null);
} else {
ZooKeeper zk;
try {
zk = getZk();
} catch (IOException e) {
task.connectFailed(e);
continue;
}
task.exec(zk);
}
}
closeZk();
IOException error = new IOException("Client already closed");
Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
tasks.clear();
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
LOG.info("Close zookeeper connection " + getId() + " to " + connectString);
tasks.add(CLOSE);
}
}
@VisibleForTesting
ZooKeeper getZooKeeper() {
return zookeeper;
}
@VisibleForTesting
public String getConnectString() {
return connectString;
}
}

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -38,14 +36,15 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
@Category({ MediumTests.class, ZKTests.class })
public class TestZKAsyncRegistry {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -54,33 +53,34 @@ public class TestZKAsyncRegistry {
// waits for all replicas to have region location
static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
TEST_UTIL.waitFor(TEST_UTIL.getConfiguration()
.getLong("hbase.client.sync.wait.timeout.msec", 60000),
200, true, new ExplainingPredicate<IOException>() {
@Override
public String explainFailure() throws IOException {
return TEST_UTIL.explainTableAvailability(tbl);
}
@Override
public boolean evaluate() throws IOException {
AtomicBoolean ready = new AtomicBoolean(true);
try {
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i);
if (loc == null) {
ready.set(false);
}
});
} catch (Exception e) {
ready.set(false);
TEST_UTIL.waitFor(
TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() {
@Override
public String explainFailure() throws IOException {
return TEST_UTIL.explainTableAvailability(tbl);
}
return ready.get();
}
});
@Override
public boolean evaluate() throws IOException {
AtomicBoolean ready = new AtomicBoolean(true);
try {
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i);
if (loc == null) {
ready.set(false);
}
});
} catch (Exception e) {
ready.set(false);
}
return ready.get();
}
});
}
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@ -109,25 +109,26 @@ public class TestZKAsyncRegistry {
IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i);
assertNotNull("Replica " + i + " doesn't have location", loc);
assertTrue(loc.getRegionInfo().getTable().equals(TableName.META_TABLE_NAME));
assertEquals(i, loc.getRegionInfo().getReplicaId());
assertEquals(TableName.META_TABLE_NAME, loc.getRegion().getTable());
assertEquals(i, loc.getRegion().getReplicaId());
});
}
@Test
public void testIndependentZKConnections() throws IOException {
final CuratorZookeeperClient zk1 = REGISTRY.getCuratorFramework().getZookeeperClient();
ReadOnlyZKClient zk1 = REGISTRY.getZKClient();
final Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
try (final ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) {
final CuratorZookeeperClient zk2 = otherRegistry.getCuratorFramework().getZookeeperClient();
try (ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) {
ReadOnlyZKClient zk2 = otherRegistry.getZKClient();
assertNotSame("Using a different configuration / quorum should result in different backing " +
"zk connection.", zk1, zk2);
assertNotEquals("Using a different configrution / quorum should be reflected in the " +
"zk connection.", zk1.getCurrentConnectionString(), zk2.getCurrentConnectionString());
"zk connection.",
zk1, zk2);
assertNotEquals(
"Using a different configrution / quorum should be reflected in the " + "zk connection.",
zk1.getConnectString(), zk2.getConnectString());
}
}
}

View File

@ -469,4 +469,8 @@ public class MiniZooKeeperCluster {
return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
: clientPortList.get(activeZKServerIndex);
}
List<ZooKeeperServer> getZooKeeperServers() {
return zooKeeperServers;
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.curator.shaded.com.google.common.base.Stopwatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -30,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeperMain;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch;
/**
* Tool for running ZookeeperMain from HBase by reading a ZooKeeper server
* from HBase XML configuration.

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
@Category({ ZKTests.class, SmallTests.class })
public class TestInstancePending {
@Test(timeout = 1000)
public void test() throws Exception {

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ZKTests.class, MediumTests.class })
public class TestReadOnlyZKClient {
private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static MiniZooKeeperCluster CLUSTER;
private static int PORT;
private static String PATH = "/test";
private static byte[] DATA;
private static int CHILDREN = 5;
private static ReadOnlyZKClient RO_ZK;
@BeforeClass
public static void setUp() throws IOException, InterruptedException, KeeperException {
File file =
new File(UTIL.getDataTestDir("zkcluster_" + UUID.randomUUID().toString()).toString());
CLUSTER = new MiniZooKeeperCluster(UTIL.getConfiguration());
PORT = CLUSTER.startup(file);
ZooKeeper zk = new ZooKeeper("localhost:" + PORT, 10000, e -> {
});
DATA = new byte[10];
ThreadLocalRandom.current().nextBytes(DATA);
zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
for (int i = 0; i < CHILDREN; i++) {
zk.create(PATH + "/c" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zk.close();
Configuration conf = UTIL.getConfiguration();
conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + PORT);
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3);
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100);
conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
RO_ZK = new ReadOnlyZKClient(conf);
// only connect when necessary
assertNull(RO_ZK.getZooKeeper());
}
@AfterClass
public static void tearDown() throws IOException {
RO_ZK.close();
CLUSTER.shutdown();
UTIL.cleanupTestDir();
}
@Test
public void testGetAndExists() throws InterruptedException, ExecutionException {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
assertNotNull(RO_ZK.getZooKeeper());
// a little longer than keep alive millis
Thread.sleep(5000);
assertNull(RO_ZK.getZooKeeper());
}
@Test
public void testNoNode() throws InterruptedException, ExecutionException {
String pathNotExists = PATH + "_whatever";
try {
RO_ZK.get(pathNotExists).get();
fail("should fail because of " + pathNotExists + " does not exist");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(KeeperException.class));
KeeperException ke = (KeeperException) e.getCause();
assertEquals(Code.NONODE, ke.code());
assertEquals(pathNotExists, ke.getPath());
}
// exists will not throw exception.
assertNull(RO_ZK.exists(pathNotExists).get());
}
@Test
public void testSessionExpire() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
ZooKeeper zk = RO_ZK.getZooKeeper();
long sessionId = zk.getSessionId();
CLUSTER.getZooKeeperServers().get(0).closeSession(sessionId);
// should not reach keep alive so still the same instance
assertSame(zk, RO_ZK.getZooKeeper());
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertNotNull(RO_ZK.getZooKeeper());
assertNotSame(zk, RO_ZK.getZooKeeper());
assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId());
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -23,10 +22,11 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
@Category({ ZKTests.class, SmallTests.class })
public class TestZKMetrics {
@Test

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@ -37,10 +38,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
*
*/
@Category({SmallTests.class})
@Category({ ZKTests.class, SmallTests.class })
public class TestZKUtil {
@Test

View File

@ -24,10 +24,11 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class })
@Category({ ZKTests.class, SmallTests.class })
public class TestZKWatcher {
@Test

View File

@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
hbase.root.logger=INFO,console
hbase.log.dir=.
hbase.log.file=hbase.log
# Define the root logger to the system property "hbase.root.logger".
log4j.rootLogger=${hbase.root.logger}
# Logging Threshold
log4j.threshold=ALL
#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Debugging Pattern format
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
# Custom Logging levels
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.hadoop.hbase=DEBUG
#These settings are workarounds against spurious logs from the minicluster.
#See HBASE-4709
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
# Enable this to get detailed connection error/retry logging.
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE

15
pom.xml
View File

@ -3273,6 +3273,21 @@
<surefire.secondPartGroups></surefire.secondPartGroups>
</properties>
</profile>
<profile>
<id>runZKTests</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<surefire.firstPartForkCount>1</surefire.firstPartForkCount>
<surefire.secondPartForkCount>1</surefire.secondPartForkCount>
<surefire.skipFirstPart>false</surefire.skipFirstPart>
<surefire.skipSecondPart>true</surefire.skipSecondPart>
<surefire.firstPartGroups>org.apache.hadoop.hbase.testclassification.ZKTests
</surefire.firstPartGroups>
<surefire.secondPartGroups></surefire.secondPartGroups>
</properties>
</profile>
<profile>
<!-- Use it to launch tests locally-->