From 7a5b0783068415eaca70ea2ca938ecbfe3bed30f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 3 Dec 2017 08:30:30 +0800 Subject: [PATCH] HBASE-19399 Purge curator dependency from hbase-client --- .../hbase/testclassification/ZKTests.java | 21 ++ hbase-client/pom.xml | 8 - .../hadoop/hbase/client/ZKAsyncRegistry.java | 113 ++---- .../hbase/zookeeper/ReadOnlyZKClient.java | 347 ++++++++++++++++++ .../hbase/client/TestZKAsyncRegistry.java | 79 ++-- .../hbase/zookeeper/MiniZooKeeperCluster.java | 4 + .../hadoop/hbase/zookeeper/ZKMainServer.java | 6 +- .../hbase/zookeeper/TestInstancePending.java | 6 +- .../hbase/zookeeper/TestReadOnlyZKClient.java | 141 +++++++ .../hadoop/hbase/zookeeper/TestZKMetrics.java | 6 +- .../hadoop/hbase/zookeeper/TestZKUtil.java | 8 +- .../hadoop/hbase/zookeeper/TestZKWatcher.java | 3 +- .../src/test/resources/log4j.properties | 68 ++++ pom.xml | 15 + 14 files changed, 689 insertions(+), 136 deletions(-) create mode 100644 hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java create mode 100644 hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java create mode 100644 hbase-zookeeper/src/test/resources/log4j.properties diff --git a/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java new file mode 100644 index 00000000000..ad869fa2fea --- /dev/null +++ b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.testclassification; + +public interface ZKTests { +} diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index cc112d4652c..45393b5bf77 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -195,14 +195,6 @@ mockito-core test - - org.apache.curator - curator-framework - - - org.apache.curator - curator-client - org.apache.commons commons-crypto diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java index 6d49b7f8570..bd8325e9d99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; @@ -28,16 +26,10 @@ import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.retry.RetryNTimes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HRegionLocation; @@ -45,16 +37,14 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.data.Stat; - /** * Fetch the registry data from zookeeper. @@ -64,53 +54,36 @@ class ZKAsyncRegistry implements AsyncRegistry { private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class); - private final CuratorFramework zk; + private final ReadOnlyZKClient zk; private final ZNodePaths znodePaths; ZKAsyncRegistry(Configuration conf) { this.znodePaths = new ZNodePaths(conf); - int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); - int zkRetry = conf.getInt("zookeeper.recovery.retry", 30); - int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); - this.zk = CuratorFrameworkFactory.builder() - .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout) - .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs)) - .threadFactory( - Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode()))) - .build(); - this.zk.start(); - // TODO: temporary workaround for HBASE-19312, must be removed before 2.0.0 release! - try { - this.zk.blockUntilConnected(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - return; - } + this.zk = new ReadOnlyZKClient(conf); } - private interface CuratorEventProcessor { - T process(CuratorEvent event) throws Exception; + private interface Converter { + T convert(byte[] data) throws Exception; } - private static CompletableFuture exec(BackgroundPathable opBuilder, String path, - CuratorEventProcessor processor) { + private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - try { - opBuilder.inBackground((client, event) -> { - try { - future.complete(processor.process(event)); - } catch (Exception e) { - future.completeExceptionally(e); - } - }).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path); - } catch (Exception e) { - future.completeExceptionally(e); - } + zk.get(path).whenComplete((data, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + try { + future.complete(converter.convert(data)); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); return future; } - private static String getClusterId(CuratorEvent event) throws DeserializationException { - byte[] data = event.getData(); + private static String getClusterId(byte[] data) throws DeserializationException { if (data == null || data.length == 0) { return null; } @@ -120,17 +93,15 @@ class ZKAsyncRegistry implements AsyncRegistry { @Override public CompletableFuture getClusterId() { - return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); + return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); } @VisibleForTesting - CuratorFramework getCuratorFramework() { + ReadOnlyZKClient getZKClient() { return zk; } - private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event) - throws IOException { - byte[] data = event.getData(); + private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { if (data == null || data.length == 0) { return null; } @@ -169,7 +140,7 @@ class ZKAsyncRegistry implements AsyncRegistry { MutableInt remaining = new MutableInt(locs.length); znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> { if (replicaId == DEFAULT_REPLICA_ID) { - exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { + getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -184,13 +155,13 @@ class ZKAsyncRegistry implements AsyncRegistry { new IOException("Meta region is in state " + stateAndServerName.getFirst())); return; } - locs[DEFAULT_REPLICA_ID] = new HRegionLocation( - getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), - stateAndServerName.getSecond()); + locs[DEFAULT_REPLICA_ID] = + new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), + stateAndServerName.getSecond()); tryComplete(remaining, locs, future); }); } else { - exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { + getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { if (future.isDone()) { return; } @@ -203,13 +174,13 @@ class ZKAsyncRegistry implements AsyncRegistry { } else { Pair stateAndServerName = getStateAndServerName(proto); if (stateAndServerName.getFirst() != RegionState.State.OPEN) { - LOG.warn("Meta region for replica " + replicaId + " is in state " - + stateAndServerName.getFirst()); + LOG.warn("Meta region for replica " + replicaId + " is in state " + + stateAndServerName.getFirst()); locs[replicaId] = null; } else { - locs[replicaId] = new HRegionLocation( - getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), - stateAndServerName.getSecond()); + locs[replicaId] = + new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), + stateAndServerName.getSecond()); } } tryComplete(remaining, locs, future); @@ -219,18 +190,12 @@ class ZKAsyncRegistry implements AsyncRegistry { return future; } - private static int getCurrentNrHRS(CuratorEvent event) { - Stat stat = event.getStat(); - return stat != null ? stat.getNumChildren() : 0; - } - @Override public CompletableFuture getCurrentNrHRS() { - return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS); + return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0); } - private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException { - byte[] data = event.getData(); + private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { if (data == null || data.length == 0) { return null; } @@ -241,7 +206,7 @@ class ZKAsyncRegistry implements AsyncRegistry { @Override public CompletableFuture getMasterAddress() { - return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) + return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) .thenApply(proto -> { if (proto == null) { return null; @@ -254,7 +219,7 @@ class ZKAsyncRegistry implements AsyncRegistry { @Override public CompletableFuture getMasterInfoPort() { - return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) + return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) .thenApply(proto -> proto != null ? proto.getInfoPort() : 0); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java new file mode 100644 index 00000000000..965a243937e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * A very simple read only zookeeper implementation without watcher support. + */ +@InterfaceAudience.Private +public final class ReadOnlyZKClient implements Closeable { + + private static final Log LOG = LogFactory.getLog(ReadOnlyZKClient.class); + + public static final String RECOVERY_RETRY = "zookeeper.recovery.retry"; + + private static final int DEFAULT_RECOVERY_RETRY = 30; + + public static final String RECOVERY_RETRY_INTERVAL_MILLIS = + "zookeeper.recovery.retry.intervalmill"; + + private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000; + + public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time"; + + private static final int DEFAULT_KEEPALIVE_MILLIS = 60000; + + private static final EnumSet FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED); + + private final String connectString; + + private final int sessionTimeoutMs; + + private final int maxRetries; + + private final int retryIntervalMs; + + private final int keepAliveTimeMs; + + private static abstract class Task implements Delayed { + + protected long time = System.nanoTime(); + + public boolean needZk() { + return false; + } + + public void exec(ZooKeeper zk) { + } + + public void connectFailed(IOException e) { + } + + public void closed(IOException e) { + } + + @Override + public int compareTo(Delayed o) { + Task that = (Task) o; + int c = Long.compare(time, that.time); + if (c != 0) { + return c; + } + return Integer.compare(System.identityHashCode(this), System.identityHashCode(that)); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); + } + } + + private static final Task CLOSE = new Task() { + }; + + private final DelayQueue tasks = new DelayQueue<>(); + + private final AtomicBoolean closed = new AtomicBoolean(false); + + private ZooKeeper zookeeper; + + private String getId() { + return String.format("0x%08x", System.identityHashCode(this)); + } + + public ReadOnlyZKClient(Configuration conf) { + this.connectString = ZKConfig.getZKQuorumServersString(conf); + this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); + this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); + this.retryIntervalMs = + conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); + this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); + LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString + + ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries + + ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms"); + Thread t = new Thread(this::run, "ReadOnlyZKClient"); + t.setDaemon(true); + t.start(); + } + + private abstract class ZKTask extends Task { + + protected final String path; + + private final CompletableFuture future; + + private final String operationType; + + private int retries; + + protected ZKTask(String path, CompletableFuture future, String operationType) { + this.path = path; + this.future = future; + this.operationType = operationType; + } + + protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) { + tasks.add(new Task() { + + @Override + public void exec(ZooKeeper alwaysNull) { + Code code = Code.get(rc); + if (code == Code.OK) { + future.complete(ret); + } else if (code == Code.NONODE) { + if (errorIfNoNode) { + future.completeExceptionally(KeeperException.create(code, path)); + } else { + future.complete(ret); + } + } else if (FAIL_FAST_CODES.contains(code)) { + future.completeExceptionally(KeeperException.create(code, path)); + } else { + if (code == Code.SESSIONEXPIRED) { + LOG.warn(getId() + " session expired, close and reconnect"); + try { + zk.close(); + } catch (InterruptedException e) { + } + } + if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { + LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + + code + ", retries = " + ZKTask.this.retries); + tasks.add(ZKTask.this); + } else { + LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + + code + ", retries = " + ZKTask.this.retries + ", give up"); + future.completeExceptionally(KeeperException.create(code, path)); + } + } + } + + @Override + public void closed(IOException e) { + // It may happen that a request is succeeded and the onComplete has been called and pushed + // us into the task queue, but before we get called a close is called and here we will + // fail the request, although it is succeeded actually. + // This is not a perfect solution but anyway, it is better than hang the requests for + // ever, and also acceptable as if you close the zk client before actually getting the + // response then a failure is always possible. + future.completeExceptionally(e); + } + }); + } + + @Override + public boolean needZk() { + return true; + } + + public boolean delay(long intervalMs, int maxRetries) { + if (retries >= maxRetries) { + return false; + } + retries++; + time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs); + return true; + } + + @Override + public void connectFailed(IOException e) { + if (delay(retryIntervalMs, maxRetries)) { + LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + + ", retries = " + retries, + e); + tasks.add(this); + } else { + LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + + ", retries = " + retries + ", give up", + e); + future.completeExceptionally(e); + } + } + + @Override + public void closed(IOException e) { + future.completeExceptionally(e); + } + } + + private static CompletableFuture failed(Throwable e) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + public CompletableFuture get(String path) { + if (closed.get()) { + return failed(new IOException("Client already closed")); + } + CompletableFuture future = new CompletableFuture<>(); + tasks.add(new ZKTask(path, future, "get") { + + @Override + public void exec(ZooKeeper zk) { + zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), + null); + } + }); + return future; + } + + public CompletableFuture exists(String path) { + if (closed.get()) { + return failed(new IOException("Client already closed")); + } + CompletableFuture future = new CompletableFuture<>(); + tasks.add(new ZKTask(path, future, "exists") { + + @Override + public void exec(ZooKeeper zk) { + zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); + } + }); + return future; + } + + private void closeZk() { + if (zookeeper != null) { + try { + zookeeper.close(); + } catch (InterruptedException e) { + } + zookeeper = null; + } + } + + private ZooKeeper getZk() throws IOException { + // may be closed when session expired + if (zookeeper == null || !zookeeper.getState().isAlive()) { + zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> { + }); + } + return zookeeper; + } + + private void run() { + for (;;) { + Task task; + try { + task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + continue; + } + if (task == CLOSE) { + break; + } + if (task == null) { + LOG.info(getId() + " no activities for " + keepAliveTimeMs + + " ms, close active connection. Will reconnect next time when there are new requests."); + closeZk(); + continue; + } + if (!task.needZk()) { + task.exec(null); + } else { + ZooKeeper zk; + try { + zk = getZk(); + } catch (IOException e) { + task.connectFailed(e); + continue; + } + task.exec(zk); + } + } + closeZk(); + IOException error = new IOException("Client already closed"); + Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error)); + tasks.clear(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + LOG.info("Close zookeeper connection " + getId() + " to " + connectString); + tasks.add(CLOSE); + } + } + + @VisibleForTesting + ZooKeeper getZooKeeper() { + return zookeeper; + } + + @VisibleForTesting + public String getConnectString() { + return connectString; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index a8a7de04426..0ca8e739027 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; -import org.apache.curator.CuratorZookeeperClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -38,14 +36,15 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({ MediumTests.class, ClientTests.class }) +@Category({ MediumTests.class, ZKTests.class }) public class TestZKAsyncRegistry { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -54,33 +53,34 @@ public class TestZKAsyncRegistry { // waits for all replicas to have region location static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException { - TEST_UTIL.waitFor(TEST_UTIL.getConfiguration() - .getLong("hbase.client.sync.wait.timeout.msec", 60000), - 200, true, new ExplainingPredicate() { - @Override - public String explainFailure() throws IOException { - return TEST_UTIL.explainTableAvailability(tbl); - } - - @Override - public boolean evaluate() throws IOException { - AtomicBoolean ready = new AtomicBoolean(true); - try { - RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); - assertEquals(3, locs.getRegionLocations().length); - IntStream.range(0, 3).forEach(i -> { - HRegionLocation loc = locs.getRegionLocation(i); - if (loc == null) { - ready.set(false); - } - }); - } catch (Exception e) { - ready.set(false); + TEST_UTIL.waitFor( + TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, + new ExplainingPredicate() { + @Override + public String explainFailure() throws IOException { + return TEST_UTIL.explainTableAvailability(tbl); } - return ready.get(); - } - }); + + @Override + public boolean evaluate() throws IOException { + AtomicBoolean ready = new AtomicBoolean(true); + try { + RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); + assertEquals(3, locs.getRegionLocations().length); + IntStream.range(0, 3).forEach(i -> { + HRegionLocation loc = locs.getRegionLocation(i); + if (loc == null) { + ready.set(false); + } + }); + } catch (Exception e) { + ready.set(false); + } + return ready.get(); + } + }); } + @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); @@ -109,25 +109,26 @@ public class TestZKAsyncRegistry { IntStream.range(0, 3).forEach(i -> { HRegionLocation loc = locs.getRegionLocation(i); assertNotNull("Replica " + i + " doesn't have location", loc); - assertTrue(loc.getRegionInfo().getTable().equals(TableName.META_TABLE_NAME)); - assertEquals(i, loc.getRegionInfo().getReplicaId()); + assertEquals(TableName.META_TABLE_NAME, loc.getRegion().getTable()); + assertEquals(i, loc.getRegion().getReplicaId()); }); } @Test public void testIndependentZKConnections() throws IOException { - final CuratorZookeeperClient zk1 = REGISTRY.getCuratorFramework().getZookeeperClient(); + ReadOnlyZKClient zk1 = REGISTRY.getZKClient(); - final Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); + Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); - try (final ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) { - final CuratorZookeeperClient zk2 = otherRegistry.getCuratorFramework().getZookeeperClient(); + try (ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) { + ReadOnlyZKClient zk2 = otherRegistry.getZKClient(); assertNotSame("Using a different configuration / quorum should result in different backing " + - "zk connection.", zk1, zk2); - assertNotEquals("Using a different configrution / quorum should be reflected in the " + - "zk connection.", zk1.getCurrentConnectionString(), zk2.getCurrentConnectionString()); + "zk connection.", + zk1, zk2); + assertNotEquals( + "Using a different configrution / quorum should be reflected in the " + "zk connection.", + zk1.getConnectString(), zk2.getConnectString()); } } - } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java index ef643bf6f4a..cb8681c826e 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java @@ -469,4 +469,8 @@ public class MiniZooKeeperCluster { return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1 : clientPortList.get(activeZKServerIndex); } + + List getZooKeeperServers() { + return zooKeeperServers; + } } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java index 9cb0e7d28da..2db83eb615a 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; import java.util.concurrent.TimeUnit; -import org.apache.curator.shaded.com.google.common.base.Stopwatch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -30,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeperMain; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch; + /** * Tool for running ZookeeperMain from HBase by reading a ZooKeeper server * from HBase XML configuration. diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java index e67c9fdbdf5..5c984a55f16 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,17 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.zookeeper; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) +@Category({ ZKTests.class, SmallTests.class }) public class TestInstancePending { @Test(timeout = 1000) public void test() throws Exception { diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java new file mode 100644 index 00000000000..765ddf96463 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ZKTests.class, MediumTests.class }) +public class TestReadOnlyZKClient { + + private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static MiniZooKeeperCluster CLUSTER; + + private static int PORT; + + private static String PATH = "/test"; + + private static byte[] DATA; + + private static int CHILDREN = 5; + + private static ReadOnlyZKClient RO_ZK; + + @BeforeClass + public static void setUp() throws IOException, InterruptedException, KeeperException { + File file = + new File(UTIL.getDataTestDir("zkcluster_" + UUID.randomUUID().toString()).toString()); + CLUSTER = new MiniZooKeeperCluster(UTIL.getConfiguration()); + PORT = CLUSTER.startup(file); + ZooKeeper zk = new ZooKeeper("localhost:" + PORT, 10000, e -> { + }); + DATA = new byte[10]; + ThreadLocalRandom.current().nextBytes(DATA); + zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + for (int i = 0; i < CHILDREN; i++) { + zk.create(PATH + "/c" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + zk.close(); + Configuration conf = UTIL.getConfiguration(); + conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + PORT); + conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); + conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); + conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); + RO_ZK = new ReadOnlyZKClient(conf); + // only connect when necessary + assertNull(RO_ZK.getZooKeeper()); + } + + @AfterClass + public static void tearDown() throws IOException { + RO_ZK.close(); + CLUSTER.shutdown(); + UTIL.cleanupTestDir(); + } + + @Test + public void testGetAndExists() throws InterruptedException, ExecutionException { + assertArrayEquals(DATA, RO_ZK.get(PATH).get()); + assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren()); + assertNotNull(RO_ZK.getZooKeeper()); + // a little longer than keep alive millis + Thread.sleep(5000); + assertNull(RO_ZK.getZooKeeper()); + } + + @Test + public void testNoNode() throws InterruptedException, ExecutionException { + String pathNotExists = PATH + "_whatever"; + try { + RO_ZK.get(pathNotExists).get(); + fail("should fail because of " + pathNotExists + " does not exist"); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(KeeperException.class)); + KeeperException ke = (KeeperException) e.getCause(); + assertEquals(Code.NONODE, ke.code()); + assertEquals(pathNotExists, ke.getPath()); + } + // exists will not throw exception. + assertNull(RO_ZK.exists(pathNotExists).get()); + } + + @Test + public void testSessionExpire() throws Exception { + assertArrayEquals(DATA, RO_ZK.get(PATH).get()); + ZooKeeper zk = RO_ZK.getZooKeeper(); + long sessionId = zk.getSessionId(); + CLUSTER.getZooKeeperServers().get(0).closeSession(sessionId); + // should not reach keep alive so still the same instance + assertSame(zk, RO_ZK.getZooKeeper()); + + assertArrayEquals(DATA, RO_ZK.get(PATH).get()); + assertNotNull(RO_ZK.getZooKeeper()); + assertNotSame(zk, RO_ZK.getZooKeeper()); + assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId()); + } +} diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java index 2811cc5ee08..e43a5c898bb 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,10 +22,11 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) +@Category({ ZKTests.class, SmallTests.class }) public class TestZKMetrics { @Test diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java index 7006040bc1f..db963922833 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; @@ -37,10 +38,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -/** - * - */ -@Category({SmallTests.class}) +@Category({ ZKTests.class, SmallTests.class }) public class TestZKUtil { @Test diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java index bd4575d6dd6..f3d0b035132 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java @@ -24,10 +24,11 @@ import java.io.IOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({ SmallTests.class }) +@Category({ ZKTests.class, SmallTests.class }) public class TestZKWatcher { @Test diff --git a/hbase-zookeeper/src/test/resources/log4j.properties b/hbase-zookeeper/src/test/resources/log4j.properties new file mode 100644 index 00000000000..c322699ced2 --- /dev/null +++ b/hbase-zookeeper/src/test/resources/log4j.properties @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define some default values that can be overridden by system properties +hbase.root.logger=INFO,console +hbase.log.dir=. +hbase.log.file=hbase.log + +# Define the root logger to the system property "hbase.root.logger". +log4j.rootLogger=${hbase.root.logger} + +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout +# Debugging Pattern format +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG + +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.hadoop.hbase=DEBUG + +#These settings are workarounds against spurious logs from the minicluster. +#See HBASE-4709 +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN +log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN +# Enable this to get detailed connection error/retry logging. +# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE diff --git a/pom.xml b/pom.xml index 932bc265bef..53aa115ba3e 100755 --- a/pom.xml +++ b/pom.xml @@ -3273,6 +3273,21 @@ + + runZKTests + + false + + + 1 + 1 + false + true + org.apache.hadoop.hbase.testclassification.ZKTests + + + +