HBASE-21658 Should get the meta replica number from zk instead of config at client side
This commit is contained in:
parent
3641e7a97d
commit
df27820958
|
@ -26,7 +26,9 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||||
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
|
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.mutable.MutableInt;
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ClusterId;
|
import org.apache.hadoop.hbase.ClusterId;
|
||||||
|
@ -134,12 +136,13 @@ class ZKAsyncRegistry implements AsyncRegistry {
|
||||||
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
|
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
|
||||||
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
|
List<String> metaReplicaZNodes) {
|
||||||
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
|
HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
|
||||||
HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
|
|
||||||
MutableInt remaining = new MutableInt(locs.length);
|
MutableInt remaining = new MutableInt(locs.length);
|
||||||
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
|
for (String metaReplicaZNode : metaReplicaZNodes) {
|
||||||
|
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
|
||||||
|
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
|
||||||
if (replicaId == DEFAULT_REPLICA_ID) {
|
if (replicaId == DEFAULT_REPLICA_ID) {
|
||||||
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
|
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
|
@ -186,6 +189,22 @@ class ZKAsyncRegistry implements AsyncRegistry {
|
||||||
tryComplete(remaining, locs, future);
|
tryComplete(remaining, locs, future);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
|
||||||
|
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
|
||||||
|
addListener(
|
||||||
|
zk.list(znodePaths.baseZNode)
|
||||||
|
.thenApply(children -> children.stream()
|
||||||
|
.filter(c -> c.startsWith(znodePaths.metaZNodePrefix)).collect(Collectors.toList())),
|
||||||
|
(metaReplicaZNodes, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
getMetaRegionLocation(future, metaReplicaZNodes);
|
||||||
});
|
});
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.DelayQueue;
|
import java.util.concurrent.DelayQueue;
|
||||||
import java.util.concurrent.Delayed;
|
import java.util.concurrent.Delayed;
|
||||||
|
@ -284,6 +285,22 @@ public final class ReadOnlyZKClient implements Closeable {
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<List<String>> list(String path) {
|
||||||
|
if (closed.get()) {
|
||||||
|
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
|
||||||
|
}
|
||||||
|
CompletableFuture<List<String>> future = new CompletableFuture<>();
|
||||||
|
tasks.add(new ZKTask<List<String>>(path, future, "list") {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doExec(ZooKeeper zk) {
|
||||||
|
zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
private void closeZk() {
|
private void closeZk() {
|
||||||
if (zookeeper != null) {
|
if (zookeeper != null) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -166,7 +166,7 @@ public class ZNodePaths {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the meta replicaId from the passed znode
|
* Parse the meta replicaId from the passed znode
|
||||||
* @param znode
|
* @param znode the name of the znode, does not include baseZNode
|
||||||
* @return replicaId
|
* @return replicaId
|
||||||
*/
|
*/
|
||||||
public int getMetaReplicaIdFromZnode(String znode) {
|
public int getMetaReplicaIdFromZnode(String znode) {
|
||||||
|
@ -178,7 +178,7 @@ public class ZNodePaths {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is it the default meta replica's znode
|
* Is it the default meta replica's znode
|
||||||
* @param znode
|
* @param znode the name of the znode, does not include baseZNode
|
||||||
* @return true or false
|
* @return true or false
|
||||||
*/
|
*/
|
||||||
public boolean isDefaultMetaReplicaZnode(String znode) {
|
public boolean isDefaultMetaReplicaZnode(String znode) {
|
||||||
|
|
|
@ -61,7 +61,11 @@ public class TestZKAsyncRegistry {
|
||||||
public static void setUp() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
|
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
REGISTRY = new ZKAsyncRegistry(TEST_UTIL.getConfiguration());
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
// make sure that we do not depend on this config when getting locations for meta replicas, see
|
||||||
|
// HBASE-21658.
|
||||||
|
conf.setInt(META_REPLICAS_NUM, 1);
|
||||||
|
REGISTRY = new ZKAsyncRegistry(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
|
|
@ -38,6 +38,8 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.Exchanger;
|
import java.util.concurrent.Exchanger;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -126,9 +128,15 @@ public class TestReadOnlyZKClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAndExists() throws Exception {
|
public void testRead() throws Exception {
|
||||||
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
|
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
|
||||||
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
|
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
|
||||||
|
List<String> children = RO_ZK.list(PATH).get();
|
||||||
|
assertEquals(CHILDREN, children.size());
|
||||||
|
Collections.sort(children);
|
||||||
|
for (int i = 0; i < CHILDREN; i++) {
|
||||||
|
assertEquals("c" + i, children.get(i));
|
||||||
|
}
|
||||||
assertNotNull(RO_ZK.zookeeper);
|
assertNotNull(RO_ZK.zookeeper);
|
||||||
waitForIdleConnectionClosed();
|
waitForIdleConnectionClosed();
|
||||||
}
|
}
|
||||||
|
@ -145,6 +153,15 @@ public class TestReadOnlyZKClient {
|
||||||
assertEquals(Code.NONODE, ke.code());
|
assertEquals(Code.NONODE, ke.code());
|
||||||
assertEquals(pathNotExists, ke.getPath());
|
assertEquals(pathNotExists, ke.getPath());
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
RO_ZK.list(pathNotExists).get();
|
||||||
|
fail("should fail because of " + pathNotExists + " does not exist");
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
assertThat(e.getCause(), instanceOf(KeeperException.class));
|
||||||
|
KeeperException ke = (KeeperException) e.getCause();
|
||||||
|
assertEquals(Code.NONODE, ke.code());
|
||||||
|
assertEquals(pathNotExists, ke.getPath());
|
||||||
|
}
|
||||||
// exists will not throw exception.
|
// exists will not throw exception.
|
||||||
assertNull(RO_ZK.exists(pathNotExists).get());
|
assertNull(RO_ZK.exists(pathNotExists).get());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue