HBASE-23304: RPCs needed for client meta information lookup (apache#904) (#1098)
* HBASE-23257: Track clusterID in stand by masters (#798) This patch implements a simple cache that all the masters can lookup to serve cluster ID to clients. Active HMaster is still responsible for creating it but all the masters will read it from fs to serve clients. RPCs exposing it will come in a separate patch as a part of HBASE-18095. Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Guangxu Cheng <guangxucheng@gmail.com> (cherry picked from commitc2e01f2398
) * HBASE-23275: Track active master's address in ActiveMasterManager (#812) Currently we just track whether an active master exists. It helps to also track the address of the active master in all the masters to help serve the client RPC requests to know which master is active. Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> (cherry picked from commitefebb843af
) * HBASE-23281: Track meta region locations in masters (#830) * HBASE-23281: Track meta region changes on masters This patch adds a simple cache that tracks the meta region replica locations. It keeps an eye on the region movements so that the cached locations are not stale. This information is used for servicing client RPCs for connections that use master based registry (HBASE-18095). The RPC end points will be added in a separate patch. Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> (cherry picked from commit8571d389cf
) * HBASE-23304: RPCs needed for client meta information lookup (#904) * HBASE-23304: RPCs needed for client meta information lookup This patch implements the RPCs needed for the meta information lookup during connection init. New tests added to cover the RPC code paths. HBASE-23305 builds on this to implement the client side logic. Fixed a bunch of checkstyle nits around the places the patch touches. Signed-off-by: Andrew Purtell <apurtell@apache.org> (cherry picked from commit4f8fbba0c0
)
This commit is contained in:
parent
36cdcad43f
commit
71f035450d
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionLoadStats;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -93,6 +94,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
|||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufMessageConverter;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaScope;
|
||||
|
@ -375,7 +377,9 @@ public final class ProtobufUtil {
|
|||
* @see #toServerName(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName)
|
||||
*/
|
||||
public static HBaseProtos.ServerName toServerName(final ServerName serverName) {
|
||||
if (serverName == null) return null;
|
||||
if (serverName == null) {
|
||||
return null;
|
||||
}
|
||||
HBaseProtos.ServerName.Builder builder =
|
||||
HBaseProtos.ServerName.newBuilder();
|
||||
builder.setHostName(serverName.getHostname());
|
||||
|
@ -3071,6 +3075,44 @@ public final class ProtobufUtil {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Meta region state from the passed data bytes. Can handle both old and new style
|
||||
* server names.
|
||||
* @param data protobuf serialized data with meta server name.
|
||||
* @param replicaId replica ID for this region
|
||||
* @return RegionState instance corresponding to the serialized data.
|
||||
* @throws DeserializationException if the data is invalid.
|
||||
*/
|
||||
public static RegionState parseMetaRegionStateFrom(final byte[] data, int replicaId)
|
||||
throws DeserializationException {
|
||||
RegionState.State state = RegionState.State.OPEN;
|
||||
ServerName serverName;
|
||||
if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
|
||||
try {
|
||||
int prefixLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ZooKeeperProtos.MetaRegionServer rl =
|
||||
ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
|
||||
data.length - prefixLen);
|
||||
if (rl.hasState()) {
|
||||
state = RegionState.State.convert(rl.getState());
|
||||
}
|
||||
HBaseProtos.ServerName sn = rl.getServer();
|
||||
serverName = ServerName.valueOf(
|
||||
sn.getHostName(), sn.getPort(), sn.getStartCode());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException("Unable to parse meta region location");
|
||||
}
|
||||
} else {
|
||||
// old style of meta region location?
|
||||
serverName = parseServerNameFrom(data);
|
||||
}
|
||||
if (serverName == null) {
|
||||
state = RegionState.State.OFFLINE;
|
||||
}
|
||||
return new RegionState(RegionReplicaUtil.getRegionInfoForReplica(
|
||||
RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), state, serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a ServerName from the passed in data bytes.
|
||||
* @param data Data with a serialize server name in it; can handle the old style
|
||||
|
|
|
@ -41,7 +41,8 @@ public class ZNodePaths {
|
|||
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
|
||||
public static final char ZNODE_PATH_SEPARATOR = '/';
|
||||
|
||||
private static final String META_ZNODE_PREFIX = "meta-region-server";
|
||||
public static final String META_ZNODE_PREFIX_CONF_KEY = "zookeeper.znode.metaserver";
|
||||
public static final String META_ZNODE_PREFIX = "meta-region-server";
|
||||
private static final String DEFAULT_SNAPSHOT_CLEANUP_ZNODE = "snapshot-cleanup";
|
||||
|
||||
// base znode for this cluster
|
||||
|
@ -104,7 +105,7 @@ public class ZNodePaths {
|
|||
public ZNodePaths(Configuration conf) {
|
||||
baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
ImmutableMap.Builder<Integer, String> builder = ImmutableMap.builder();
|
||||
metaZNodePrefix = conf.get("zookeeper.znode.metaserver", META_ZNODE_PREFIX);
|
||||
metaZNodePrefix = conf.get(META_ZNODE_PREFIX_CONF_KEY, META_ZNODE_PREFIX);
|
||||
String defaultMetaReplicaZNode = ZNodePaths.joinZNode(baseZNode, metaZNodePrefix);
|
||||
builder.put(DEFAULT_REPLICA_ID, defaultMetaReplicaZNode);
|
||||
int numMetaReplicas = conf.getInt(META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
|
||||
|
@ -189,7 +190,19 @@ public class ZNodePaths {
|
|||
}
|
||||
|
||||
/**
|
||||
* Parse the meta replicaId from the passed znode name.
|
||||
* Parses the meta replicaId from the passed path.
|
||||
* @param path the name of the full path which includes baseZNode.
|
||||
* @return replicaId
|
||||
*/
|
||||
public int getMetaReplicaIdFromPath(String path) {
|
||||
// Extract the znode from path. The prefix is of the following format.
|
||||
// baseZNode + PATH_SEPARATOR.
|
||||
int prefixLen = baseZNode.length() + 1;
|
||||
return getMetaReplicaIdFromZnode(path.substring(prefixLen));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the meta replicaId from the passed znode
|
||||
* @param znode the name of the znode, does not include baseZNode
|
||||
* @return replicaId
|
||||
*/
|
||||
|
|
|
@ -1196,3 +1196,47 @@ service HbckService {
|
|||
rpc FixMeta(FixMetaRequest)
|
||||
returns(FixMetaResponse);
|
||||
}
|
||||
|
||||
/** Request and response to get the clusterID for this cluster */
|
||||
message GetClusterIdRequest {
|
||||
}
|
||||
message GetClusterIdResponse {
|
||||
/** Not set if cluster ID could not be determined. */
|
||||
optional string cluster_id = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the currently active master name for this cluster */
|
||||
message GetActiveMasterRequest {
|
||||
}
|
||||
message GetActiveMasterResponse {
|
||||
/** Not set if an active master could not be determined. */
|
||||
optional ServerName server_name = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the current list of meta region locations */
|
||||
message GetMetaRegionLocationsRequest {
|
||||
}
|
||||
message GetMetaRegionLocationsResponse {
|
||||
/** Not set if meta region locations could not be determined. */
|
||||
repeated RegionLocation meta_locations = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
|
||||
*/
|
||||
service ClientMetaService {
|
||||
/**
|
||||
* Get Cluster ID for this cluster.
|
||||
*/
|
||||
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
|
||||
|
||||
/**
|
||||
* Get active master server name for this cluster.
|
||||
*/
|
||||
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
|
||||
|
||||
/**
|
||||
* Get current meta replicas' region locations.
|
||||
*/
|
||||
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -17,25 +17,24 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZNodeClearer;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
/**
|
||||
* Handles everything on master-side related to master election.
|
||||
|
@ -57,12 +56,18 @@ public class ActiveMasterManager extends ZKListener {
|
|||
final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
|
||||
final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
|
||||
|
||||
// This server's information.
|
||||
private final ServerName sn;
|
||||
private int infoPort;
|
||||
private final Server master;
|
||||
|
||||
// Active master's server name. Invalidated anytime active master changes (based on ZK
|
||||
// notifications) and lazily fetched on-demand.
|
||||
// ServerName is immutable, so we don't need heavy synchronization around it.
|
||||
private volatile ServerName activeMasterServerName;
|
||||
|
||||
/**
|
||||
* @param watcher
|
||||
* @param watcher ZK watcher
|
||||
* @param sn ServerName
|
||||
* @param master In an instance of a Master.
|
||||
*/
|
||||
|
@ -106,6 +111,30 @@ public class ActiveMasterManager extends ZKListener {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the active master's ServerName from zookeeper.
|
||||
*/
|
||||
private void fetchAndSetActiveMasterServerName() {
|
||||
LOG.debug("Attempting to fetch active master sn from zk");
|
||||
try {
|
||||
activeMasterServerName = MasterAddressTracker.getMasterAddress(watcher);
|
||||
} catch (IOException | KeeperException e) {
|
||||
// Log and ignore for now and re-fetch later if needed.
|
||||
LOG.error("Error fetching active master information", e);
|
||||
}
|
||||
}
|
||||
|
||||
public Optional<ServerName> getActiveMasterServerName() {
|
||||
if (!clusterHasActiveMaster.get()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
if (activeMasterServerName == null) {
|
||||
fetchAndSetActiveMasterServerName();
|
||||
}
|
||||
// It could still be null, but return whatever we have.
|
||||
return Optional.ofNullable(activeMasterServerName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a change in the master node. Doesn't matter whether this was called
|
||||
* from a nodeCreated or nodeDeleted event because there are no guarantees
|
||||
|
@ -134,6 +163,9 @@ public class ActiveMasterManager extends ZKListener {
|
|||
// Notify any thread waiting to become the active master
|
||||
clusterHasActiveMaster.notifyAll();
|
||||
}
|
||||
// Reset the active master sn. Will be re-fetched later if needed.
|
||||
// We don't want to make a synchronous RPC under a monitor.
|
||||
activeMasterServerName = null;
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
master.abort("Received an unexpected KeeperException, aborting", ke);
|
||||
|
@ -151,8 +183,8 @@ public class ActiveMasterManager extends ZKListener {
|
|||
* @param checkInterval the interval to check if the master is stopped
|
||||
* @param startupStatus the monitor status to track the progress
|
||||
* @return True if no issue becoming active master else false if another
|
||||
* master was running or if some other problem (zookeeper, stop flag has been
|
||||
* set on this Master)
|
||||
* master was running or if some other problem (zookeeper, stop flag has been
|
||||
* set on this Master)
|
||||
*/
|
||||
boolean blockUntilBecomingActiveMaster(
|
||||
int checkInterval, MonitoredTask startupStatus) {
|
||||
|
@ -178,10 +210,14 @@ public class ActiveMasterManager extends ZKListener {
|
|||
// We are the master, return
|
||||
startupStatus.setStatus("Successfully registered as active master.");
|
||||
this.clusterHasActiveMaster.set(true);
|
||||
activeMasterServerName = sn;
|
||||
LOG.info("Registered as active master=" + this.sn);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Invalidate the active master name so that subsequent requests do not get any stale
|
||||
// master information. Will be re-fetched if needed.
|
||||
activeMasterServerName = null;
|
||||
// There is another active master running elsewhere or this is a restart
|
||||
// and the master ephemeral node has not expired yet.
|
||||
this.clusterHasActiveMaster.set(true);
|
||||
|
@ -208,7 +244,8 @@ public class ActiveMasterManager extends ZKListener {
|
|||
ZKUtil.deleteNode(this.watcher, this.watcher.getZNodePaths().masterAddressZNode);
|
||||
|
||||
// We may have failed to delete the znode at the previous step, but
|
||||
// we delete the file anyway: a second attempt to delete the znode is likely to fail again.
|
||||
// we delete the file anyway: a second attempt to delete the znode is likely to fail
|
||||
// again.
|
||||
ZNodeClearer.deleteMyEphemeralNodeOnDisk();
|
||||
} else {
|
||||
msg = "Another master is the active master, " + currentMaster +
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ClusterId;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Caches the cluster ID of the cluster. For standby masters, this is used to serve the client
|
||||
* RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not
|
||||
* already exist. Standby masters just read the information from the file system. This class is
|
||||
* thread-safe.
|
||||
*
|
||||
* TODO: Make it a singleton without affecting concurrent junit tests.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CachedClusterId {
|
||||
|
||||
public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
|
||||
private static final int MAX_FETCH_TIMEOUT_MS = 10000;
|
||||
|
||||
private Path rootDir;
|
||||
private FileSystem fs;
|
||||
|
||||
// When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
|
||||
// avoid multiple fetches from FS and let only one thread fetch the information.
|
||||
AtomicBoolean fetchInProgress = new AtomicBoolean(false);
|
||||
|
||||
// When true, it means that the cluster ID has been fetched successfully from fs.
|
||||
private AtomicBoolean isClusterIdSet = new AtomicBoolean(false);
|
||||
// Immutable once set and read multiple times.
|
||||
private ClusterId clusterId;
|
||||
|
||||
// cache stats for testing.
|
||||
private AtomicInteger cacheMisses = new AtomicInteger(0);
|
||||
|
||||
public CachedClusterId(Configuration conf) throws IOException {
|
||||
rootDir = FSUtils.getRootDir(conf);
|
||||
fs = rootDir.getFileSystem(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Succeeds only once, when setting to a non-null value. Overwrites are not allowed.
|
||||
*/
|
||||
private void setClusterId(ClusterId id) {
|
||||
if (id == null || isClusterIdSet.get()) {
|
||||
return;
|
||||
}
|
||||
clusterId = id;
|
||||
isClusterIdSet.set(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a cached copy of the cluster ID. null if the cache is not populated.
|
||||
*/
|
||||
private String getClusterId() {
|
||||
if (!isClusterIdSet.get()) {
|
||||
return null;
|
||||
}
|
||||
// It is ok to read without a lock since clusterId is immutable once set.
|
||||
return clusterId.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to fetch the cluster ID from the file system. If no attempt is already in progress,
|
||||
* synchronously fetches the cluster ID and sets it. If an attempt is already in progress,
|
||||
* returns right away and the caller is expected to wait for the fetch to finish.
|
||||
* @return true if the attempt is done, false if another thread is already fetching it.
|
||||
*/
|
||||
private boolean attemptFetch() {
|
||||
if (fetchInProgress.compareAndSet(false, true)) {
|
||||
// A fetch is not in progress, so try fetching the cluster ID synchronously and then notify
|
||||
// the waiting threads.
|
||||
try {
|
||||
cacheMisses.incrementAndGet();
|
||||
setClusterId(FSUtils.getClusterId(fs, rootDir));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error fetching cluster ID", e);
|
||||
} finally {
|
||||
Preconditions.checkState(fetchInProgress.compareAndSet(true, false));
|
||||
synchronized (fetchInProgress) {
|
||||
fetchInProgress.notifyAll();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void waitForFetchToFinish() throws InterruptedException {
|
||||
synchronized (fetchInProgress) {
|
||||
while (fetchInProgress.get()) {
|
||||
// We don't want the fetches to block forever, for example if there are bugs
|
||||
// of missing notifications.
|
||||
fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached
|
||||
* copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are
|
||||
* trying get from a clean cache.
|
||||
*
|
||||
* @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
|
||||
* not exist on the file system.
|
||||
*/
|
||||
public String getFromCacheOrFetch() {
|
||||
String id = getClusterId();
|
||||
if (id != null) {
|
||||
return id;
|
||||
}
|
||||
if (!attemptFetch()) {
|
||||
// A fetch is in progress.
|
||||
try {
|
||||
waitForFetchToFinish();
|
||||
} catch (InterruptedException e) {
|
||||
// pass and return whatever is in the cache.
|
||||
}
|
||||
}
|
||||
return getClusterId();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getCacheStats() {
|
||||
return cacheMisses.get();
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -350,6 +350,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// manager of assignment nodes in zookeeper
|
||||
private AssignmentManager assignmentManager;
|
||||
|
||||
/**
|
||||
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale
|
||||
* cache entries.
|
||||
*/
|
||||
private final MetaRegionLocationCache metaRegionLocationCache;
|
||||
|
||||
// manager of replication
|
||||
private ReplicationPeerManager replicationPeerManager;
|
||||
|
||||
|
@ -444,6 +450,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private final boolean maintenanceMode;
|
||||
static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";
|
||||
|
||||
// Cached clusterId on stand by masters to serve clusterID requests from clients.
|
||||
private final CachedClusterId cachedClusterId;
|
||||
|
||||
public static class RedirectServlet extends HttpServlet {
|
||||
private static final long serialVersionUID = 2894774810058302473L;
|
||||
private final int regionServerInfoPort;
|
||||
|
@ -498,8 +507,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* #finishActiveMasterInitialization(MonitoredTask) after
|
||||
* the master becomes the active one.
|
||||
*/
|
||||
public HMaster(final Configuration conf)
|
||||
throws IOException, KeeperException {
|
||||
public HMaster(final Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
TraceUtil.initTracer(conf);
|
||||
try {
|
||||
|
@ -512,7 +520,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
} else {
|
||||
maintenanceMode = false;
|
||||
}
|
||||
|
||||
this.rsFatals = new MemoryBoundedLogMessageBuffer(
|
||||
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
|
||||
LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
|
||||
|
@ -560,10 +567,13 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
// Some unit tests don't need a cluster, so no zookeeper at all
|
||||
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
|
||||
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
|
||||
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
|
||||
} else {
|
||||
this.metaRegionLocationCache = null;
|
||||
this.activeMasterManager = null;
|
||||
}
|
||||
cachedClusterId = new CachedClusterId(conf);
|
||||
} catch (Throwable t) {
|
||||
// Make sure we log the exception. HMaster is often started via reflection and the
|
||||
// cause of failed startup is lost.
|
||||
|
@ -3765,7 +3775,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return this.hbckChore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ServerName> getActiveMaster() {
|
||||
return activeMasterManager.getActiveMasterServerName();
|
||||
}
|
||||
|
||||
public void runReplicationBarrierCleaner() {
|
||||
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
|
||||
if (rbc != null) {
|
||||
|
@ -3776,4 +3789,15 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
|
||||
return this.snapshotQuotaChore;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
if (activeMaster) {
|
||||
return super.getClusterId();
|
||||
}
|
||||
return cachedClusterId.getFromCacheOrFetch();
|
||||
}
|
||||
|
||||
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
||||
return this.metaRegionLocationCache;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.BindException;
|
||||
|
@ -30,6 +29,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
|
@ -116,11 +117,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
|
||||
|
@ -161,6 +160,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
|
||||
|
@ -185,12 +185,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
|
@ -349,9 +355,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.Snapshot
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("deprecation")
|
||||
public class MasterRpcServices extends RSRpcServices
|
||||
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
||||
LockService.BlockingInterface, HbckService.BlockingInterface {
|
||||
public class MasterRpcServices extends RSRpcServices implements
|
||||
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
||||
LockService.BlockingInterface, HbckService.BlockingInterface,
|
||||
ClientMetaService.BlockingInterface {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
|
||||
private static final Logger AUDITLOG =
|
||||
LoggerFactory.getLogger("SecurityLogger."+MasterRpcServices.class.getName());
|
||||
|
@ -360,7 +367,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
/**
|
||||
* @return Subset of configuration to pass initializing regionservers: e.g.
|
||||
* the filesystem to use and root directory to use.
|
||||
* the filesystem to use and root directory to use.
|
||||
*/
|
||||
private RegionServerStartupResponse.Builder createConfigurationSubset() {
|
||||
RegionServerStartupResponse.Builder resp = addConfig(
|
||||
|
@ -486,15 +493,17 @@ public class MasterRpcServices extends RSRpcServices
|
|||
protected List<BlockingServiceAndInterface> getServices() {
|
||||
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
|
||||
bssi.add(new BlockingServiceAndInterface(
|
||||
MasterService.newReflectiveBlockingService(this),
|
||||
MasterService.BlockingInterface.class));
|
||||
MasterService.newReflectiveBlockingService(this),
|
||||
MasterService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(
|
||||
RegionServerStatusService.newReflectiveBlockingService(this),
|
||||
RegionServerStatusService.BlockingInterface.class));
|
||||
RegionServerStatusService.newReflectiveBlockingService(this),
|
||||
RegionServerStatusService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
|
||||
LockService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
|
||||
HbckService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
|
||||
ClientMetaService.BlockingInterface.class));
|
||||
bssi.addAll(super.getServices());
|
||||
return bssi;
|
||||
}
|
||||
|
@ -621,7 +630,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
final byte[] regionName = req.getRegion().getValue().toByteArray();
|
||||
final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
|
||||
if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
if (regionInfo == null) {
|
||||
throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
}
|
||||
|
||||
final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
|
||||
if (master.cpHost != null) {
|
||||
|
@ -666,7 +677,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
@Override
|
||||
public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
|
||||
throws ServiceException {
|
||||
throws ServiceException {
|
||||
TableDescriptor tableDescriptor = ProtobufUtil.toTableDescriptor(req.getTableSchema());
|
||||
byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
|
||||
try {
|
||||
|
@ -1063,7 +1074,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
* Get list of TableDescriptors for requested tables.
|
||||
* @param c Unused (set to null).
|
||||
* @param req GetTableDescriptorsRequest that contains:
|
||||
* - tableNames: requested tables, or if empty, all are requested
|
||||
* - tableNames: requested tables, or if empty, all are requested.
|
||||
* @return GetTableDescriptorsResponse
|
||||
* @throws ServiceException
|
||||
*/
|
||||
|
@ -1207,9 +1218,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
/**
|
||||
* Checks if the specified snapshot is done.
|
||||
* @return true if the snapshot is in file system ready to use,
|
||||
* false if the snapshot is in the process of completing
|
||||
* false if the snapshot is in the process of completing
|
||||
* @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
|
||||
* a wrapped HBaseSnapshotException with progress failure reason.
|
||||
* a wrapped HBaseSnapshotException with progress failure reason.
|
||||
*/
|
||||
@Override
|
||||
public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
|
||||
|
@ -1451,7 +1462,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
final byte[] regionName = request.getRegion().getValue().toByteArray();
|
||||
final RegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName);
|
||||
if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
if (hri == null) {
|
||||
throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
}
|
||||
|
||||
if (master.cpHost != null) {
|
||||
master.cpHost.preRegionOffline(hri);
|
||||
|
@ -2298,8 +2311,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
report.getRegionSize(), now);
|
||||
}
|
||||
} else {
|
||||
LOG.debug(
|
||||
"Received region space usage report but HMaster is not ready to process it, skipping");
|
||||
LOG.debug("Received region space usage report but HMaster is not ready to process it, "
|
||||
+ "skipping");
|
||||
}
|
||||
return RegionSpaceUseReportResponse.newBuilder().build();
|
||||
} catch (Exception e) {
|
||||
|
@ -2335,8 +2348,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
return builder.build();
|
||||
} else {
|
||||
LOG.debug(
|
||||
"Received space quota region size report but HMaster is not ready to process it, skipping");
|
||||
LOG.debug("Received space quota region size report but HMaster is not ready to process it,"
|
||||
+ "skipping");
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
|
@ -2880,4 +2893,34 @@ public class MasterRpcServices extends RSRpcServices
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
|
||||
throws ServiceException {
|
||||
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
|
||||
String clusterId = master.getClusterId();
|
||||
if (clusterId != null) {
|
||||
resp.setClusterId(clusterId);
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
|
||||
GetActiveMasterRequest request) throws ServiceException {
|
||||
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
|
||||
Optional<ServerName> serverName = master.getActiveMaster();
|
||||
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
|
||||
GetMetaRegionLocationsRequest request) throws ServiceException {
|
||||
GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
|
||||
Optional<List<HRegionLocation>> metaLocations =
|
||||
master.getMetaRegionLocationCache().getMetaRegionLocations();
|
||||
metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
|
||||
location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
|
||||
return response.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,249 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
/**
|
||||
* A cache of meta region location metadata. Registers a listener on ZK to track changes to the
|
||||
* meta table znodes. Clients are expected to retry if the meta information is stale. This class
|
||||
* is thread-safe (a single instance of this class can be shared by multiple threads without race
|
||||
* conditions).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetaRegionLocationCache extends ZKListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);
|
||||
|
||||
/**
|
||||
* Maximum number of times we retry when ZK operation times out.
|
||||
*/
|
||||
private static final int MAX_ZK_META_FETCH_RETRIES = 10;
|
||||
/**
|
||||
* Sleep interval ms between ZK operation retries.
|
||||
*/
|
||||
private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;
|
||||
private static final int SLEEP_INTERVAL_MS_MAX = 10000;
|
||||
private final RetryCounterFactory retryCounterFactory =
|
||||
new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);
|
||||
|
||||
/**
|
||||
* Cached meta region locations indexed by replica ID.
|
||||
* CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during
|
||||
* client requests. Even though CopyOnWriteArrayMap copies the data structure for every write,
|
||||
* that should be OK since the size of the list is often small and mutations are not too often
|
||||
* and we do not need to block client requests while mutations are in progress.
|
||||
*/
|
||||
private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations;
|
||||
|
||||
private enum ZNodeOpType {
|
||||
INIT,
|
||||
CREATED,
|
||||
CHANGED,
|
||||
DELETED
|
||||
}
|
||||
|
||||
public MetaRegionLocationCache(ZKWatcher zkWatcher) {
|
||||
super(zkWatcher);
|
||||
cachedMetaLocations = new CopyOnWriteArrayMap<>();
|
||||
watcher.registerListener(this);
|
||||
// Populate the initial snapshot of data from meta znodes.
|
||||
// This is needed because stand-by masters can potentially start after the initial znode
|
||||
// creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
|
||||
// are established. Subsequent updates are handled by the registered listener. Also, this runs
|
||||
// in a separate thread in the background to not block master init.
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
|
||||
RetryCounterFactory retryFactory = new RetryCounterFactory(
|
||||
Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
|
||||
threadFactory.newThread(
|
||||
()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
|
||||
* a watcher on base znode to check for any CREATE/DELETE events on the children.
|
||||
* @param retryCounter controls the number of retries and sleep between retries.
|
||||
*/
|
||||
private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
|
||||
List<String> znodes = null;
|
||||
while (retryCounter.shouldRetry()) {
|
||||
try {
|
||||
znodes = watcher.getMetaReplicaNodesAndWatchChildren();
|
||||
break;
|
||||
} catch (KeeperException ke) {
|
||||
LOG.debug("Error populating initial meta locations", ke);
|
||||
if (!retryCounter.shouldRetry()) {
|
||||
// Retries exhausted and watchers not set. This is not a desirable state since the cache
|
||||
// could remain stale forever. Propagate the exception.
|
||||
watcher.abort("Error populating meta locations", ke);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("Interrupted while loading meta locations from ZK", ie);
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (znodes == null || znodes.isEmpty()) {
|
||||
// No meta znodes exist at this point but we registered a watcher on the base znode to listen
|
||||
// for updates. They will be handled via nodeChildrenChanged().
|
||||
return;
|
||||
}
|
||||
if (znodes.size() == cachedMetaLocations.size()) {
|
||||
// No new meta znodes got added.
|
||||
return;
|
||||
}
|
||||
for (String znode: znodes) {
|
||||
String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
|
||||
updateMetaLocation(path, opType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for
|
||||
* future updates.
|
||||
* @param replicaId ReplicaID of the region.
|
||||
* @return HRegionLocation for the meta replica.
|
||||
* @throws KeeperException if there is any issue fetching/parsing the serialized data.
|
||||
*/
|
||||
private HRegionLocation getMetaRegionLocation(int replicaId)
|
||||
throws KeeperException {
|
||||
RegionState metaRegionState;
|
||||
try {
|
||||
byte[] data = ZKUtil.getDataAndWatch(watcher,
|
||||
watcher.getZNodePaths().getZNodeForReplica(replicaId));
|
||||
metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
}
|
||||
return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
|
||||
}
|
||||
|
||||
private void updateMetaLocation(String path, ZNodeOpType opType) {
|
||||
if (!isValidMetaZNode(path)) {
|
||||
return;
|
||||
}
|
||||
LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
|
||||
int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
|
||||
RetryCounter retryCounter = retryCounterFactory.create();
|
||||
HRegionLocation location = null;
|
||||
while (retryCounter.shouldRetry()) {
|
||||
try {
|
||||
if (opType == ZNodeOpType.DELETED) {
|
||||
if (!ZKUtil.watchAndCheckExists(watcher, path)) {
|
||||
// The path does not exist, we've set the watcher and we can break for now.
|
||||
break;
|
||||
}
|
||||
// If it is a transient error and the node appears right away, we fetch the
|
||||
// latest meta state.
|
||||
}
|
||||
location = getMetaRegionLocation(replicaId);
|
||||
break;
|
||||
} catch (KeeperException e) {
|
||||
LOG.debug("Error getting meta location for path {}", path, e);
|
||||
if (!retryCounter.shouldRetry()) {
|
||||
LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
|
||||
break;
|
||||
}
|
||||
try {
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (location == null) {
|
||||
cachedMetaLocations.remove(replicaId);
|
||||
return;
|
||||
}
|
||||
cachedMetaLocations.put(replicaId, location);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
|
||||
*
|
||||
*/
|
||||
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
|
||||
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
|
||||
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
|
||||
if (snapshot.isEmpty()) {
|
||||
// This could be possible if the master has not successfully initialized yet or meta region
|
||||
// is stuck in some weird state.
|
||||
return Optional.empty();
|
||||
}
|
||||
List<HRegionLocation> result = new ArrayList<>();
|
||||
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
|
||||
// ArrayValueCollection does not implement toArray().
|
||||
snapshot.values().forEach(location -> result.add(location));
|
||||
return Optional.of(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to check if the given 'path' corresponds to a meta znode. This listener is only
|
||||
* interested in changes to meta znodes.
|
||||
*/
|
||||
private boolean isValidMetaZNode(String path) {
|
||||
return watcher.getZNodePaths().isAnyMetaReplicaZNode(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
updateMetaLocation(path, ZNodeOpType.CREATED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDeleted(String path) {
|
||||
updateMetaLocation(path, ZNodeOpType.DELETED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDataChanged(String path) {
|
||||
updateMetaLocation(path, ZNodeOpType.CHANGED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeChildrenChanged(String path) {
|
||||
if (!path.equals(watcher.getZNodePaths().baseZNode)) {
|
||||
return;
|
||||
}
|
||||
loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
||||
import org.apache.hadoop.hbase.master.CachedClusterId;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestCachedClusterId {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCachedClusterId.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static String clusterId;
|
||||
private static HMaster activeMaster;
|
||||
private static HMaster standByMaster;
|
||||
|
||||
private static class GetClusterIdThread extends TestThread {
|
||||
CachedClusterId cachedClusterId;
|
||||
public GetClusterIdThread(TestContext ctx, CachedClusterId clusterId) {
|
||||
super(ctx);
|
||||
cachedClusterId = clusterId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doWork() throws Exception {
|
||||
assertEquals(clusterId, cachedClusterId.getFromCacheOrFetch());
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
clusterId = activeMaster.getClusterId();
|
||||
standByMaster = TEST_UTIL.getHBaseCluster().startMaster().getMaster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterIdMatch() {
|
||||
assertEquals(clusterId, standByMaster.getClusterId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiThreadedGetClusterId() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
CachedClusterId cachedClusterId = new CachedClusterId(conf);
|
||||
TestContext context = new TestContext(conf);
|
||||
int numThreads = 100;
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
context.addThread(new GetClusterIdThread(context, cachedClusterId));
|
||||
}
|
||||
context.startThreads();
|
||||
context.stop();
|
||||
int cacheMisses = cachedClusterId.getCacheStats();
|
||||
assertEquals(cacheMisses, 1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,186 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({SmallTests.class, MasterTests.class })
|
||||
public class TestMetaRegionLocationCache {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static AsyncRegistry REGISTRY;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(
|
||||
TEST_UTIL.getConfiguration(), REGISTRY, 3);
|
||||
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() throws Exception {
|
||||
IOUtils.closeQuietly(REGISTRY);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Exception {
|
||||
List<HRegionLocation> result = new ArrayList<>();
|
||||
for (String znode: zk.getMetaReplicaNodes()) {
|
||||
String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
|
||||
int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
|
||||
RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
|
||||
result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Verifies that the cached meta locations in the given master are in sync with what is in ZK.
|
||||
private void verifyCachedMetaLocations(HMaster master) throws Exception {
|
||||
// Wait until initial meta locations are loaded.
|
||||
int retries = 0;
|
||||
while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) {
|
||||
Thread.sleep(1000);
|
||||
if (++retries == 10) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
List<HRegionLocation> metaHRLs =
|
||||
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
assertFalse(metaHRLs.isEmpty());
|
||||
ZKWatcher zk = master.getZooKeeper();
|
||||
List<String> metaZnodes = zk.getMetaReplicaNodes();
|
||||
assertEquals(metaZnodes.size(), metaHRLs.size());
|
||||
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
|
||||
Collections.sort(metaHRLs);
|
||||
Collections.sort(actualHRLs);
|
||||
assertEquals(actualHRLs, metaHRLs);
|
||||
}
|
||||
|
||||
@Test public void testInitialMetaLocations() throws Exception {
|
||||
verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
|
||||
}
|
||||
|
||||
@Test public void testStandByMetaLocations() throws Exception {
|
||||
HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
|
||||
verifyCachedMetaLocations(standBy);
|
||||
}
|
||||
|
||||
/*
|
||||
* Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
|
||||
*/
|
||||
@Test public void testMetaLocationsChange() throws Exception {
|
||||
List<HRegionLocation> currentMetaLocs =
|
||||
getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
|
||||
// Move these replicas to random servers.
|
||||
for (HRegionLocation location: currentMetaLocs) {
|
||||
RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
|
||||
}
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(
|
||||
TEST_UTIL.getConfiguration(), REGISTRY, 3);
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
verifyCachedMetaLocations(masterThread.getMaster());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
|
||||
* znode for notifications.
|
||||
*/
|
||||
@Test public void testMetaRegionLocationCache() throws Exception {
|
||||
final String parentZnodeName = "/randomznodename";
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
|
||||
ServerName sn = ServerName.valueOf("localhost", 1234, 5678);
|
||||
try (ZKWatcher zkWatcher = new ZKWatcher(conf, null, null, true)) {
|
||||
// A thread that repeatedly creates and drops an unrelated child znode. This is to simulate
|
||||
// some ZK activity in the background.
|
||||
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
|
||||
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
|
||||
@Override public void doAnAction() throws Exception {
|
||||
final String testZnode = parentZnodeName + "/child";
|
||||
ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
|
||||
ZKUtil.deleteNode(zkWatcher, testZnode);
|
||||
}
|
||||
});
|
||||
ctx.startThreads();
|
||||
try {
|
||||
MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher);
|
||||
// meta znodes do not exist at this point, cache should be empty.
|
||||
assertFalse(metaCache.getMetaRegionLocations().isPresent());
|
||||
// Set the meta locations for a random meta replicas, simulating an active hmaster meta
|
||||
// assignment.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
// Updates the meta znodes.
|
||||
MetaTableLocator.setMetaLocation(zkWatcher, sn, i, RegionState.State.OPEN);
|
||||
}
|
||||
// Wait until the meta cache is populated.
|
||||
int iters = 0;
|
||||
while (iters++ < 10) {
|
||||
if (metaCache.getMetaRegionLocations().isPresent()
|
||||
&& metaCache.getMetaRegionLocations().get().size() == 3) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations().get();
|
||||
assertEquals(3, metaLocations.size());
|
||||
for (HRegionLocation location : metaLocations) {
|
||||
assertEquals(sn, location.getServerName());
|
||||
}
|
||||
} finally {
|
||||
// clean up.
|
||||
ctx.stop();
|
||||
ZKUtil.deleteChildrenRecursively(zkWatcher, parentZnodeName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -91,6 +92,7 @@ public class TestActiveMasterManager {
|
|||
ActiveMasterManager activeMasterManager =
|
||||
dummyMaster.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
MonitoredTask status = Mockito.mock(MonitoredTask.class);
|
||||
|
@ -99,6 +101,7 @@ public class TestActiveMasterManager {
|
|||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
|
||||
|
||||
// Now pretend master restart
|
||||
DummyMaster secondDummyMaster = new DummyMaster(zk,master);
|
||||
|
@ -108,6 +111,8 @@ public class TestActiveMasterManager {
|
|||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
|
||||
assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -135,6 +140,7 @@ public class TestActiveMasterManager {
|
|||
ActiveMasterManager activeMasterManager =
|
||||
ms1.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
ClusterStatusTracker clusterStatusTracker =
|
||||
|
@ -144,6 +150,7 @@ public class TestActiveMasterManager {
|
|||
Mockito.mock(MonitoredTask.class));
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, firstMasterAddress);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
|
||||
|
||||
// New manager will now try to become the active master in another thread
|
||||
WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
|
||||
|
@ -161,6 +168,8 @@ public class TestActiveMasterManager {
|
|||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
// But secondary one should not be the active master
|
||||
assertFalse(t.isActiveMaster);
|
||||
// Verify the active master ServerName is populated in standby master.
|
||||
assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get());
|
||||
|
||||
// Close the first server and delete it's master node
|
||||
ms1.stop("stopping first server");
|
||||
|
@ -189,6 +198,7 @@ public class TestActiveMasterManager {
|
|||
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.isActiveMaster);
|
||||
assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get());
|
||||
|
||||
LOG.info("Deleting master node");
|
||||
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
|
||||
@Category({MediumTests.class, MasterTests.class})
|
||||
public class TestClientMetaServiceRPCs {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestClientMetaServiceRPCs.class);
|
||||
|
||||
// Total number of masters (active + stand by) for the purpose of this test.
|
||||
private static final int MASTER_COUNT = 3;
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static Configuration conf;
|
||||
private static int rpcTimeout;
|
||||
private static RpcClient rpcClient;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
// Start the mini cluster with stand-by masters.
|
||||
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
|
||||
builder.numMasters(MASTER_COUNT).numRegionServers(3);
|
||||
TEST_UTIL.startMiniCluster(builder.build());
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
|
||||
rpcClient = RpcClientFactory.createClient(conf,
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
if (rpcClient != null) {
|
||||
rpcClient.close();
|
||||
}
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private static ClientMetaService.BlockingInterface getMasterStub(ServerName server)
|
||||
throws IOException {
|
||||
return ClientMetaService.newBlockingStub(
|
||||
rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
|
||||
}
|
||||
|
||||
private static HBaseRpcController getRpcController() {
|
||||
return RpcControllerFactory.instantiate(conf).newController();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the cluster ID from all running masters.
|
||||
*/
|
||||
@Test public void TestClusterID() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
String clusterID = TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId();
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetClusterIdResponse resp =
|
||||
stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
|
||||
assertEquals(clusterID, resp.getClusterId());
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the active master ServerName as seen by all masters.
|
||||
*/
|
||||
@Test public void TestActiveMaster() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName();
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetActiveMasterResponse resp =
|
||||
stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
|
||||
assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName()));
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the meta region locations RPC returns consistent results across all masters.
|
||||
*/
|
||||
@Test public void TestMetaLocations() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
|
||||
.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
Collections.sort(metaLocations);
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations(
|
||||
rpcController, GetMetaRegionLocationsRequest.getDefaultInstance());
|
||||
List<HRegionLocation> result = new ArrayList<>();
|
||||
resp.getMetaLocationsList().forEach(
|
||||
location -> result.add(ProtobufUtil.toRegionLocation(location)));
|
||||
Collections.sort(result);
|
||||
assertEquals(metaLocations, result);
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
}
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -65,7 +64,7 @@ public class TestCloseAnOpeningRegion {
|
|||
|
||||
public static final class MockHMaster extends HMaster {
|
||||
|
||||
public MockHMaster(Configuration conf) throws IOException, KeeperException {
|
||||
public MockHMaster(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
@ -141,4 +140,4 @@ public class TestCloseAnOpeningRegion {
|
|||
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(0)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -164,7 +163,7 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
|
|||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -120,7 +119,7 @@ public class TestRegionsRecoveryConfigManager {
|
|||
|
||||
// Make it public so that JVMClusterUtil can access it.
|
||||
public static class TestHMaster extends HMaster {
|
||||
public TestHMaster(Configuration conf) throws IOException, KeeperException {
|
||||
public TestHMaster(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
}
|
||||
|
@ -144,4 +143,4 @@ public class TestRegionsRecoveryConfigManager {
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -56,7 +55,7 @@ public class TestShutdownBackupMaster {
|
|||
|
||||
public static final class MockHMaster extends HMaster {
|
||||
|
||||
public MockHMaster(Configuration conf) throws IOException, KeeperException {
|
||||
public MockHMaster(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -74,7 +73,7 @@ public class TestOpenRegionProcedureBackoff {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class TestOpenRegionProcedureHang {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -110,7 +109,7 @@ public class TestRegionAssignedToMultipleRegionServers {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -106,7 +105,7 @@ public class TestReportOnlineRegionsRace {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -117,7 +116,7 @@ public class TestReportRegionStateTransitionFromDeadServer {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -84,7 +83,7 @@ public class TestReportRegionStateTransitionRetry {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -130,7 +129,7 @@ public class TestSCPGetRegionsRace {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -198,7 +197,7 @@ public class TestWakeUpUnexpectedProcedure {
|
|||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
public HMasterForTest(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.protobuf;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -29,13 +28,17 @@ import org.apache.hadoop.hbase.CellBuilderType;
|
|||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
|
||||
|
@ -51,11 +54,12 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
|
||||
|
||||
/**
|
||||
* Class to test ProtobufUtil.
|
||||
*/
|
||||
@Category({MiscTests.class, SmallTests.class})
|
||||
@Category({ MiscTests.class, SmallTests.class})
|
||||
public class TestProtobufUtil {
|
||||
|
||||
@ClassRule
|
||||
|
@ -348,4 +352,32 @@ public class TestProtobufUtil {
|
|||
ProtobufUtil.toCell(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell);
|
||||
assertTrue(CellComparatorImpl.COMPARATOR.compare(offheapKV, newOffheapKV) == 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaRegionState() throws Exception {
|
||||
ServerName serverName = ServerName.valueOf("localhost", 1234, 5678);
|
||||
// New region state style.
|
||||
for (RegionState.State state: RegionState.State.values()) {
|
||||
RegionState regionState =
|
||||
new RegionState(RegionInfoBuilder.FIRST_META_REGIONINFO, state, serverName);
|
||||
MetaRegionServer metars = MetaRegionServer.newBuilder()
|
||||
.setServer(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(serverName))
|
||||
.setRpcVersion(HConstants.RPC_CURRENT_VERSION)
|
||||
.setState(state.convert()).build();
|
||||
// Serialize
|
||||
byte[] data = ProtobufUtil.prependPBMagic(metars.toByteArray());
|
||||
ProtobufUtil.prependPBMagic(data);
|
||||
// Deserialize
|
||||
RegionState regionStateNew =
|
||||
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.parseMetaRegionStateFrom(data, 1);
|
||||
assertEquals(regionState.getServerName(), regionStateNew.getServerName());
|
||||
assertEquals(regionState.getState(), regionStateNew.getState());
|
||||
}
|
||||
// old style.
|
||||
RegionState rs =
|
||||
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.parseMetaRegionStateFrom(
|
||||
serverName.getVersionedBytes(), 1);
|
||||
assertEquals(serverName, rs.getServerName());
|
||||
assertEquals(rs.getState(), RegionState.State.OPEN);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -122,7 +122,7 @@ public class TestRegionServerReportForDuty {
|
|||
* This test HMaster class will always throw ServerNotRunningYetException if checked.
|
||||
*/
|
||||
public static class NeverInitializedMaster extends HMaster {
|
||||
public NeverInitializedMaster(Configuration conf) throws IOException, KeeperException {
|
||||
public NeverInitializedMaster(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.master.HMaster;
|
|||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -134,7 +133,7 @@ public class TestReplicationProcedureRetry {
|
|||
|
||||
private ReplicationPeerManager manager;
|
||||
|
||||
public MockHMaster(Configuration conf) throws IOException, KeeperException {
|
||||
public MockHMaster(Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,12 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
|
||||
|
||||
/**
|
||||
|
@ -266,40 +261,17 @@ public final class MetaTableLocator {
|
|||
* @throws KeeperException if a ZooKeeper operation fails
|
||||
*/
|
||||
public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
|
||||
throws KeeperException {
|
||||
RegionState.State state = RegionState.State.OPEN;
|
||||
ServerName serverName = null;
|
||||
throws KeeperException {
|
||||
RegionState regionState = null;
|
||||
try {
|
||||
byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
|
||||
if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
|
||||
try {
|
||||
int prefixLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ZooKeeperProtos.MetaRegionServer rl =
|
||||
ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
|
||||
data.length - prefixLen);
|
||||
if (rl.hasState()) {
|
||||
state = RegionState.State.convert(rl.getState());
|
||||
}
|
||||
HBaseProtos.ServerName sn = rl.getServer();
|
||||
serverName = ServerName.valueOf(
|
||||
sn.getHostName(), sn.getPort(), sn.getStartCode());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException("Unable to parse meta region location");
|
||||
}
|
||||
} else {
|
||||
// old style of meta region location?
|
||||
serverName = ProtobufUtil.parseServerNameFrom(data);
|
||||
}
|
||||
regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (serverName == null) {
|
||||
state = RegionState.State.OFFLINE;
|
||||
}
|
||||
return new RegionState(RegionReplicaUtil.getRegionInfoForReplica(
|
||||
RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), state, serverName);
|
||||
return regionState;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,10 +23,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.AuthUtil;
|
||||
|
@ -81,10 +79,6 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
|
|||
// listeners to be notified
|
||||
private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
// Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
|
||||
// negotiation to complete
|
||||
private CountDownLatch saslLatch = new CountDownLatch(1);
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
/* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
|
||||
|
@ -383,13 +377,32 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
|
|||
*/
|
||||
public List<String> getMetaReplicaNodes() throws KeeperException {
|
||||
List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
|
||||
return filterMetaReplicaNodes(childrenOfBaseNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode
|
||||
* for subsequent CREATE/DELETE operations on child nodes.
|
||||
*/
|
||||
public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException {
|
||||
List<String> childrenOfBaseNode =
|
||||
ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode);
|
||||
return filterMetaReplicaNodes(childrenOfBaseNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param nodes Input list of znodes
|
||||
* @return Filtered list of znodes from nodes that belong to meta replica(s).
|
||||
*/
|
||||
private List<String> filterMetaReplicaNodes(List<String> nodes) {
|
||||
if (nodes == null || nodes.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<String> metaReplicaNodes = new ArrayList<>(2);
|
||||
if (childrenOfBaseNode != null) {
|
||||
String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
|
||||
for (String child : childrenOfBaseNode) {
|
||||
if (child.startsWith(pattern)) {
|
||||
metaReplicaNodes.add(child);
|
||||
}
|
||||
String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX);
|
||||
for (String child : nodes) {
|
||||
if (child.startsWith(pattern)) {
|
||||
metaReplicaNodes.add(child);
|
||||
}
|
||||
}
|
||||
return metaReplicaNodes;
|
||||
|
|
Loading…
Reference in New Issue