HBASE-25216 The client zk syncer should deal with meta replica count change (#2614)
Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
parent
1cc39ffd59
commit
04293ad6dd
|
@ -312,8 +312,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Tracker for load balancer state
|
||||
LoadBalancerTracker loadBalancerTracker;
|
||||
// Tracker for meta location, if any client ZK quorum specified
|
||||
MetaLocationSyncer metaLocationSyncer;
|
||||
private MetaLocationSyncer metaLocationSyncer;
|
||||
// Tracker for active master location, if any client ZK quorum specified
|
||||
@VisibleForTesting
|
||||
MasterAddressSyncer masterAddressSyncer;
|
||||
// Tracker for auto snapshot cleanup state
|
||||
SnapshotCleanupTracker snapshotCleanupTracker;
|
||||
|
@ -3805,4 +3806,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
return compactionState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaLocationSyncer getMetaLocationSyncer() {
|
||||
return metaLocationSyncer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
|
|||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
@ -542,4 +543,11 @@ public interface MasterServices extends Server {
|
|||
*/
|
||||
boolean normalizeRegions(
|
||||
final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the meta location syncer.
|
||||
* <p/>
|
||||
* We need to get this in MTP to tell the syncer the new meta replica count.
|
||||
*/
|
||||
MetaLocationSyncer getMetaLocationSyncer();
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -155,6 +156,12 @@ public class ModifyTableProcedure
|
|||
break;
|
||||
case MODIFY_TABLE_ASSIGN_NEW_REPLICAS:
|
||||
assignNewReplicasIfNeeded(env);
|
||||
if (TableName.isMetaTableName(getTableName())) {
|
||||
MetaLocationSyncer syncer = env.getMasterServices().getMetaLocationSyncer();
|
||||
if (syncer != null) {
|
||||
syncer.setMetaReplicaCount(modifiedTableDescriptor.getRegionReplication());
|
||||
}
|
||||
}
|
||||
if (deleteColumnFamilyInModify) {
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
|
||||
} else {
|
||||
|
|
|
@ -19,12 +19,11 @@
|
|||
package org.apache.hadoop.hbase.master.zksyncer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -42,22 +40,68 @@ import org.slf4j.LoggerFactory;
|
|||
* Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
|
||||
* changed
|
||||
* <p/>
|
||||
* The target znode(s) is given through {@link #getNodesToWatch()} method
|
||||
* The target znode(s) is given through {@link #getPathsToWatch()} method
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class ClientZKSyncer extends ZKListener {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
|
||||
private final Server server;
|
||||
private final ZKWatcher clientZkWatcher;
|
||||
|
||||
/**
|
||||
* Used to store the newest data which we want to sync to client zk.
|
||||
* <p/>
|
||||
* For meta location, since we may reduce the replica number, so here we add a {@code delete} flag
|
||||
* to tell the updater delete the znode on client zk and quit.
|
||||
*/
|
||||
private static final class ZKData {
|
||||
|
||||
byte[] data;
|
||||
|
||||
boolean delete = false;
|
||||
|
||||
synchronized void set(byte[] data) {
|
||||
this.data = data;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
synchronized byte[] get() throws InterruptedException {
|
||||
while (!delete && data == null) {
|
||||
wait();
|
||||
}
|
||||
byte[] d = data;
|
||||
data = null;
|
||||
return d;
|
||||
}
|
||||
|
||||
synchronized void delete() {
|
||||
this.delete = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
synchronized boolean isDeleted() {
|
||||
return delete;
|
||||
}
|
||||
}
|
||||
|
||||
// We use queues and daemon threads to synchronize the data to client ZK cluster
|
||||
// to avoid blocking the single event thread for watchers
|
||||
private final Map<String, BlockingQueue<byte[]>> queues;
|
||||
private final ConcurrentMap<String, ZKData> queues;
|
||||
|
||||
public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
|
||||
super(watcher);
|
||||
this.server = server;
|
||||
this.clientZkWatcher = clientZkWatcher;
|
||||
this.queues = new HashMap<>();
|
||||
this.queues = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
private void startNewSyncThread(String path) {
|
||||
ZKData zkData = new ZKData();
|
||||
queues.put(path, zkData);
|
||||
Thread updater = new ClientZkUpdater(path, zkData);
|
||||
updater.setDaemon(true);
|
||||
updater.start();
|
||||
watchAndCheckExists(path);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -69,17 +113,12 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
this.watcher.registerListener(this);
|
||||
// create base znode on remote ZK
|
||||
ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode);
|
||||
// set meta znodes for client ZK
|
||||
Collection<String> nodes = getNodesToWatch();
|
||||
LOG.debug("Znodes to watch: " + nodes);
|
||||
// set znodes for client ZK
|
||||
Set<String> paths = getPathsToWatch();
|
||||
LOG.debug("ZNodes to watch: {}", paths);
|
||||
// initialize queues and threads
|
||||
for (String node : nodes) {
|
||||
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
|
||||
queues.put(node, queue);
|
||||
Thread updater = new ClientZkUpdater(node, queue);
|
||||
updater.setDaemon(true);
|
||||
updater.start();
|
||||
watchAndCheckExists(node);
|
||||
for (String path : paths) {
|
||||
startNewSyncThread(path);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,10 +151,9 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
* @param data the data to write to queue
|
||||
*/
|
||||
private void upsertQueue(String node, byte[] data) {
|
||||
BlockingQueue<byte[]> queue = queues.get(node);
|
||||
synchronized (queue) {
|
||||
queue.poll();
|
||||
queue.offer(data);
|
||||
ZKData zkData = queues.get(node);
|
||||
if (zkData != null) {
|
||||
zkData.set(data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,35 +164,49 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
* @param data the data to set to client ZK
|
||||
* @throws InterruptedException if the thread is interrupted during process
|
||||
*/
|
||||
private final void setDataForClientZkUntilSuccess(String node, byte[] data)
|
||||
throws InterruptedException {
|
||||
private void setDataForClientZkUntilSuccess(String node, byte[] data)
|
||||
throws InterruptedException {
|
||||
boolean create = false;
|
||||
while (!server.isStopped()) {
|
||||
try {
|
||||
LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
|
||||
ZKUtil.setData(clientZkWatcher, node, data);
|
||||
break;
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
// Node doesn't exist, create it and set value
|
||||
try {
|
||||
if (create) {
|
||||
ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
|
||||
break;
|
||||
} catch (KeeperException.ConnectionLossException
|
||||
| KeeperException.SessionExpiredException ee) {
|
||||
reconnectAfterExpiration();
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn(
|
||||
"Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
|
||||
} else {
|
||||
ZKUtil.setData(clientZkWatcher, node, data);
|
||||
}
|
||||
} catch (KeeperException.ConnectionLossException
|
||||
| KeeperException.SessionExpiredException ee) {
|
||||
reconnectAfterExpiration();
|
||||
break;
|
||||
} catch (KeeperException e) {
|
||||
LOG.debug("Failed to set data to client ZK, will retry later", e);
|
||||
LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e);
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
reconnectAfterExpiration();
|
||||
}
|
||||
if (e.code() == KeeperException.Code.NONODE) {
|
||||
create = true;
|
||||
}
|
||||
if (e.code() == KeeperException.Code.NODEEXISTS) {
|
||||
create = false;
|
||||
}
|
||||
}
|
||||
Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException {
|
||||
while (!server.isStopped()) {
|
||||
LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher);
|
||||
try {
|
||||
ZKUtil.deleteNode(clientZkWatcher, node);
|
||||
} catch (KeeperException e) {
|
||||
LOG.debug("Failed to delete node from client ZK, will retry later", e);
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
reconnectAfterExpiration();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final void reconnectAfterExpiration() throws InterruptedException {
|
||||
LOG.warn("ZK session expired or lost. Retry a new connection...");
|
||||
try {
|
||||
|
@ -164,11 +216,7 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
if (!validate(path)) {
|
||||
return;
|
||||
}
|
||||
private void getDataAndWatch(String path) {
|
||||
try {
|
||||
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
|
||||
upsertQueue(path, data);
|
||||
|
@ -177,11 +225,25 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
}
|
||||
}
|
||||
|
||||
private void removeQueue(String path) {
|
||||
ZKData zkData = queues.remove(path);
|
||||
if (zkData != null) {
|
||||
zkData.delete();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
if (validate(path)) {
|
||||
getDataAndWatch(path);
|
||||
} else {
|
||||
removeQueue(path);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDataChanged(String path) {
|
||||
if (validate(path)) {
|
||||
nodeCreated(path);
|
||||
}
|
||||
nodeCreated(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -189,11 +251,13 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
if (validate(path)) {
|
||||
try {
|
||||
if (ZKUtil.watchAndCheckExists(watcher, path)) {
|
||||
nodeCreated(path);
|
||||
getDataAndWatch(path);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
|
||||
}
|
||||
} else {
|
||||
removeQueue(path);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,41 +266,67 @@ public abstract class ClientZKSyncer extends ZKListener {
|
|||
* @param path the path to validate
|
||||
* @return true if the znode is watched by us
|
||||
*/
|
||||
abstract boolean validate(String path);
|
||||
protected abstract boolean validate(String path);
|
||||
|
||||
/**
|
||||
* @return the znode(s) to watch
|
||||
* @return the zk path(s) to watch
|
||||
*/
|
||||
abstract Collection<String> getNodesToWatch() throws KeeperException;
|
||||
protected abstract Set<String> getPathsToWatch();
|
||||
|
||||
protected final void refreshWatchingList() {
|
||||
Set<String> newPaths = getPathsToWatch();
|
||||
LOG.debug("New ZNodes to watch: {}", newPaths);
|
||||
Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator();
|
||||
// stop unused syncers
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<String, ZKData> entry = iter.next();
|
||||
if (!newPaths.contains(entry.getKey())) {
|
||||
iter.remove();
|
||||
entry.getValue().delete();
|
||||
}
|
||||
}
|
||||
// start new syncers
|
||||
for (String newPath : newPaths) {
|
||||
if (!queues.containsKey(newPath)) {
|
||||
startNewSyncThread(newPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread to synchronize znode data to client ZK cluster
|
||||
*/
|
||||
class ClientZkUpdater extends Thread {
|
||||
final String znode;
|
||||
final BlockingQueue<byte[]> queue;
|
||||
private final class ClientZkUpdater extends Thread {
|
||||
private final String znode;
|
||||
private final ZKData zkData;
|
||||
|
||||
public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
|
||||
public ClientZkUpdater(String znode, ZKData zkData) {
|
||||
this.znode = znode;
|
||||
this.queue = queue;
|
||||
this.zkData = zkData;
|
||||
setName("ClientZKUpdater-" + znode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Client zk updater for znode {} started", znode);
|
||||
while (!server.isStopped()) {
|
||||
try {
|
||||
byte[] data = queue.take();
|
||||
setDataForClientZkUntilSuccess(znode, data);
|
||||
} catch (InterruptedException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Interrupted while checking whether need to update meta location to client zk");
|
||||
byte[] data = zkData.get();
|
||||
if (data != null) {
|
||||
setDataForClientZkUntilSuccess(znode, data);
|
||||
} else {
|
||||
if (zkData.isDeleted()) {
|
||||
deleteDataForClientZkUntilSuccess(znode);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while checking whether need to update meta location to client zk");
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.debug("Client zk updater for znode {} stopped", znode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.zksyncer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -39,14 +38,12 @@ public class MasterAddressSyncer extends ClientZKSyncer {
|
|||
}
|
||||
|
||||
@Override
|
||||
boolean validate(String path) {
|
||||
protected boolean validate(String path) {
|
||||
return path.equals(masterAddressZNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
Collection<String> getNodesToWatch() {
|
||||
ArrayList<String> toReturn = new ArrayList<>();
|
||||
toReturn.add(masterAddressZNode);
|
||||
return toReturn;
|
||||
protected Set<String> getPathsToWatch() {
|
||||
return Collections.singleton(masterAddressZNode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.zksyncer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Tracks the meta region locations on server ZK cluster and synchronize them to client ZK cluster
|
||||
|
@ -32,19 +31,28 @@ import org.apache.zookeeper.KeeperException;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetaLocationSyncer extends ClientZKSyncer {
|
||||
|
||||
private volatile int metaReplicaCount = 1;
|
||||
|
||||
public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
|
||||
super(watcher, clientZkWatcher, server);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean validate(String path) {
|
||||
protected boolean validate(String path) {
|
||||
return watcher.getZNodePaths().isMetaZNodePath(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
Collection<String> getNodesToWatch() throws KeeperException {
|
||||
return watcher.getMetaReplicaNodes().stream()
|
||||
.map(znode -> ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode))
|
||||
.collect(Collectors.toList());
|
||||
protected Set<String> getPathsToWatch() {
|
||||
return IntStream.range(0, metaReplicaCount)
|
||||
.mapToObj(watcher.getZNodePaths()::getZNodeForReplica).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public void setMetaReplicaCount(int replicaCount) {
|
||||
if (replicaCount != metaReplicaCount) {
|
||||
metaReplicaCount = replicaCount;
|
||||
refreshWatchingList();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -26,25 +30,25 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestSeparateClientZKCluster {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
@ -60,11 +64,11 @@ public class TestSeparateClientZKCluster {
|
|||
private final byte[] newVal = Bytes.toBytes("v2");
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
public TableNameTestRule name = new TableNameTestRule();
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
|
||||
HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
|
@ -77,13 +81,15 @@ public class TestSeparateClientZKCluster {
|
|||
TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
|
||||
TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
|
||||
// core settings for testing client ZK cluster
|
||||
TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
||||
ZKConnectionRegistry.class, ConnectionRegistry.class);
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
|
||||
// reduce zk session timeout to easier trigger session expiration
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
|
||||
// Start a cluster with 2 masters and 3 regionservers.
|
||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
||||
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
|
||||
StartMiniClusterOption option =
|
||||
StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
|
||||
TEST_UTIL.startMiniCluster(option);
|
||||
}
|
||||
|
||||
|
@ -96,16 +102,14 @@ public class TestSeparateClientZKCluster {
|
|||
|
||||
@Test
|
||||
public void testBasicOperation() throws Exception {
|
||||
TableName tn = TableName.valueOf(name.getMethodName());
|
||||
TableName tn = name.getTableName();
|
||||
// create table
|
||||
Connection conn = TEST_UTIL.getConnection();
|
||||
Admin admin = conn.getAdmin();
|
||||
HTable table = (HTable) conn.getTable(tn);
|
||||
try {
|
||||
try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
|
||||
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
TableDescriptorBuilder tableDescBuilder =
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
admin.createTable(tableDescBuilder.build());
|
||||
// test simple get and put
|
||||
Put put = new Put(row);
|
||||
|
@ -114,10 +118,7 @@ public class TestSeparateClientZKCluster {
|
|||
Get get = new Get(row);
|
||||
Result result = table.get(get);
|
||||
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
} finally {
|
||||
admin.close();
|
||||
table.close();
|
||||
assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,9 +126,8 @@ public class TestSeparateClientZKCluster {
|
|||
public void testMasterSwitch() throws Exception {
|
||||
// get an admin instance and issue some request first
|
||||
Connection conn = TEST_UTIL.getConnection();
|
||||
Admin admin = conn.getAdmin();
|
||||
LOG.debug("Tables: " + admin.listTableDescriptors());
|
||||
try {
|
||||
try (Admin admin = conn.getAdmin()) {
|
||||
LOG.debug("Tables: " + admin.listTableDescriptors());
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
// switch active master
|
||||
HMaster master = cluster.getMaster();
|
||||
|
@ -138,31 +138,29 @@ public class TestSeparateClientZKCluster {
|
|||
}
|
||||
LOG.info("Shutdown master {}", master.getServerName());
|
||||
while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
|
||||
LOG.info("Get master {}", cluster.getMaster() == null? "null":
|
||||
cluster.getMaster().getServerName());
|
||||
LOG.info("Get master {}",
|
||||
cluster.getMaster() == null ? "null" : cluster.getMaster().getServerName());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
LOG.info("Got master {}", cluster.getMaster().getServerName());
|
||||
// confirm client access still works
|
||||
Assert.assertTrue(admin.balance(false));
|
||||
} finally {
|
||||
admin.close();
|
||||
assertTrue(admin.balance(false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaRegionMove() throws Exception {
|
||||
TableName tn = TableName.valueOf(name.getMethodName());
|
||||
TableName tn = name.getTableName();
|
||||
// create table
|
||||
Connection conn = TEST_UTIL.getConnection();
|
||||
Admin admin = conn.getAdmin();
|
||||
HTable table = (HTable) conn.getTable(tn);
|
||||
try {
|
||||
try (Admin admin = conn.getAdmin();
|
||||
Table table = conn.getTable(tn);
|
||||
RegionLocator locator = conn.getRegionLocator(tn)) {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
TableDescriptorBuilder tableDescBuilder =
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
admin.createTable(tableDescBuilder.build());
|
||||
// issue some requests to cache the region location
|
||||
Put put = new Put(row);
|
||||
|
@ -182,8 +180,7 @@ public class TestSeparateClientZKCluster {
|
|||
admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
|
||||
LOG.debug("Finished moving meta");
|
||||
// invalidate client cache
|
||||
RegionInfo region =
|
||||
table.getRegionLocator().getRegionLocation(row).getRegion();
|
||||
RegionInfo region = locator.getRegionLocation(row).getRegion();
|
||||
ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
|
||||
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
|
||||
ServerName name = rst.getRegionServer().getServerName();
|
||||
|
@ -199,25 +196,20 @@ public class TestSeparateClientZKCluster {
|
|||
table.put(put);
|
||||
result = table.get(get);
|
||||
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||
Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
|
||||
} finally {
|
||||
admin.close();
|
||||
table.close();
|
||||
assertArrayEquals(newVal, result.getValue(family, qualifier));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
|
||||
TableName tn = TableName.valueOf(name.getMethodName());
|
||||
TableName tn = name.getTableName();
|
||||
// create table
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
Admin admin = conn.getAdmin();
|
||||
HTable table = (HTable) conn.getTable(tn);
|
||||
try {
|
||||
Connection conn = TEST_UTIL.getConnection();
|
||||
try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
|
||||
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
TableDescriptorBuilder tableDescBuilder =
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
admin.createTable(tableDescBuilder.build());
|
||||
// put some data
|
||||
Put put = new Put(row);
|
||||
|
@ -246,21 +238,18 @@ public class TestSeparateClientZKCluster {
|
|||
Get get = new Get(row);
|
||||
Result result = table.get(get);
|
||||
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
} finally {
|
||||
admin.close();
|
||||
table.close();
|
||||
assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncTable() throws Exception {
|
||||
TableName tn = TableName.valueOf(name.getMethodName());
|
||||
TableName tn = name.getTableName();
|
||||
ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||
TableDescriptorBuilder tableDescBuilder =
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||
try (AsyncConnection ASYNC_CONN =
|
||||
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
||||
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
||||
ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
|
||||
AsyncTable<?> table = ASYNC_CONN.getTable(tn);
|
||||
// put some data
|
||||
|
@ -271,7 +260,22 @@ public class TestSeparateClientZKCluster {
|
|||
Get get = new Get(row);
|
||||
Result result = table.get(get).get();
|
||||
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
assertArrayEquals(value, result.getValue(family, qualifier));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangeMetaReplicaCount() throws Exception {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
try (RegionLocator locator =
|
||||
TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
|
||||
assertEquals(1, locator.getAllRegionLocations().size());
|
||||
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 3);
|
||||
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3);
|
||||
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 2);
|
||||
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2);
|
||||
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 1);
|
||||
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
|
|||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
@ -492,4 +493,9 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaLocationSyncer getMetaLocationSyncer() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -292,7 +292,7 @@ public class TestMasterNoCluster {
|
|||
while (!master.isInitialized()) {
|
||||
Threads.sleep(200);
|
||||
}
|
||||
Assert.assertNull(master.metaLocationSyncer);
|
||||
Assert.assertNull(master.getMetaLocationSyncer());
|
||||
Assert.assertNull(master.masterAddressSyncer);
|
||||
master.stopMaster();
|
||||
master.join();
|
||||
|
|
Loading…
Reference in New Issue