HBASE-18846 Accommodate the hbase-indexer/lily/SEP consumer deploy-type

Patch to start a standalone RegionServer that registers itself and
optionally stands up Services. Can work w/o a Master in the mix.
Useful for testing. Can also be used by hbase-indexer to put up a
Replication sink that extends public-facing APIs w/o need to extend
internals. See JIRA release note for detail.

This patch adds booleans for whether to start the Admin and Client Services.
Other refactoring moves all thread and service start into the one fat
location so we can ask to by-pass 'services' if we don't need them;
a minimal config sketch follows. See JIRA for an example hbase-server.xml
that has config to shut down WAL, cache, etc.
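By way of illustration, a minimal sketch of the knobs this patch reads,
using only config names that appear in the diff below (values are
illustrative; launching the RegionServer afterward is the usual route):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.RSRpcServices;

    Configuration conf = HBaseConfiguration.create();
    // Come up standalone; skip Master registration and tracking.
    conf.setBoolean("hbase.masterless", true);
    // By-pass the worker services/threads started in startServices().
    conf.setBoolean("hbase.regionserver.workers", false);
    // Admin and Client Services are on by default; disable selectively if unneeded.
    conf.setBoolean(RSRpcServices.REGIONSERVER_ADMIN_SERVICE_CONFIG, false);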

Adds checks that a service/thread has been set up before we go to use it.

Renames the ExecutorService member in HRegionServer from 'service' to
'executorService'.

See JIRA too for an example Connection implementation that makes use of
the Connection plugin point to receive a replication stream. The default
replication sink catches the incoming replication stream, unbundles the
WALEdits, and then creates a Table to call batch with the edits; up on
JIRA, an example Connection plugin (legit, supported) returns a Table
with an overridden batch method wherein we do index inserts, returning
appropriate results to keep the replication engine ticking over. A rough
sketch of the shape follows.
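To give the flavor, here is a hypothetical sketch of the Table half
(IndexerTable and the index* hooks are invented names, not the JIRA
example verbatim): the plugged-in Connection hands one of these back from
getTable(), and batch is the only method the replication sink exercises.
It is declared abstract purely so the many other Table methods can be
elided here:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.client.Table;

    public abstract class IndexerTable implements Table {
      @Override
      public void batch(List<? extends Row> actions, Object[] results)
          throws IOException, InterruptedException {
        for (int i = 0; i < actions.size(); i++) {
          Row action = actions.get(i);
          if (action instanceof Put) {
            indexPut((Put) action);       // index insert instead of an hbase write
          } else if (action instanceof Delete) {
            indexDelete((Delete) action); // index delete instead of an hbase delete
          }
          // A benign per-action Result keeps the replication engine ticking over.
          results[i] = Result.EMPTY_RESULT;
        }
      }

      // Hypothetical hooks onto whatever indexer backs this Table.
      protected abstract void indexPut(Put put);
      protected abstract void indexDelete(Delete delete);
    }

The Connection half is just whatever class the client
connection-implementation config names; only public client API is in play.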

Upsides: an unadulterated RegionServer that keeps replication metrics
and even hosts a web UI if wanted. No hacks. Just ordained configs
shutting down unused services. Injection of the indexing function at a
blessed point with no pollution by hbase internals; only public imports.
No use of Private nor LimitedPrivate classes.
Michael Stack 2017-09-26 22:27:58 -07:00
parent 37b29e909d
commit 456057ef90
4 changed files with 493 additions and 406 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -99,7 +99,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@ -472,6 +471,7 @@ public class HMaster extends HRegionServer implements MasterServices {
public HMaster(final Configuration conf, CoordinatedStateManager csm)
throws IOException, KeeperException {
super(conf, csm);
try {
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
@ -533,6 +533,12 @@ public class HMaster extends HRegionServer implements MasterServices {
} else {
activeMasterManager = null;
}
} catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via reflection and the
// cause of failed startup is lost.
LOG.error("Failed construction of Master", t);
throw t;
}
}
// Main run loop. Calls through to the regionserver run loop.
@ -1100,15 +1106,15 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
private void startServiceThreads() throws IOException{
// Start the executor service pools
this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
conf.getInt("hbase.master.executor.openregion.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
// We depend on there being only one instance of this executor running
@ -1116,7 +1122,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// tables.
// Any time changing this maxThreads to > 1, pls see the comment at
// AccessController#postCompletedCreateTableAction
this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();
// Start log cleaner thread

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -234,16 +233,10 @@ public class HRegionServer extends HasThread implements
/**
* For testing only! Set to true to skip notifying region assignment to master.
*/
@VisibleForTesting
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
/*
* Strings to be used in forming the exception message for
* RegionsAlreadyInTransitionException.
*/
protected static final String OPEN = "OPEN";
protected static final String CLOSE = "CLOSE";
//RegionName vs current action in progress
//true - if open region action in progress
//false - if close region action in progress
@ -315,8 +308,8 @@ public class HRegionServer extends HasThread implements
// Leases
protected Leases leases;
// Instance of the hbase executor service.
protected ExecutorService service;
// Instance of the hbase executor executorService.
protected ExecutorService executorService;
// If false, the file system has become unavailable
protected volatile boolean fsOk;
@ -380,7 +373,7 @@ public class HRegionServer extends HasThread implements
/**
* ChoreService used to schedule tasks that we want to run periodically
*/
private final ChoreService choreService;
private ChoreService choreService;
/*
* Check for compactions requests.
@ -396,7 +389,7 @@ public class HRegionServer extends HasThread implements
// WAL roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected final LogRoller walRoller;
protected LogRoller walRoller;
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@ -532,6 +525,15 @@ public class HRegionServer extends HasThread implements
private final NettyEventLoopGroupConfig eventLoopGroupConfig;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
* means it needs to just come up and make do without a Master to talk to: e.g. in test or when
* HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose only
* purpose is as a Replication-stream sink; see HBASE-18846 for more.
*/
private final boolean masterless;
static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
/**
* Starts a HRegionServer at the default location.
*/
@ -541,26 +543,24 @@ public class HRegionServer extends HasThread implements
/**
* Starts a HRegionServer at the default location
*
* @param csm implementation of CoordinatedStateManager to be used
*/
// Don't start any services or managers in here in the Constructor.
// Defer till after we register with the Master as much as possible. See #startServices.
public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException {
super("RegionServer"); // thread name
try {
this.startcode = System.currentTimeMillis();
this.fsOk = true;
this.conf = conf;
// initialize netty event loop group at the very beginning as we may use it to start rpc server,
// rpc client and WAL.
this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
eventLoopGroupConfig.clientChannelClass());
NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
eventLoopGroupConfig.clientChannelClass());
this.fsOk = true;
this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
this.eventLoopGroupConfig = setupNetty(this.conf);
MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
FSUtils.setupShortCircuitRead(this.conf);
Replication.decorateRegionServerConfiguration(this.conf);
// Disable usage of meta replicas in the regionserver
@ -576,15 +576,12 @@ public class HRegionServer extends HasThread implements
boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
this.numRegionsToReport = conf.getInt(
"hbase.regionserver.numregionstoreport", 10);
this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);
this.operationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.shortOperationTimeout = conf.getInt(
HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
this.abortRequested = false;
@ -606,9 +603,9 @@ public class HRegionServer extends HasThread implements
}
}
}
String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
rpcServices.isa.getHostName();
serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
String hostName = shouldUseThisHostnameInstead() ?
this.useThisHostnameInstead : this.rpcServices.isa.getHostName();
serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
@ -628,21 +625,24 @@ public class HRegionServer extends HasThread implements
uncaughtExceptionHandler = new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
abort("Uncaught exception in service thread " + t.getName(), e);
abort("Uncaught exception in executorService thread " + t.getName(), e);
}
};
initializeFileSystem();
service = new ExecutorService(getServerName().toShortString());
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
this.configurationManager = new ConfigurationManager();
setupWindows(getConfiguration(), getConfigurationManager());
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
// Open connection to zookeeper and set primary watcher
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, canCreateBaseZNode());
// If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) {
this.csm = (BaseCoordinatedStateManager) csm;
this.csm.initialize(this);
this.csm.start();
@ -653,33 +653,46 @@ public class HRegionServer extends HasThread implements
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
}
this.configurationManager = new ConfigurationManager();
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
}
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
// class HRS. TODO.
this.choreService = new ChoreService(getServerName().toString(), true);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
this.executorService = new ExecutorService(getServerName().toShortString());
// Master expects Constructor to put up web servers. Ugh.
this.rpcServices.start();
putUpWebUI();
} catch (Throwable t) {
// Make sure we log the exception. HRegionServer is often started via reflection and the
// cause of failed startup is lost.
LOG.error("Failed construction RegionServer", t);
throw t;
}
}
/**
* If running on Windows, do windows-specific setup.
*/
private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), new SignalHandler() {
@Override
public void handle(Signal signal) {
getConfiguration().reloadConfiguration();
configurationManager.notifyAllObservers(getConfiguration());
conf.reloadConfiguration();
cm.notifyAllObservers(conf);
}
});
}
// Create the CompactedFileDischarger chore service. This chore helps to
// remove the compacted files
// that will no longer be used in reads.
// Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
// 2 mins so that compacted files can be archived before the TTLCleaner runs
int cleanerInterval =
conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
this.compactedFileDischarger =
new CompactedHFilesDischarger(cleanerInterval, this, this);
choreService.scheduleChore(compactedFileDischarger);
}
private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
// Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
NettyEventLoopGroupConfig nelgc =
new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
return nelgc;
}
private void initializeFileSystem() throws IOException {
@ -731,7 +744,7 @@ public class HRegionServer extends HasThread implements
}
protected boolean canCreateBaseZNode() {
return false;
return this.masterless;
}
protected boolean canUpdateTableDescriptor() {
@ -754,28 +767,28 @@ public class HRegionServer extends HasThread implements
@Override
public boolean registerService(com.google.protobuf.Service instance) {
/*
* No stacking of instances is allowed for a single service name
* No stacking of instances is allowed for a single executorService name
*/
com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
instance.getDescriptorForType();
String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
if (coprocessorServiceHandlers.containsKey(serviceName)) {
LOG.error("Coprocessor service " + serviceName
LOG.error("Coprocessor executorService " + serviceName
+ " already registered, rejecting request from " + instance);
return false;
}
coprocessorServiceHandlers.put(serviceName, instance);
if (LOG.isDebugEnabled()) {
LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName);
}
return true;
}
/**
* Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
* the local server. Safe to use going to local or remote server.
* Create this instance in a method can be intercepted and mocked in tests.
* the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
server. Create this instance in a method that can be intercepted and mocked in tests.
* @throws IOException
*/
@VisibleForTesting
@ -821,6 +834,8 @@ public class HRegionServer extends HasThread implements
/**
* All initialization needed before we go register with Master.
* Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.
* In here we just put up the RpcServer, setup Connection, and ZooKeeper.
*
* @throws IOException
* @throws InterruptedException
@ -828,21 +843,10 @@ public class HRegionServer extends HasThread implements
private void preRegistrationInitialization() {
try {
setupClusterConnection();
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
this.secureBulkLoadManager.start();
// Health checker thread.
if (isHealthCheckerConfigured()) {
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}
initializeZooKeeper();
if (!isStopped() && !isAborted()) {
initializeThreads();
}
// Setup RPC client for master communication
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
@ -862,6 +866,9 @@ public class HRegionServer extends HasThread implements
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
justification="cluster Id znode read would give us correct response")
private void initializeZooKeeper() throws IOException, InterruptedException {
// Nothing to do in here if no Master in the mix.
if (this.masterless) return;
// Create the master address tracker, register with zk, and start it. Then
// block until a master is available. No point in starting up if no master
// running.
@ -934,67 +941,11 @@ public class HRegionServer extends HasThread implements
}
/**
* @return False if cluster shutdown in progress
* @return True if the cluster is up.
*/
private boolean isClusterUp() {
return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
}
private void initializeThreads() throws IOException {
// Cache flushing thread.
this.cacheFlusher = new MemStoreFlusher(conf, this);
// Compaction thread
this.compactSplitThread = new CompactSplit(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
this.leases = new Leases(this.threadWakeFrequency);
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);
if (this.nonceManager != null) {
// Create the scheduled chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
}
// Setup the Quota Manager
rsQuotaManager = new RegionServerRpcQuotaManager(this);
rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
if (QuotaUtil.isQuotaEnabled(conf)) {
this.fsUtilizationChore = new FileSystemUtilizationChore(this);
}
// Setup RPC client for master communication
rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
boolean onlyMetaRefresh = false;
int storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
, StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
if (storefileRefreshPeriod == 0) {
storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
onlyMetaRefresh = true;
}
if (storefileRefreshPeriod > 0) {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}
registerConfigurationObservers();
}
private void registerConfigurationObservers() {
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
return this.masterless ||
this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp();
}
/**
@ -1019,6 +970,7 @@ public class HRegionServer extends HasThread implements
// Try and register with the Master; tell it we are here. Break if
// server is stopped or the clusterup flag is down or hdfs went wacky.
// Once registered successfully, go ahead and start up all Services.
while (keepLooping()) {
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
@ -1033,11 +985,16 @@ public class HRegionServer extends HasThread implements
if (!isStopped() && isHealthy()) {
// start the snapshot handler and other procedure handlers,
// since the server is ready to run
rspmHost.start();
if (this.rspmHost != null) {
this.rspmHost.start();
}
// Start the Quota Manager
if (this.rsQuotaManager != null) {
rsQuotaManager.start(getRpcServer().getScheduler());
rsSpaceQuotaManager.start();
}
if (this.rsSpaceQuotaManager != null) {
this.rsSpaceQuotaManager.start();
}
}
// We registered with the Master. Go into run mode.
@ -1176,8 +1133,7 @@ public class HRegionServer extends HasThread implements
if (!this.killed && this.fsOk) {
waitOnAllRegionsToClose(abortRequested);
LOG.info("stopping server " + this.serverName +
"; all regions closed.");
LOG.info("stopping server " + this.serverName + "; all regions closed.");
}
//fsOk flag may be changed when closing regions throws exception.
@ -1223,8 +1179,7 @@ public class HRegionServer extends HasThread implements
if (this.zooKeeper != null) {
this.zooKeeper.close();
}
LOG.info("stopping server " + this.serverName +
"; zookeeper connection closed.");
LOG.info("stopping server " + this.serverName + "; zookeeper connection closed.");
LOG.info(Thread.currentThread().getName() + " exiting");
}
@ -1267,8 +1222,7 @@ public class HRegionServer extends HasThread implements
ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
try {
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
ServerName sn = ServerName.parseVersionedServerName(
this.serverName.getVersionedBytes());
ServerName sn = ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
request.setServer(ProtobufUtil.toServerName(sn));
request.setLoad(sl);
rss.regionServerReport(null, request.build());
@ -1379,8 +1333,7 @@ public class HRegionServer extends HasThread implements
maxMemory = usage.getMax();
}
ClusterStatusProtos.ServerLoad.Builder serverLoad =
ClusterStatusProtos.ServerLoad.newBuilder();
ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
@ -1414,7 +1367,7 @@ public class HRegionServer extends HasThread implements
serverLoad.setInfoServerPort(-1);
}
// for the replicationLoad purpose. Only need to get from one service
// for the replicationLoad purpose. Only need to get from one executorService
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();
@ -1472,8 +1425,8 @@ public class HRegionServer extends HasThread implements
// iterator of onlineRegions to close all user regions.
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
RegionInfo hri = e.getValue().getRegionInfo();
if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
&& !closedRegions.contains(hri.getEncodedName())) {
if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
!closedRegions.contains(hri.getEncodedName())) {
closedRegions.add(hri.getEncodedName());
// Don't update zk with this close transition; pass false.
closeRegionIgnoreErrors(hri, abort);
@ -1539,8 +1492,8 @@ public class HRegionServer extends HasThread implements
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
rpcServices.isa.getPort(), this.startcode);
this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
this.startcode);
if (shouldUseThisHostnameInstead() &&
!hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
String msg = "Master passed us a different hostname to use; was=" +
@ -1580,13 +1533,13 @@ public class HRegionServer extends HasThread implements
// hack! Maps DFSClient => RegionServer for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it.
if (this.conf.get("mapreduce.task.attempt.id") == null) {
this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
this.serverName.toString());
this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
}
// Save it in a file, this will allow to see if we crash
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
// This call sets up an initialized replication and WAL. Later we start it up.
setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
@ -1595,12 +1548,17 @@ public class HRegionServer extends HasThread implements
this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
pauseMonitor.start();
startServiceThreads();
startHeapMemoryManager();
// Call it after starting HeapMemoryManager.
initializeMemStoreChunkCreator();
LOG.info("Serving as " + this.serverName +
", RpcServer on " + rpcServices.isa +
// There is a rare case where we do NOT want services to start. Check config.
if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
startServices();
}
// In here we start up the replication Service. Above we initialized it. TODO: reconcile,
// or make sense of it.
startReplicationService();
// Set up ZK
LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
@ -1627,8 +1585,8 @@ public class HRegionServer extends HasThread implements
long globalMemStoreSize = pair.getFirst();
boolean offheap = this.regionServerAccounting.isOffheap();
// When off heap memstore in use, take full area for chunk pool.
float poolSizePercentage = offheap ? 1.0F
: conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
float poolSizePercentage = offheap? 1.0F:
conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
@ -1652,8 +1610,7 @@ public class HRegionServer extends HasThread implements
rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
getMyEphemeralNodePath(), data);
ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
}
private void deleteMyEphemeralNode() throws KeeperException {
@ -1756,8 +1713,7 @@ public class HRegionServer extends HasThread implements
// immediately upon region server startup
private long iteration = 1;
CompactionChecker(final HRegionServer h, final int sleepTime,
final Stoppable stopper) {
CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
super("CompactionChecker", stopper, sleepTime);
this.instance = h;
LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
@ -1793,7 +1749,8 @@ public class HRegionServer extends HasThread implements
if (majorCompactPriority == DEFAULT_PRIORITY ||
majorCompactPriority > hr.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(hr, s,
getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
getName() + " requests major compaction; use default priority",
Store.NO_PRIORITY,
CompactionLifeCycleTracker.DUMMY, null);
} else {
this.instance.compactSplitThread.requestCompaction(hr, s,
@ -1844,7 +1801,7 @@ public class HRegionServer extends HasThread implements
/**
* Report the status of the server. A server is online once all the startup is
* completed (setting up filesystem, starting service threads, etc.). This
* completed (setting up filesystem, starting executorService threads, etc.). This
* method is designed mostly to be useful in tests.
*
* @return true if online, false if not.
@ -1856,6 +1813,7 @@ public class HRegionServer extends HasThread implements
/**
* Setup WAL log and replication if enabled.
* Replication setup is done in here because it wants to be hooked up to WAL.
*
* @throws IOException
*/
private void setupWALAndReplication() throws IOException {
@ -1870,8 +1828,8 @@ public class HRegionServer extends HasThread implements
"created directory at " + this.serverName.toString());
}
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
// Instantiate replication if replication enabled. Pass it the log directories.
// In here we create the Replication instances. Later they are initialized and started up.
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
// listeners the wal factory will add to wals it creates.
@ -1887,8 +1845,9 @@ public class HRegionServer extends HasThread implements
// We use WALActionsListener to get the newly rolled WALs, so we need to get the
// WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then
// ReplicationSourceHandler need to use WALFactory get the length of the wal file being written.
// So we here we need to construct WALFactory first, and then pass it to the initialize method
// So here we need to construct WALFactory first, and then pass it to the initialize method
// of ReplicationSourceHandler.
// TODO: I can't follow replication; it has initialize and then later on we start it!
WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
this.walFactory = factory;
if (this.replicationSourceHandler != null) {
@ -1900,6 +1859,25 @@ public class HRegionServer extends HasThread implements
}
}
/**
* Start up replication source and sink handlers.
* @throws IOException
*/
private void startReplicationService() throws IOException {
if (this.replicationSourceHandler == this.replicationSinkHandler &&
this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else {
if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
}
if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.startReplicationService();
}
}
}
public MetricsRegionServer getRegionServerMetrics() {
return this.metricsRegionServer;
}
@ -1913,6 +1891,8 @@ public class HRegionServer extends HasThread implements
/*
* Start maintenance Threads, Server, Worker and lease checker threads.
* Start all threads we need to run. This is called after we've successfully
* registered with the Master.
* Install an UncaughtExceptionHandler that calls abort of RegionServer if we
* get an unhandled exception. We cannot set the handler on all threads.
* Server's internal Listener thread is off limits. For Server, if an OOME, it
@ -1923,29 +1903,56 @@ public class HRegionServer extends HasThread implements
* keeps its own internal stop mechanism so needs to be stopped by this
* hosting server. Worker logs the exception and exits.
*/
private void startServiceThreads() throws IOException {
private void startServices() throws IOException {
if (!isStopped() && !isAborted()) {
initializeThreads();
}
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
this.secureBulkLoadManager.start();
// Health checker thread.
if (isHealthCheckerConfigured()) {
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}
this.walRoller = new LogRoller(this, this);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
// Create the CompactedFileDischarger chore service. This chore helps to
// remove the compacted files
// that will no longer be used in reads.
// Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
// 2 mins so that compacted files can be archived before the TTLCleaner runs
int cleanerInterval =
conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
this.compactedFileDischarger =
new CompactedHFilesDischarger(cleanerInterval, this, this);
choreService.scheduleChore(compactedFileDischarger);
// Start executor services
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
// Start the threads for compacted files discharger
this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
}
@ -1967,18 +1974,6 @@ public class HRegionServer extends HasThread implements
Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
uncaughtExceptionHandler);
if (this.replicationSourceHandler == this.replicationSinkHandler &&
this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else {
if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
}
if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.startReplicationService();
}
}
// Create the log splitting worker and start it
// set a smaller retries to fast fail otherwise splitlogworker could be blocked for
// quite a while inside Connection layer. The worker won't be available for other
@ -1989,8 +1984,73 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
if (this.csm != null) {
// SplitLogWorker needs csm. If none, don't start this.
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
this, walFactory);
splitLogWorker.start();
} else {
LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
}
// Memstore services.
startHeapMemoryManager();
// Call it after starting HeapMemoryManager.
initializeMemStoreChunkCreator();
}
private void initializeThreads() throws IOException {
// Cache flushing thread.
this.cacheFlusher = new MemStoreFlusher(conf, this);
// Compaction thread
this.compactSplitThread = new CompactSplit(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
this.leases = new Leases(this.threadWakeFrequency);
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);
if (this.nonceManager != null) {
// Create the scheduled chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
}
// Setup the Quota Manager
rsQuotaManager = new RegionServerRpcQuotaManager(this);
rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
if (QuotaUtil.isQuotaEnabled(conf)) {
this.fsUtilizationChore = new FileSystemUtilizationChore(this);
}
boolean onlyMetaRefresh = false;
int storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
if (storefileRefreshPeriod == 0) {
storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
onlyMetaRefresh = true;
}
if (storefileRefreshPeriod > 0) {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}
registerConfigurationObservers();
}
private void registerConfigurationObservers() {
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
/**
@ -2058,14 +2118,15 @@ public class HRegionServer extends HasThread implements
return false;
}
// Verify that all threads are alive
if (!(leases.isAlive()
&& cacheFlusher.isAlive() && walRoller.isAlive()
&& this.compactionChecker.isScheduled()
&& this.periodicFlusher.isScheduled())) {
boolean healthy = (this.leases == null || this.leases.isAlive())
&& (this.cacheFlusher == null || this.cacheFlusher.isAlive())
&& (this.walRoller == null || this.walRoller.isAlive())
&& (this.compactionChecker == null || this.compactionChecker.isScheduled())
&& (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
if (!healthy) {
stop("One or more threads are no longer alive -- stop");
return false;
}
return true;
return healthy;
}
private static final byte[] UNSPECIFIED_REGION = new byte[]{};
@ -2088,7 +2149,9 @@ public class HRegionServer extends HasThread implements
byte[] namespace = regionInfo.getTable().getNamespace();
wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
}
walRoller.addWAL(wal);
if (this.walRoller != null) {
this.walRoller.addWAL(wal);
}
return wal;
}
@ -2307,10 +2370,11 @@ public class HRegionServer extends HasThread implements
// RegionReplicaFlushHandler might reset this.
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
this.service.submit(
new RegionReplicaFlushHandler(this, clusterConnection,
if (this.executorService != null) {
this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
}
}
@Override
public RpcServerInterface getRpcServer() {
@ -2432,7 +2496,7 @@ public class HRegionServer extends HasThread implements
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}
if (this.service != null) this.service.shutdown();
if (this.executorService != null) this.executorService.shutdown();
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler == this.replicationSinkHandler) {
this.replicationSourceHandler.stopReplicationService();
@ -2448,7 +2512,7 @@ public class HRegionServer extends HasThread implements
/**
* @return Return the object that implements the replication
* source service.
* source executorService.
*/
@VisibleForTesting
public ReplicationSourceService getReplicationSourceService() {
@ -2457,7 +2521,7 @@ public class HRegionServer extends HasThread implements
/**
* @return Return the object that implements the replication
* sink service.
* sink executorService.
*/
ReplicationSinkService getReplicationSinkService() {
return replicationSinkHandler;
@ -2569,6 +2633,7 @@ public class HRegionServer extends HasThread implements
* @throws IOException
*/
private RegionServerStartupResponse reportForDuty() throws IOException {
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
ServerName masterServerName = createRegionServerStatusStub(true);
if (masterServerName == null) return null;
RegionServerStartupResponse result = null;
@ -2880,7 +2945,7 @@ public class HRegionServer extends HasThread implements
@Override
public ExecutorService getExecutorService() {
return service;
return executorService;
}
@Override
@ -2898,10 +2963,10 @@ public class HRegionServer extends HasThread implements
//
/**
* Load the replication service objects, if any
* Load the replication executorService objects, if any
*/
private static void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
FileSystem walFs, Path walDir, Path oldWALDir) throws IOException {
if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
@ -2971,7 +3036,7 @@ public class HRegionServer extends HasThread implements
* @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
*/
public static void main(String[] args) throws Exception {
LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
VersionInfo.logVersion();
Configuration conf = HBaseConfiguration.create();
@SuppressWarnings("unchecked")
@ -3145,7 +3210,7 @@ public class HRegionServer extends HasThread implements
} else {
crh = new CloseRegionHandler(this, this, hri, abort, sn);
}
this.service.submit(crh);
this.executorService.submit(crh);
return true;
}
@ -3395,8 +3460,7 @@ public class HRegionServer extends HasThread implements
// This map will contains all the regions that we closed for a move.
// We add the time it was moved as we don't want to keep too old information
protected Map<String, MovedRegionInfo> movedRegions =
new ConcurrentHashMap<>(3000);
protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
// We need a timeout. If not there is a risk of giving a wrong information: this would double
// the number of network calls instead of reducing them.
@ -3457,7 +3521,6 @@ public class HRegionServer extends HasThread implements
/**
* Creates a Chore thread to clean the moved region cache.
*/
protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
private HRegionServer regionServer;
Stoppable stoppable;
@ -3606,7 +3669,7 @@ public class HRegionServer extends HasThread implements
String serviceName = call.getServiceName();
com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
if (service == null) {
throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
serviceName);
}
com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
@ -3617,7 +3680,7 @@ public class HRegionServer extends HasThread implements
serviceDesc.findMethodByName(methodName);
if (methodDesc == null) {
throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
" called on service " + serviceName);
" called on executorService " + serviceName);
}
com.google.protobuf.Message request =

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

@ -314,6 +314,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
/**
* Services launched in RSRpcServices. By default they are on but you can use the below
* booleans to selectively enable/disable either Admin or Client Service (Rare is the case
* where you would ever turn off one or the other).
*/
public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
"hbase.regionserver.admin.executorService";
public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
"hbase.regionserver.client.executorService";
/**
* An Rpc callback for closing a RegionScanner.
*/
@ -591,13 +601,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Mutate a list of rows atomically.
*
* @param region
* @param actions
* @param cellScanner if non-null, the mutation data -- the Cell content.
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator @throws IOException
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
@ -649,12 +653,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Execute an append mutation.
*
* @param region
* @param m
* @param cellScanner
* @return result to return to client if default operation should be
* bypassed as indicated by RegionObserver, null otherwise
* @throws IOException
*/
private Result append(final HRegion region, final OperationQuota quota,
final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
@ -1426,17 +1426,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
/**
* @return list of blocking services and their security info classes that this server supports
* By default, put up an Admin and a Client Service.
* Set booleans <code>hbase.regionserver.admin.executorService</code> and
* <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
* Default is that both are enabled.
* @return immutable list of blocking services and the security info classes that this server
* supports
*/
protected List<BlockingServiceAndInterface> getServices() {
List<BlockingServiceAndInterface> bssi = new ArrayList<>(2);
boolean admin =
getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
boolean client =
getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
List<BlockingServiceAndInterface> bssi = new ArrayList<>();
if (client) {
bssi.add(new BlockingServiceAndInterface(
ClientService.newReflectiveBlockingService(this),
ClientService.BlockingInterface.class));
}
if (admin) {
bssi.add(new BlockingServiceAndInterface(
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
return bssi;
}
return new org.apache.hadoop.hbase.shaded.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
}
public InetSocketAddress getSocketAddress() {
@ -1943,8 +1957,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// If there is no action in progress, we can submit a specific handler.
// Need to pass the expected version in the constructor.
if (regionServer.executorService == null) {
LOG.info("No executor executorService; skipping open request");
} else {
if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler(
regionServer.executorService.submit(new OpenMetaHandler(
regionServer, regionServer, region, htd, masterSystemTime));
} else {
if (regionOpenInfo.getFavoredNodesCount() > 0) {
@ -1952,14 +1969,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionOpenInfo.getFavoredNodesList());
}
if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
regionServer.service.submit(new OpenPriorityRegionHandler(
regionServer.executorService.submit(new OpenPriorityRegionHandler(
regionServer, regionServer, region, htd, masterSystemTime));
} else {
regionServer.service.submit(new OpenRegionHandler(
regionServer.executorService.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd, masterSystemTime));
}
}
}
}
builder.addOpeningState(RegionOpeningState.OPENED);

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java

@ -263,7 +263,7 @@ public class TestRegionServerNoMaster {
// Let's start the open handler
TableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1));
getRS().executorService.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed(HTU, getRS(), hri);