HBASE-25288 Make MasterRpcServices not extends RSRpcServices and also HMaster not extends HRegionServer (#3612)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-09-13 09:56:13 +08:00 committed by GitHub
parent 04c38883af
commit d26bcaaa91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 2069 additions and 1959 deletions

View File

@ -0,0 +1,386 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Base class for Master and RegionServer RpcServices.
*/
@InterfaceAudience.Private
public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface,
HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class);
public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";
public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;
protected final S server;
// Server to handle client requests.
protected final RpcServer rpcServer;
private final InetSocketAddress isa;
protected final PriorityFunction priority;
private AccessChecker accessChecker;
private ZKPermissionWatcher zkPermissionWatcher;
protected HBaseRpcServicesBase(S server, String processName) throws IOException {
this.server = server;
Configuration conf = server.getConfiguration();
final RpcSchedulerFactory rpcSchedulerFactory;
try {
rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class)
.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException
| IllegalAccessException e) {
throw new IllegalArgumentException(e);
}
String hostname = DNS.getHostname(conf, getDNSServerType());
int port = conf.getInt(getPortConfigName(), getDefaultPort());
// Creation of a HSA will force a resolve.
final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
priority = createPriority();
// Using Address means we don't get the IP too. Shorten it more even to just the host name
// w/o the domain.
final String name = processName + "/" +
Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
server.setName(name);
// Set how many times to retry talking to another server over Connection.
ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
boolean reservoirEnabled =
conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled());
try {
// use final bindAddress for this server.
rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
} catch (BindException be) {
throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName() +
"' configuration property.", be.getCause() != null ? be.getCause() : be);
}
final InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
// Set our address, however we need the final port that was given to rpcServer
isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
}
protected abstract boolean defaultReservoirEnabled();
protected abstract DNS.ServerType getDNSServerType();
protected abstract String getHostname(Configuration conf, String defaultHostname);
protected abstract String getPortConfigName();
protected abstract int getDefaultPort();
protected abstract PriorityFunction createPriority();
protected abstract Class<?> getRpcSchedulerFactoryClass(Configuration conf);
protected abstract List<BlockingServiceAndInterface> getServices();
protected final void internalStart(ZKWatcher zkWatcher) {
if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
accessChecker = new AccessChecker(getConfiguration());
} else {
accessChecker = new NoopAccessChecker(getConfiguration());
}
zkPermissionWatcher =
new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
try {
zkPermissionWatcher.start();
} catch (KeeperException e) {
LOG.error("ZooKeeper permission watcher initialization failed", e);
}
rpcServer.start();
}
protected final void requirePermission(String request, Permission.Action perm)
throws IOException {
if (accessChecker != null) {
accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
}
}
public AccessChecker getAccessChecker() {
return accessChecker;
}
public ZKPermissionWatcher getZkPermissionWatcher() {
return zkPermissionWatcher;
}
protected final void internalStop() {
if (zkPermissionWatcher != null) {
zkPermissionWatcher.close();
}
rpcServer.stop();
}
public Configuration getConfiguration() {
return server.getConfiguration();
}
public S getServer() {
return server;
}
public InetSocketAddress getSocketAddress() {
return isa;
}
public RpcServerInterface getRpcServer() {
return rpcServer;
}
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
@Override
public int getPriority(RequestHeader header, Message param, User user) {
return priority.getPriority(header, param, user);
}
@Override
public long getDeadline(RequestHeader header, Message param) {
return priority.getDeadline(header, param);
}
/**
* Check if an OOME and, if so, abort immediately to avoid creating more objects.
* @return True if we OOME'd and are aborting.
*/
@Override
public boolean checkOOME(Throwable e) {
return OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
}
@Override
public void onConfigurationChange(Configuration conf) {
rpcServer.onConfigurationChange(conf);
}
@Override
public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
throws ServiceException {
return GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build();
}
@Override
public GetActiveMasterResponse getActiveMaster(RpcController controller,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
server.getActiveMaster()
.ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
return builder.build();
}
@Override
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
server.getActiveMaster()
.ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
server.getBackupMasters()
.forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
return builder.build();
}
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
GetMetaRegionLocationsRequest request) throws ServiceException {
GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
server.getMetaLocations()
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
return builder.build();
}
@Override
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
List<ServerName> bootstrapNodes = new ArrayList<>(server.getRegionServers());
Collections.shuffle(bootstrapNodes, ThreadLocalRandom.current());
int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
bootstrapNodes.stream().limit(maxNodeCount).map(ProtobufUtil::toServerName)
.forEach(builder::addServerName);
return builder.build();
}
@Override
public UpdateConfigurationResponse updateConfiguration(RpcController controller,
UpdateConfigurationRequest request) throws ServiceException {
try {
requirePermission("updateConfiguration", Permission.Action.ADMIN);
this.server.updateConfiguration();
} catch (Exception e) {
throw new ServiceException(e);
}
return UpdateConfigurationResponse.getDefaultInstance();
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
final ClearSlowLogResponseRequest request) throws ServiceException {
try {
requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN);
} catch (IOException e) {
throw new ServiceException(e);
}
final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
.map(
queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
.orElse(false);
ClearSlowLogResponses clearSlowLogResponses =
ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
return clearSlowLogResponses;
}
private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
NamedQueueRecorder namedQueueRecorder) {
if (namedQueueRecorder == null) {
return Collections.emptyList();
}
List<SlowLogPayload> slowLogPayloads;
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
slowLogPayloads = namedQueueGetResponse != null ? namedQueueGetResponse.getSlowLogPayloads() :
Collections.emptyList();
return slowLogPayloads;
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
try {
final String logClassName = request.getLogClassName();
Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("SlowLogResponseRequest")) {
SlowLogResponseRequest slowLogResponseRequest =
(SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads =
getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
SlowLogResponses slowLogResponses =
SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
return HBaseProtos.LogEntry.newBuilder()
.setLogClassName(slowLogResponses.getClass().getName())
.setLogMessage(slowLogResponses.toByteString()).build();
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
LOG.error("Error while retrieving log entries.", e);
throw new ServiceException(e);
}
throw new ServiceException("Invalid request params");
}
}

View File

@ -0,0 +1,600 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.lang.management.MemoryType;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
/**
* Base class for hbase services, such as master or region server.
*/
@InterfaceAudience.Private
public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends Thread
implements Server, ConfigurationObserver, ConnectionRegistryEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(HBaseServerBase.class);
protected final Configuration conf;
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected final AtomicBoolean abortRequested = new AtomicBoolean(false);
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation.
protected volatile boolean stopped = false;
/**
* This servers startcode.
*/
protected final long startcode;
protected final UserProvider userProvider;
// zookeeper connection and watcher
protected final ZKWatcher zooKeeper;
/**
* The server name the Master sees us as. Its made from the hostname the master passes us, port,
* and server startcode. Gets set after registration against Master.
*/
protected ServerName serverName;
protected final R rpcServices;
/**
* hostname specified by hostname config
*/
protected final String useThisHostnameInstead;
/**
* Provide online slow log responses from ringbuffer
*/
protected final NamedQueueRecorder namedQueueRecorder;
/**
* Configuration manager is used to register/deregister and notify the configuration observers
* when the regionserver is notified that there was a change in the on disk configs.
*/
protected final ConfigurationManager configurationManager;
/**
* ChoreService used to schedule tasks that we want to run periodically
*/
protected final ChoreService choreService;
// Instance of the hbase executor executorService.
protected final ExecutorService executorService;
// Cluster Status Tracker
protected final ClusterStatusTracker clusterStatusTracker;
protected final CoordinatedStateManager csm;
// Info server. Default access so can be used by unit tests. REGIONSERVER
// is name of the webapp and the attribute name used stuffing this instance
// into web context.
protected InfoServer infoServer;
protected HFileSystem dataFs;
protected HFileSystem walFs;
protected Path dataRootDir;
protected Path walRootDir;
protected final int msgInterval;
// A sleeper that sleeps for msgInterval.
protected final Sleeper sleeper;
/**
* Go here to get table descriptors.
*/
protected TableDescriptors tableDescriptors;
/**
* The asynchronous cluster connection to be shared by services.
*/
protected AsyncClusterConnection asyncClusterConnection;
/**
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
* entries. Used for serving ClientMetaService.
*/
protected final MetaRegionLocationCache metaRegionLocationCache;
protected final NettyEventLoopGroupConfig eventLoopGroupConfig;
/**
* If running on Windows, do windows-specific setup.
*/
private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), signal -> {
conf.reloadConfiguration();
cm.notifyAllObservers(conf);
});
}
}
/**
* Setup our cluster connection if not already initialized.
*/
protected final synchronized void setupClusterConnection() throws IOException {
if (asyncClusterConnection == null) {
InetSocketAddress localAddress =
new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0);
User user = userProvider.getCurrent();
asyncClusterConnection =
ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user);
}
}
protected final void initializeFileSystem() throws IOException {
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
// checksum verification enabled, then automatically switch off hdfs checksum verification.
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
String walDirUri = CommonFSUtils.getDirUri(this.conf,
new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
// set WAL's uri
if (walDirUri != null) {
CommonFSUtils.setFsDefault(this.conf, walDirUri);
}
// init the WALFs
this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
// Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
// underlying hadoop hdfs accessors will be going against wrong filesystem
// (unless all is set to defaults).
String rootDirUri =
CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
if (rootDirUri != null) {
CommonFSUtils.setFsDefault(this.conf, rootDirUri);
}
// init the filesystem
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
!canUpdateTableDescriptor(), cacheTableDescriptor());
}
public HBaseServerBase(Configuration conf, String name)
throws ZooKeeperConnectionException, IOException {
super(name); // thread name
this.conf = conf;
this.eventLoopGroupConfig =
NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
this.startcode = EnvironmentEdgeManager.currentTime();
this.userProvider = UserProvider.instantiate(conf);
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
this.namedQueueRecorder = createNamedQueueRecord();
this.rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
InetSocketAddress addr = rpcServices.getSocketAddress();
String hostName = StringUtils.isBlank(useThisHostnameInstead) ? addr.getHostName() :
this.useThisHostnameInstead;
serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
// login the server principal (if using secure Hadoop)
login(userProvider, hostName);
// init superusers and add the server principal (if using security)
// or process owner as default super user.
Superusers.initialize(conf);
zooKeeper =
new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());
this.configurationManager = new ConfigurationManager();
setupWindows(conf, configurationManager);
initializeFileSystem();
this.choreService = new ChoreService(getName(), true);
this.executorService = new ExecutorService(getName());
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
if (clusterMode()) {
if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
csm = new ZkCoordinatedStateManager(this);
} else {
csm = null;
}
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
} else {
csm = null;
clusterStatusTracker = null;
}
putUpWebUI();
}
/**
* Puts up the webui.
*/
private void putUpWebUI() throws IOException {
int port =
this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT);
String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
if (this instanceof HMaster) {
port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
}
// -1 is for disabling info server
if (port < 0) {
return;
}
if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
String msg = "Failed to start http info server. Address " + addr +
" does not belong to this host. Correct configuration parameter: " +
"hbase.regionserver.info.bindAddress";
LOG.error(msg);
throw new IOException(msg);
}
// check if auto port bind enabled
boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
while (true) {
try {
this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
configureInfoServer(infoServer);
this.infoServer.start();
break;
} catch (BindException e) {
if (!auto) {
// auto bind disabled throw BindException
LOG.error("Failed binding http info server to port: " + port);
throw e;
}
// auto bind enabled, try to use another port
LOG.info("Failed binding http info server to port: " + port);
port++;
LOG.info("Retry starting http info server with port: " + port);
}
}
port = this.infoServer.getPort();
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
int masterInfoPort =
conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
conf.setInt("hbase.master.info.port.orig", masterInfoPort);
conf.setInt(HConstants.MASTER_INFO_PORT, port);
}
/**
* Sets the abort state if not already set.
* @return True if abortRequested set to True successfully, false if an abort is already in
* progress.
*/
protected final boolean setAbortRequested() {
return abortRequested.compareAndSet(false, true);
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public boolean isAborted() {
return abortRequested.get();
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public AsyncClusterConnection getAsyncClusterConnection() {
return asyncClusterConnection;
}
@Override
public ZKWatcher getZooKeeper() {
return zooKeeper;
}
protected final void shutdownChore(ScheduledChore chore) {
if (chore != null) {
chore.shutdown();
}
}
protected final void initializeMemStoreChunkCreator(HeapMemoryManager hMemManager) {
if (MemStoreLAB.isEnabled(conf)) {
// MSLAB is enabled. So initialize MemStoreChunkPool
// By this time, the MemstoreFlusher is already initialized. We can get the global limits from
// it.
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
long globalMemStoreSize = pair.getFirst();
boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
// When off heap memstore in use, take full area for chunk pool.
float poolSizePercentage = offheap ? 1.0F :
conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
// init the chunkCreator
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, hMemManager, indexChunkSizePercent);
}
}
protected abstract void stopChores();
protected final void stopChoreService() {
// clean up the scheduled chores
if (choreService != null) {
LOG.info("Shutdown chores and chore service");
stopChores();
// cancel the remaining scheduled chores (in case we missed out any)
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
choreService.shutdown();
}
}
protected final void stopExecutorService() {
if (executorService != null) {
LOG.info("Shutdown executor service");
executorService.shutdown();
}
}
protected final void closeClusterConnection() {
if (asyncClusterConnection != null) {
LOG.info("Close async cluster connection");
try {
this.asyncClusterConnection.close();
} catch (IOException e) {
// Although the {@link Closeable} interface throws an {@link
// IOException}, in reality, the implementation would never do that.
LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
}
}
}
protected final void stopInfoServer() {
if (this.infoServer != null) {
LOG.info("Stop info server");
try {
this.infoServer.stop();
} catch (Exception e) {
LOG.error("Failed to stop infoServer", e);
}
}
}
protected final void closeZooKeeper() {
if (this.zooKeeper != null) {
LOG.info("Close zookeeper");
this.zooKeeper.close();
}
}
@Override
public ServerName getServerName() {
return serverName;
}
@Override
public ChoreService getChoreService() {
return choreService;
}
/**
* @return Return table descriptors implementation.
*/
public TableDescriptors getTableDescriptors() {
return this.tableDescriptors;
}
public ExecutorService getExecutorService() {
return executorService;
}
public AccessChecker getAccessChecker() {
return rpcServices.getAccessChecker();
}
public ZKPermissionWatcher getZKPermissionWatcher() {
return rpcServices.getZkPermissionWatcher();
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return csm;
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
return ConnectionFactory.createConnection(conf, null, user);
}
/**
* @return Return the rootDir.
*/
public Path getDataRootDir() {
return dataRootDir;
}
@Override
public FileSystem getFileSystem() {
return dataFs;
}
/**
* @return Return the walRootDir.
*/
public Path getWALRootDir() {
return walRootDir;
}
/**
* @return Return the walFs.
*/
public FileSystem getWALFileSystem() {
return walFs;
}
/**
* @return True if the cluster is up.
*/
public boolean isClusterUp() {
return !clusterMode() || this.clusterStatusTracker.isClusterUp();
}
/**
* @return time stamp in millis of when this server was started
*/
public long getStartcode() {
return this.startcode;
}
public InfoServer getInfoServer() {
return infoServer;
}
public int getMsgInterval() {
return msgInterval;
}
/**
* get NamedQueue Provider to add different logs to ringbuffer
* @return NamedQueueRecorder
*/
public NamedQueueRecorder getNamedQueueRecorder() {
return this.namedQueueRecorder;
}
public RpcServerInterface getRpcServer() {
return rpcServices.getRpcServer();
}
public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
return eventLoopGroupConfig;
}
public R getRpcServices() {
return rpcServices;
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}
/**
* Reload the configuration from disk.
*/
public void updateConfiguration() {
LOG.info("Reloading the configuration from disk.");
// Reload the configuration from disk.
conf.reloadConfiguration();
configurationManager.notifyAllObservers(conf);
}
@Override
public String toString() {
return getServerName().toString();
}
protected abstract boolean canCreateBaseZNode();
protected abstract String getProcessName();
protected abstract R createRpcServices() throws IOException;
protected abstract String getUseThisHostnameInstead(Configuration conf) throws IOException;
protected abstract void login(UserProvider user, String host) throws IOException;
protected abstract NamedQueueRecorder createNamedQueueRecord();
protected abstract void configureInfoServer(InfoServer infoServer);
protected abstract Class<? extends HttpServlet> getDumpServlet();
protected abstract boolean canUpdateTableDescriptor();
protected abstract boolean cacheTableDescriptor();
protected abstract boolean clusterMode();
}

View File

@ -49,7 +49,9 @@ public interface Server extends Abortable, Stoppable {
* Important note: this method returns a reference to Connection which is managed
* by Server itself, so callers must NOT attempt to close connection obtained.
*/
Connection getConnection();
default Connection getConnection() {
return getAsyncConnection().toConnection();
}
Connection createConnection(Configuration conf) throws IOException;

View File

@ -21,8 +21,8 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
// RegionSpecifier object. Methods can be invoked on the returned object
// to figure out whether it is a meta region or not.
@InterfaceAudience.Private
public abstract class AnnotationReadingPriorityFunction<T extends RSRpcServices>
public abstract class AnnotationReadingPriorityFunction<T extends HBaseRpcServicesBase<?>>
implements PriorityFunction {
protected final Map<String, Integer> annotatedQos;

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
@ -79,7 +80,7 @@ public class NettyRpcServer extends RpcServer {
EventLoopGroup eventLoopGroup;
Class<? extends ServerChannel> channelClass;
if (server instanceof HRegionServer) {
NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig();
eventLoopGroup = config.group();
channelClass = config.serverChannelClass();
} else {

View File

@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.client.BalanceRequest;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@ -68,7 +67,9 @@ import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.BalanceRequest;
import org.apache.hadoop.hbase.client.BalanceResponse;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
@ -102,6 +104,7 @@ import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.http.HttpServer;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -114,6 +117,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
@ -166,6 +170,7 @@ import org.apache.hadoop.hbase.mob.MobFileCompactionChore;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -190,7 +195,6 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -199,16 +203,17 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FutureUtils;
@ -266,8 +271,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.Snapshot
* @see org.apache.zookeeper.Watcher
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices {
public class HMaster extends HBaseServerBase<MasterRpcServices> implements MasterServices {
private static final Logger LOG = LoggerFactory.getLogger(HMaster.class);
@ -304,6 +308,8 @@ public class HMaster extends HRegionServer implements MasterServices {
public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;
private String clusterId;
// Metrics for the HMaster
final MetricsMaster metricsMaster;
// file system manager for the master FS operations
@ -435,7 +441,7 @@ public class HMaster extends HRegionServer implements MasterServices {
* active one.
*/
public HMaster(final Configuration conf) throws IOException {
super(conf);
super(conf, "Master");
try {
if (conf.getBoolean(MAINTENANCE_MODE, false)) {
LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
@ -447,9 +453,10 @@ public class HMaster extends HRegionServer implements MasterServices {
maintenanceMode = false;
}
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}",
CommonFSUtils.getRootDir(this.conf),
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
// Disable usage of meta replicas in the master
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
@ -491,12 +498,10 @@ public class HMaster extends HRegionServer implements MasterServices {
getChoreService().scheduleChore(clusterStatusPublisherChore);
}
}
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
cachedClusterId = new CachedClusterId(this, conf);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
this.rpcServices.start(zooKeeper);
} catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via reflection and the
// cause of failed startup is lost.
@ -519,11 +524,17 @@ public class HMaster extends HRegionServer implements MasterServices {
return conf.get(MASTER_HOSTNAME_KEY);
}
private void registerConfigurationObservers() {
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
// Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
// block in here until then.
@Override
public void run() {
try {
registerConfigurationObservers();
Threads.setDaemonThreadRunning(new Thread(() -> {
try {
int infoPort = putUpJettyServer();
@ -538,9 +549,16 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
}), getName() + ":becomeActiveMaster");
// Fall in here even if we have been aborted. Need to run the shutdown services and
// the super run call will do this for us.
super.run();
while (!isStopped() && !isAborted()) {
sleeper.sleep();
}
stopInfoServer();
closeClusterConnection();
stopServiceThreads();
if (this.rpcServices != null) {
this.rpcServices.stop();
}
closeZooKeeper();
} finally {
if (this.clusterSchemaService != null) {
// If on way out, then we are no longer active master.
@ -615,26 +633,16 @@ public class HMaster extends HRegionServer implements MasterServices {
@Override
protected void login(UserProvider user, String host) throws IOException {
try {
super.login(user, host);
user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
} catch (IOException ie) {
user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE,
SecurityConstants.MASTER_KRB_PRINCIPAL, host);
user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE, SecurityConstants.MASTER_KRB_PRINCIPAL,
host);
}
}
/**
* Loop till the server is stopped or aborted.
*/
@Override
protected void waitForMasterActive() {
while (!isStopped() && !isAborted()) {
sleeper.sleep();
}
}
@InterfaceAudience.Private
public MasterRpcServices getMasterRpcServices() {
return (MasterRpcServices)rpcServices;
return rpcServices;
}
public boolean balanceSwitch(final boolean b) throws IOException {
@ -661,13 +669,12 @@ public class HMaster extends HRegionServer implements MasterServices {
return true;
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
protected MasterRpcServices createRpcServices() throws IOException {
return new MasterRpcServices(this);
}
@Override
protected void configureInfoServer() {
protected void configureInfoServer(InfoServer infoServer) {
infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class);
infoServer.setAttribute(MASTER, this);
}
@ -860,7 +867,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// always initialize the MemStoreLAB as we use a region to store data in master now, see
// localStore.
initializeMemStoreChunkCreator();
initializeMemStoreChunkCreator(null);
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
@ -1539,7 +1546,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
@Override
protected void stopServiceThreads() {
if (masterJettyServer != null) {
LOG.info("Stopping master jetty server");
@ -1549,9 +1555,8 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.error("Failed to stop master jetty server", e);
}
}
stopChores();
super.stopServiceThreads();
stopChoreService();
stopExecutorService();
if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
@ -1680,25 +1685,23 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
private void stopChores() {
if (getChoreService() != null) {
shutdownChore(mobFileCleanerChore);
shutdownChore(mobFileCompactionChore);
shutdownChore(balancerChore);
if (regionNormalizerManager != null) {
shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
}
shutdownChore(clusterStatusChore);
shutdownChore(catalogJanitorChore);
shutdownChore(clusterStatusPublisherChore);
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
shutdownChore(regionsRecoveryChore);
protected void stopChores() {
shutdownChore(mobFileCleanerChore);
shutdownChore(mobFileCompactionChore);
shutdownChore(balancerChore);
if (regionNormalizerManager != null) {
shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
}
shutdownChore(clusterStatusChore);
shutdownChore(catalogJanitorChore);
shutdownChore(clusterStatusPublisherChore);
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
shutdownChore(regionsRecoveryChore);
}
/**
@ -2722,16 +2725,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return status;
}
@Override
public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}
@Override
public List<ServerName> getBackupMasters() {
return activeMasterManager.getBackupMasters();
}
/**
* @return info port of active master or 0 if any exception occurs.
*/
@ -2747,11 +2740,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return activeMasterManager.getBackupMasterInfoPort(sn);
}
@Override
public Collection<ServerName> getRegionServers() {
return regionServerTracker.getRegionServers();
}
/**
* The set of loaded coprocessors is stored in a static set. Since it's
* statically allocated, it does not require that HMaster's cpHost be
@ -2842,11 +2830,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
@Override
public ZKWatcher getZooKeeper() {
return zooKeeper;
}
@Override
public MasterCoprocessorHost getMasterCoprocessorHost() {
return cpHost;
@ -2927,15 +2910,18 @@ public class HMaster extends HRegionServer implements MasterServices {
@Override
public void stop(String msg) {
if (!isStopped()) {
super.stop(msg);
if (!this.stopped) {
LOG.info("***** STOPPING master '" + this + "' *****");
this.stopped = true;
LOG.info("STOPPED: " + msg);
// Wakes run() if it is sleeping
sleeper.skipSleepCycle();
if (this.activeMasterManager != null) {
this.activeMasterManager.stop();
}
}
}
@InterfaceAudience.Private
protected void checkServiceStarted() throws ServerNotRunningYetException {
if (!serviceStarted) {
throw new ServerNotRunningYetException("Server is not running yet");
@ -2987,8 +2973,6 @@ public class HMaster extends HRegionServer implements MasterServices {
*
* @return true if master is ready to go, false if not.
*/
@Override
public boolean isOnline() {
return serviceStarted;
}
@ -3003,7 +2987,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return maintenanceMode;
}
@InterfaceAudience.Private
public void setInitialized(boolean isInitialized) {
procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
}
@ -3871,28 +3854,19 @@ public class HMaster extends HRegionServer implements MasterServices {
return this.snapshotQuotaChore;
}
public ActiveMasterManager getActiveMasterManager() {
return activeMasterManager;
}
@Override
public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
return this.syncReplicationReplayWALManager;
}
@Override
public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
return new HashMap<>();
}
public HbckChore getHbckChore() {
return this.hbckChore;
}
@Override
public String getClusterId() {
if (activeMaster) {
return super.getClusterId();
}
return cachedClusterId.getFromCacheOrFetch();
}
@Override
public void runReplicationBarrierCleaner() {
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
@ -3959,4 +3933,58 @@ public class HMaster extends HRegionServer implements MasterServices {
MasterRegion getMasterRegion() {
return masterRegion;
}
@Override
public void onConfigurationChange(Configuration newConf) {
try {
Superusers.initialize(newConf);
} catch (IOException e) {
LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
}
}
@Override
protected NamedQueueRecorder createNamedQueueRecord() {
final boolean isBalancerDecisionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
final boolean isBalancerRejectionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
return NamedQueueRecorder.getInstance(conf);
} else {
return null;
}
}
@Override
protected boolean clusterMode() {
return true;
}
public String getClusterId() {
if (activeMaster) {
return clusterId;
}
return cachedClusterId.getFromCacheOrFetch();
}
public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}
public List<ServerName> getBackupMasters() {
return activeMasterManager.getBackupMasters();
}
@Override
public Collection<ServerName> getRegionServers() {
return regionServerTracker.getRegionServers();
}
@Override
public List<HRegionLocation> getMetaLocations() {
return metaRegionLocationCache.getMetaRegionLocations();
}
}

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.util.LogMonitoring;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
@ -102,11 +101,6 @@ public class MasterDumpServlet extends StateDumpServlet {
long tailKb = getTailKbParam(request);
LogMonitoring.dumpTailOfLogs(out, tailKb);
out.println("\n\nRS Queue:");
out.println(LINE);
if (isShowQueueDump(conf)) {
RSDumpServlet.dumpQueue(master, out);
}
out.flush();
}
}

View File

@ -225,7 +225,7 @@ class MetricsRegionServerWrapperImpl
@Override
public long getTotalRequestCount() {
return regionServer.rpcServices.requestCount.sum();
return regionServer.getRpcServices().requestCount.sum();
}
@Override
@ -479,27 +479,27 @@ class MetricsRegionServerWrapperImpl
@Override
public long getRpcGetRequestsCount() {
return regionServer.rpcServices.rpcGetRequestCount.sum();
return regionServer.getRpcServices().rpcGetRequestCount.sum();
}
@Override
public long getRpcScanRequestsCount() {
return regionServer.rpcServices.rpcScanRequestCount.sum();
return regionServer.getRpcServices().rpcScanRequestCount.sum();
}
@Override
public long getRpcFullScanRequestsCount() {
return regionServer.rpcServices.rpcFullScanRequestCount.sum();
return regionServer.getRpcServices().rpcFullScanRequestCount.sum();
}
@Override
public long getRpcMultiRequestsCount() {
return regionServer.rpcServices.rpcMultiRequestCount.sum();
return regionServer.getRpcServices().rpcMultiRequestCount.sum();
}
@Override
public long getRpcMutateRequestsCount() {
return regionServer.rpcServices.rpcMutateRequestCount.sum();
return regionServer.getRpcServices().rpcMutateRequestCount.sum();
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@ -46,7 +47,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@ -428,7 +430,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final void uncaughtException(Thread t, Throwable e,
ReplicationSourceManager manager, String peerId) {
RSRpcServices.exitIfOOME(e);
OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
LOG.error("Unexpected exception in {} currentPath={}",
t.getName(), getCurrentPath(), e);
if(abortOnError){

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.Cell;
@ -51,6 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)

View File

@ -29,6 +29,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -76,4 +78,12 @@ public class NettyEventLoopGroupConfig {
public Class<? extends Channel> clientChannelClass() {
return clientChannelClass;
}
public static NettyEventLoopGroupConfig setup(Configuration conf, String threadPoolName) {
// Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
NettyEventLoopGroupConfig nelgc = new NettyEventLoopGroupConfig(conf, threadPoolName);
NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
return nelgc;
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public final class OOMEChecker {
private static final Logger LOG = LoggerFactory.getLogger(OOMEChecker.class);
private OOMEChecker() {
}
public static boolean exitIfOOME(Throwable e, String service) {
boolean stop = false;
try {
if (e instanceof OutOfMemoryError ||
(e.getCause() != null && e.getCause() instanceof OutOfMemoryError) ||
(e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError"))) {
stop = true;
LOG.error(HBaseMarkers.FATAL, "Run out of memory; {} will abort itself immediately",
service, e);
}
} finally {
if (stop) {
Runtime.getRuntime().halt(1);
}
}
return stop;
}
}

View File

@ -93,7 +93,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
@ -2941,17 +2940,6 @@ public class HBaseTestingUtil extends HBaseZKTestingUtil {
// That's fine.
}
}
for (MasterThread mt : cluster.getLiveMasterThreads()) {
try {
for (RegionInfo region : ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
online.add(region.getRegionNameAsString());
}
} catch (RegionServerStoppedException e) {
// That's fine.
} catch (ServerNotRunningYetException e) {
// That's fine.
}
}
return online;
}

View File

@ -806,15 +806,6 @@ public class SingleProcessHBaseCluster extends HBaseClusterInterface {
@Override
public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException {
// Assume there is only one master thread which is the active master.
// If there are multiple master threads, the backup master threads
// should hold some regions. Please refer to #countServedRegions
// to see how we find out all regions.
HMaster master = getMaster();
Region region = master.getOnlineRegion(regionName);
if (region != null) {
return master.getServerName();
}
int index = getServerWith(regionName);
if (index < 0) {
return null;
@ -833,9 +824,6 @@ public class SingleProcessHBaseCluster extends HBaseClusterInterface {
for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
count += rst.getRegionServer().getNumberOfOnlineRegions();
}
for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
count += mt.getMaster().getNumberOfOnlineRegions();
}
return count;
}

View File

@ -113,13 +113,8 @@ public class TestMetaUpdatesGoToPriorityQueue {
// find the meta server
SingleProcessHBaseCluster cluster = UTIL.getMiniHBaseCluster();
int rsIndex = cluster.getServerWithMeta();
HRegionServer rs;
if (rsIndex >= 0) {
rs = cluster.getRegionServer(rsIndex);
} else {
// it is in master
rs = cluster.getMaster();
}
assertTrue(rsIndex >= 0);
HRegionServer rs = cluster.getRegionServer(rsIndex);
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
long time = EnvironmentEdgeManager.currentTime();

View File

@ -133,7 +133,7 @@ public class TestSeparateClientZKCluster {
HMaster master = cluster.getMaster();
master.stopMaster();
LOG.info("Stopped master {}", master.getServerName());
while (!master.isShutDown()) {
while (master.isAlive()) {
Thread.sleep(200);
}
LOG.info("Shutdown master {}", master.getServerName());
@ -225,7 +225,7 @@ public class TestSeparateClientZKCluster {
int metaServerId = cluster.getServerWithMeta();
HRegionServer metaServer = cluster.getRegionServer(metaServerId);
metaServer.stop("Stop current RS holding meta region");
while (!metaServer.isShutDown()) {
while (metaServer.isAlive()) {
Thread.sleep(200);
}
// wait for meta region online

View File

@ -290,7 +290,6 @@ public class TestTableFavoredNodes {
* 3. Is the FN information consistent between Master and the respective RegionServer?
*/
private void checkIfFavoredNodeInformationIsCorrect(TableName tableName) throws Exception {
/*
* Since we need HRegionServer to check for consistency of FN between Master and RS,
* lets construct a map for each serverName lookup. Makes it easy later.
@ -300,11 +299,6 @@ public class TestTableFavoredNodes {
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
snRSMap.put(rst.getRegionServer().getServerName(), rst.getRegionServer());
}
// Also include master, since it can also host user regions.
for (JVMClusterUtil.MasterThread rst :
TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads()) {
snRSMap.put(rst.getMaster().getServerName(), rst.getMaster());
}
int dnPort = FavoredNodeAssignmentHelper.getDataNodePort(TEST_UTIL.getConfiguration());
RegionLocator regionLocator = admin.getConnection().getRegionLocator(tableName);

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.AfterClass;
@ -43,8 +42,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@ -55,8 +52,6 @@ public class TestMasterFifoRpcScheduler {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);
private static final String REGION_SERVER_REPORT = "RegionServerReport";
private static final String OTHER = "Other";
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
@ -64,7 +59,7 @@ public class TestMasterFifoRpcScheduler {
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
conf.set(MasterRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
"org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delet
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@ -78,6 +78,7 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -384,27 +385,6 @@ public abstract class AbstractTestDLS {
putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
}
}
for (MasterThread mt : cluster.getLiveMasterThreads()) {
HRegionServer hrs = mt.getMaster();
List<RegionInfo> hris;
try {
hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
} catch (ServerNotRunningYetException e) {
// It's ok: this master may be a backup. Ignored.
continue;
}
for (RegionInfo hri : hris) {
if (hri.getTable().isSystemTable()) {
continue;
}
LOG.debug(
"adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
Region region = hrs.getOnlineRegion(hri.getRegionName());
assertTrue(region != null);
putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
}
}
}
public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)

View File

@ -93,18 +93,16 @@ public class TestGetLastFlushedSequenceId {
}
assertNotNull(region);
Thread.sleep(2000);
RegionStoreSequenceIds ids =
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
RegionStoreSequenceIds ids = testUtil.getHBaseCluster().getMaster().getServerManager()
.getLastFlushedSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
// This will be the sequenceid just before that of the earliest edit in memstore.
long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
assertTrue(storeSequenceId > 0);
testUtil.getAdmin().flush(tableName);
Thread.sleep(2000);
ids =
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
ids = testUtil.getHBaseCluster().getMaster().getServerManager()
.getLastFlushedSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertTrue(ids.getLastFlushedSequenceId() + " > " + storeSequenceId,
ids.getLastFlushedSequenceId() > storeSequenceId);
assertEquals(ids.getLastFlushedSequenceId(), ids.getStoreSequenceId(0).getSequenceId());

View File

@ -56,11 +56,6 @@ public class TestGetReplicationLoad {
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
super(conf);
}
@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
// do nothing
}
}
@BeforeClass

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -74,12 +73,7 @@ public class TestMasterMetrics {
}
@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
// do nothing
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
protected MasterRpcServices createRpcServices() throws IOException {
return new MasterRpcServices(this) {
@Override

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertFalse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MasterTests.class, MediumTests.class})
public class TestMasterNotCarryTable {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterNotCarryTable.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterNotCarryTable.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static HMaster master;
@BeforeClass
public static void setUp() throws Exception {
Configuration c = UTIL.getConfiguration();
// We use local filesystem. Set it so it writes into the testdir.
CommonFSUtils.setRootDir(c, UTIL.getDataTestDir());
UTIL.startMiniZKCluster();
master = new HMaster(UTIL.getConfiguration());
master.start();
// As no regionservers, only wait master to create AssignmentManager.
while (master.getAssignmentManager() != null) {
LOG.debug("Wait master to create AssignmentManager");
Thread.sleep(1000);
}
}
@AfterClass
public static void tearDown() throws Exception {
master.stop("Shutdown");
UTIL.shutdownMiniZKCluster();
}
@Test
public void testMasterBlockCache() {
// no need to instantiate block cache.
assertFalse(master.getBlockCache().isPresent());
}
@Test
public void testMasterMOBFileCache() {
// no need to instantiate mob file cache.
assertFalse(master.getMobFileCache().isPresent());
}
}

View File

@ -103,8 +103,5 @@ public class TestMasterQosFunction extends QosTestBase {
@Test
public void testAnnotations() {
checkMethod(conf, "GetLastFlushedSequenceId", HConstants.ADMIN_QOS, qosFunction);
checkMethod(conf, "CompactRegion", HConstants.ADMIN_QOS, qosFunction);
checkMethod(conf, "GetLastFlushedSequenceId", HConstants.ADMIN_QOS, qosFunction);
checkMethod(conf, "GetRegionInfo", HConstants.ADMIN_QOS, qosFunction);
}
}

View File

@ -261,9 +261,6 @@ public class TestRollingRestart {
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
numFound += rst.getRegionServer().getNumberOfOnlineRegions();
}
for (MasterThread mt : cluster.getMasterThreads()) {
numFound += mt.getMaster().getNumberOfOnlineRegions();
}
return numFound;
}

View File

@ -107,7 +107,7 @@ public class TestWakeUpUnexpectedProcedure {
ExecuteProceduresRequest request) throws ServiceException {
if (request.getOpenRegionCount() > 0) {
if (ARRIVE_EXEC_PROC != null) {
SERVER_TO_KILL = regionServer.getServerName();
SERVER_TO_KILL = getServer().getServerName();
ARRIVE_EXEC_PROC.countDown();
ARRIVE_EXEC_PROC = null;
try {

View File

@ -41,13 +41,10 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerWrapperStub;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tmpl.master.MasterStatusTmpl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.Before;
@ -112,18 +109,9 @@ public class TestMasterStatusServlet {
Mockito.doReturn("fakequorum").when(zkw).getQuorum();
Mockito.doReturn(zkw).when(master).getZooKeeper();
// Fake MasterAddressTracker
MasterAddressTracker tracker = Mockito.mock(MasterAddressTracker.class);
Mockito.doReturn(tracker).when(master).getMasterAddressTracker();
Mockito.doReturn(FAKE_HOST).when(tracker).getMasterAddress();
// Fake ActiveMaster
Mockito.doReturn(Optional.of(FAKE_HOST)).when(master).getActiveMaster();
MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class);
Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper();
Mockito.doReturn(rms).when(master).getMetrics();
// Mock admin
admin = Mockito.mock(Admin.class);
}

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -93,9 +92,7 @@ public class TestSlowLogAccessor {
@Before
public void setUp() throws Exception {
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder");
slowLogRecorder.setAccessible(true);
this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer);
this.namedQueueRecorder = hRegionServer.getNamedQueueRecorder();
}
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(

View File

@ -178,7 +178,6 @@ public class TestMultiLogThreshold {
}
private void assertLogBatchWarnings(boolean expected) {
assertFalse(logs.isEmpty());
boolean actual = false;
for (LevelAndMessage event : logs) {
if (event.level == org.apache.logging.log4j.Level.WARN &&

View File

@ -125,7 +125,7 @@ public class TestMutateRowsRecovery {
// Send the RS Load to ensure correct lastflushedseqid for stores
rs1.tryRegionServerReport(now - 30000, now);
// Kill the RS to trigger wal replay
cluster.killRegionServer(rs1.serverName);
cluster.killRegionServer(rs1.getServerName());
// Ensure correct data exists
Get g1 = new Get(row1);

View File

@ -529,7 +529,7 @@ public class TestRegionMergeTransactionOnCluster {
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
protected MasterRpcServices createRpcServices() throws IOException {
return new MyMasterRpcServices(this);
}
}

View File

@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -121,12 +120,6 @@ public class TestRegionServerAbort {
StopBlockingRegionObserver cp = (StopBlockingRegionObserver)cpHost.findCoprocessor(className);
cp.setStopAllowed(true);
}
HMaster master = cluster.getMaster();
RegionServerCoprocessorHost host = master.getRegionServerCoprocessorHost();
if (host != null) {
StopBlockingRegionObserver obs = (StopBlockingRegionObserver) host.findCoprocessor(className);
if (obs != null) obs.setStopAllowed(true);
}
testUtil.shutdownMiniCluster();
}

View File

@ -161,7 +161,7 @@ public class TestRegionServerNoMaster {
throws Exception {
AdminProtos.OpenRegionRequest orr =
RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null);
AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr);
AdminProtos.OpenRegionResponse responseOpen = rs.getRpcServices().openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
@ -184,7 +184,7 @@ public class TestRegionServerNoMaster {
throws Exception {
AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
rs.getServerName(), hri.getRegionName());
AdminProtos.CloseRegionResponse responseClose = rs.rpcServices.closeRegion(null, crr);
AdminProtos.CloseRegionResponse responseClose = rs.getRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
checkRegionIsClosed(HTU, rs, hri);
}
@ -209,7 +209,7 @@ public class TestRegionServerNoMaster {
// no transition in ZK
AdminProtos.CloseRegionRequest crr =
ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), regionName);
AdminProtos.CloseRegionResponse responseClose = getRS().rpcServices.closeRegion(null, crr);
AdminProtos.CloseRegionResponse responseClose = getRS().getRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
// now waiting & checking. After a while, the transition should be done and the region closed
@ -227,11 +227,12 @@ public class TestRegionServerNoMaster {
public void testMultipleCloseFromMaster() throws Exception {
for (int i = 0; i < 10; i++) {
AdminProtos.CloseRegionRequest crr =
ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), regionName, null);
ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), regionName, null);
try {
AdminProtos.CloseRegionResponse responseClose = getRS().rpcServices.closeRegion(null, crr);
AdminProtos.CloseRegionResponse responseClose =
getRS().getRpcServices().closeRegion(null, crr);
Assert.assertTrue("request " + i + " failed",
responseClose.getClosed() || responseClose.hasClosed());
responseClose.getClosed() || responseClose.hasClosed());
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException se) {
Assert.assertTrue("The next queries may throw an exception.", i > 0);
}
@ -258,7 +259,7 @@ public class TestRegionServerNoMaster {
AdminProtos.CloseRegionRequest crr =
ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), regionName);
try {
getRS().rpcServices.closeRegion(null, crr);
getRS().getRpcServices().closeRegion(null, crr);
Assert.assertTrue(false);
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException expected) {
}
@ -268,9 +269,9 @@ public class TestRegionServerNoMaster {
hri.getEncodedNameAsBytes()));
// Let's start the open handler
TableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
TableDescriptor htd = getRS().getTableDescriptors().get(hri.getTable());
getRS().executorService.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1));
getRS().getExecutorService().submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed(HTU, getRS(), hri);

View File

@ -111,7 +111,7 @@ public class TestScannerRPCScanMetrics {
scanNextIterate(ht, dummyScan);
RSRpcServices testClusterRSRPCServices = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
.rpcServices;
.getRpcServices();
assertEquals(4, testClusterRSRPCServices.rpcFullScanRequestCount.intValue());
}

View File

@ -1032,7 +1032,7 @@ public class TestSplitTransactionOnCluster {
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
protected MasterRpcServices createRpcServices() throws IOException {
return new MyMasterRpcServices(this);
}
}

View File

@ -109,7 +109,7 @@ public class TestReplicationStatus extends TestReplicationBase {
// Stop rs1, then the queue of rs1 will be transfered to rs0
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(1);
hrs.stop("Stop RegionServer");
while(!hrs.isShutDown()) {
while(hrs.isAlive()) {
Threads.sleep(100);
}
// To be sure it dead and references cleaned up. TODO: Change this to a barrier.

View File

@ -75,11 +75,11 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
@ -3415,18 +3414,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
// That's fine.
}
}
for (MasterThread mt : cluster.getLiveMasterThreads()) {
try {
for (RegionInfo region :
ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
online.add(region.getRegionNameAsString());
}
} catch (RegionServerStoppedException e) {
// That's fine.
} catch (ServerNotRunningYetException e) {
// That's fine.
}
}
return online;
}

View File

@ -858,16 +858,7 @@ public class MiniHBaseCluster extends HBaseCluster {
@Override
public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException {
// Assume there is only one master thread which is the active master.
// If there are multiple master threads, the backup master threads
// should hold some regions. Please refer to #countServedRegions
// to see how we find out all regions.
HMaster master = getMaster();
Region region = master.getOnlineRegion(regionName);
if (region != null) {
return master.getServerName();
}
throws IOException {
int index = getServerWith(regionName);
if (index < 0) {
return null;
@ -886,9 +877,6 @@ public class MiniHBaseCluster extends HBaseCluster {
for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
count += rst.getRegionServer().getNumberOfOnlineRegions();
}
for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
count += mt.getMaster().getNumberOfOnlineRegions();
}
return count;
}