HBASE-16835 Revisit the zookeeper usage at client side

zhangduo 2016-10-28 09:50:20 +08:00
parent e108a4f815
commit 3283bc7c91
10 changed files with 434 additions and 117 deletions

hbase-client/pom.xml

@@ -203,6 +203,18 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
</dependencies>
<profiles>

AsyncConnectionImpl.java

@@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private final User user;
private final ClusterRegistry registry;
private final AsyncRegistry registry;
private final String clusterId;

ClusterRegistry.java → AsyncRegistry.java

@@ -18,16 +18,28 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Implementations hold cluster information such as this cluster's id.
* Implementations hold cluster information such as this cluster's id, the location of
* hbase:meta, etc. Everything that may be related to zookeeper at the client side is placed
* here.
* <p>
* Most methods are executed asynchronously; the exception is getClusterId, which is executed
* synchronously and should be called only once, during initialization.
* <p>
* Internal use only.
*/
@InterfaceAudience.Private
interface ClusterRegistry extends Closeable {
interface AsyncRegistry extends Closeable {
/**
* Get the location of the meta region.
*/
CompletableFuture<RegionLocations> getMetaRegionLocation();
/**
* Should only be called once.
@@ -36,6 +48,24 @@ interface ClusterRegistry extends Closeable {
*/
String getClusterId();
/**
* Get the number of 'running' regionservers.
*/
CompletableFuture<Integer> getCurrentNrHRS();
/**
* Get the address of HMaster.
*/
CompletableFuture<ServerName> getMasterAddress();
/**
* Get the info port of HMaster.
*/
CompletableFuture<Integer> getMasterInfoPort();
/**
* Closes this instance and releases any system resources associated with it.
*/
@Override
void close();
}
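To make the contract concrete, here is a minimal consumer sketch (hypothetical caller code, assuming a Configuration in hand and the factory below; only getClusterId() blocks):

// Hypothetical consumer: getClusterId() is the single synchronous call, made once at startup;
// everything else composes as CompletableFutures.
AsyncRegistry registry = ClusterRegistryFactory.getRegistry(conf);
String clusterId = registry.getClusterId();
CompletableFuture<ServerName> master = registry.getMasterAddress();
CompletableFuture<RegionLocations> meta = registry.getMetaRegionLocation();
master.thenAcceptBoth(meta, (sn, locs) ->
    System.out.println("cluster " + clusterId + ": master=" + sn
        + ", meta=" + locs.getDefaultRegionLocation()));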

ClusterRegistryFactory.java

@@ -35,9 +35,9 @@ final class ClusterRegistryFactory {
/**
* @return The cluster registry implementation to use.
*/
static ClusterRegistry getRegistry(Configuration conf) {
Class<? extends ClusterRegistry> clazz =
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class);
static AsyncRegistry getRegistry(Configuration conf) {
Class<? extends AsyncRegistry> clazz =
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
}
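Because the implementation class is resolved reflectively, tests can swap in their own registry. A hedged sketch (MyStubRegistry is hypothetical, and the key string behind REGISTRY_IMPL_CONF_KEY is not shown in this hunk):

// Hypothetical: point the factory at a stub registry. The stub must implement AsyncRegistry
// and be constructible by ReflectionUtils.newInstance(clazz, conf).
Configuration conf = HBaseConfiguration.create();
conf.setClass(REGISTRY_IMPL_CONF_KEY, MyStubRegistry.class, AsyncRegistry.class);
AsyncRegistry registry = ClusterRegistryFactory.getRegistry(conf);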

ZKAsyncRegistry.java

@@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HRegionInfo.DEFAULT_REPLICA_ID;
import static org.apache.hadoop.hbase.HRegionInfo.FIRST_META_REGIONINFO;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.data.Stat;
/**
* Zookeeper-based registry implementation. Most methods fetch their data from zk
* asynchronously; the only exception is {@link #getClusterId()}, which fetches the data from
* zk synchronously.
*/
@InterfaceAudience.Private
class ZKAsyncRegistry implements AsyncRegistry {
private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class);
private final CuratorFramework zk;
private final ZNodePaths znodePaths;
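// A Curator client replaces the RecoverableZooKeeper used by the old ZKClusterRegistry; the
// retry behavior is kept equivalent by reusing the same zookeeper.recovery.retry settings.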
ZKAsyncRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
this.zk = CuratorFrameworkFactory.builder()
.connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
.threadFactory(
Threads.newDaemonThreadFactory(String.format("ZKAsyncRegistry-0x%08x", hashCode())))
.build();
this.zk.start();
}
@Override
public String getClusterId() {
try {
byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
return ClusterId.parseFrom(data).toString();
} catch (Exception e) {
LOG.warn("failed to get cluster id", e);
return null;
}
}
@Override
public void close() {
zk.close();
}
private interface CuratorEventProcessor<T> {
T process(CuratorEvent event) throws Exception;
}
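// Bridges Curator's callback-style background API to a CompletableFuture: the request is
// submitted with inBackground(), the future is completed from the background event (or
// completed exceptionally if the processor throws), and errors that never reach the callback
// are routed through the unhandled-error listener.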
private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
CuratorEventProcessor<T> processor) {
CompletableFuture<T> future = new CompletableFuture<>();
try {
opBuilder.inBackground((client, event) -> {
try {
future.complete(processor.process(event));
} catch (Exception e) {
future.completeExceptionally(e);
}
}).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
throws IOException {
byte[] data = event.getData();
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
int prefixLen = lengthOfPBMagic();
return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
data.length - prefixLen);
}
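// Countdown helper for assembling meta replica locations: each replica lookup reports in
// exactly once, and the future completes only after the last outstanding lookup has finished.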
private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
CompletableFuture<RegionLocations> future) {
remaining.decrement();
if (remaining.intValue() > 0) {
return;
}
future.complete(new RegionLocations(locs));
}
private Pair<RegionState.State, ServerName> getStateAndServerName(
ZooKeeperProtos.MetaRegionServer proto) {
RegionState.State state;
if (proto.hasState()) {
state = RegionState.State.convert(proto.getState());
} else {
state = RegionState.State.OPEN;
}
HBaseProtos.ServerName snProto = proto.getServer();
return Pair.newPair(state,
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
}
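// Fetches all meta replica znodes in parallel. A failure on the default replica fails the
// whole future; a failed or missing non-default replica only leaves a null slot in the array.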
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (proto == null) {
future.completeExceptionally(new IOException("Meta znode is null"));
return;
}
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
future.completeExceptionally(
new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return;
}
locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (future.isDone()) {
return;
}
if (error != null) {
LOG.warn("Failed to fetch " + path, error);
locs[replicaId] = null;
} else if (proto == null) {
LOG.warn("Meta znode for replica " + replicaId + " is null");
locs[replicaId] = null;
} else {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state "
+ stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] = new HRegionLocation(
getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);
});
}
});
return future;
}
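// checkExists() is sufficient here: the returned Stat carries the child count of rsZNode, and
// each live regionserver registers one ephemeral child under it.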
private static int getCurrentNrHRS(CuratorEvent event) {
Stat stat = event.getStat();
return stat != null ? stat.getNumChildren() : 0;
}
@Override
public CompletableFuture<Integer> getCurrentNrHRS() {
return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS);
}
private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException {
byte[] data = event.getData();
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
int prefixLen = lengthOfPBMagic();
return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
}
@Override
public CompletableFuture<ServerName> getMasterAddress() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
}
HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
});
}
@Override
public CompletableFuture<Integer> getMasterInfoPort() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
}
}
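The exec() bridge above generalizes to any Curator background call. A self-contained sketch of the same pattern, with an explicit result-code check that the processor-based version leaves to its callers (class and method names here are illustrative, not part of the patch):

import java.util.concurrent.CompletableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;

public final class CuratorFutures {
  // Fetch a znode's payload as a CompletableFuture using Curator's background mode.
  static CompletableFuture<byte[]> getData(CuratorFramework zk, String path) {
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    try {
      zk.getData().inBackground((client, event) -> {
        // event.getResultCode() is the raw ZooKeeper return code; 0 means OK.
        if (event.getResultCode() == 0) {
          future.complete(event.getData());
        } else {
          future.completeExceptionally(KeeperException.create(
              KeeperException.Code.get(event.getResultCode()), path));
        }
      }).forPath(path);
    } catch (Exception e) {
      // forPath() can throw before the request is ever submitted.
      future.completeExceptionally(e);
    }
    return future;
  }
}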

ZKClusterRegistry.java

@@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
/**
* Cache the cluster registry data in memory and use zk watcher to update. The only exception is
* {@link #getClusterId()}, it will fetch the data from zk directly.
*/
@InterfaceAudience.Private
class ZKClusterRegistry implements ClusterRegistry {
private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class);
private final RecoverableZooKeeper zk;
private final ZNodePaths znodePaths;
ZKClusterRegistry(Configuration conf) throws IOException {
this.znodePaths = new ZNodePaths(conf);
int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
this.zk = new RecoverableZooKeeper(ZKConfig.getZKQuorumServersString(conf), zkSessionTimeout,
null, zkRetry, zkRetryIntervalMs);
}
@Override
public String getClusterId() {
try {
byte[] data = zk.getData(znodePaths.clusterIdZNode, false, null);
if (data == null || data.length == 0) {
return null;
}
return ClusterId.parseFrom(data).toString();
} catch (Exception e) {
LOG.warn("failed to get cluster id", e);
return null;
}
}
@Override
public void close() {
try {
zk.close();
} catch (InterruptedException e) {
LOG.warn("close zookeeper failed", e);
}
}
}

RecoverableZooKeeper.java

@@ -352,7 +352,7 @@ public class RecoverableZooKeeper {
while (true) {
try {
byte[] revData = checkZk().getData(path, watcher, stat);
return this.removeMetaData(revData);
return removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -384,7 +384,7 @@
while (true) {
try {
byte[] revData = checkZk().getData(path, watch, stat);
return this.removeMetaData(revData);
return removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -707,7 +707,7 @@
return null;
}
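// Made static so callers without a RecoverableZooKeeper instance (e.g. ZKAsyncRegistry above)
// can strip the metadata prefix from znode payloads.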
public byte[] removeMetaData(byte[] data) {
public static byte[] removeMetaData(byte[] data) {
if(data == null || data.length == 0) {
return data;
}


@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

TestZKAsyncRegistry.java

@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestZKAsyncRegistry {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ZKAsyncRegistry REGISTRY;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
REGISTRY = new ZKAsyncRegistry(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws Exception {
IOUtils.closeQuietly(REGISTRY);
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void test() throws InterruptedException, ExecutionException, IOException {
assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getClusterId(),
REGISTRY.getClusterId());
assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getServersSize(),
REGISTRY.getCurrentNrHRS().get().intValue());
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get());
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i);
assertNotNull(loc);
assertEquals(TableName.META_TABLE_NAME, loc.getRegionInfo().getTable());
assertEquals(i, loc.getRegionInfo().getReplicaId());
});
}
}

pom.xml

@@ -1230,6 +1230,7 @@
<bouncycastle.version>1.46</bouncycastle.version>
<kerby.version>1.0.0-RC2</kerby.version>
<commons-crypto.version>1.0.0</commons-crypto.version>
<curator.version>2.11.0</curator.version>
<!-- Plugin Dependencies -->
<maven.assembly.version>2.4</maven.assembly.version>
<maven.antrun.version>1.8</maven.antrun.version>
@@ -1786,39 +1787,60 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
<dependency>
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>${spy.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk16</artifactId>
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-client</artifactId>
<version>${kerby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-simplekdc</artifactId>
<version>${kerby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
<version>${commons-crypto.version}</version>
<exclusions>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk16</artifactId>
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-client</artifactId>
<version>${kerby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-simplekdc</artifactId>
<version>${kerby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
<version>${commons-crypto.version}</version>
<exclusions>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Dependencies needed by subprojects -->