diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cbb1537df03..8f2ae6b2097 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -99,7 +99,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; -import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; @@ -472,66 +471,73 @@ public class HMaster extends HRegionServer implements MasterServices { public HMaster(final Configuration conf, CoordinatedStateManager csm) throws IOException, KeeperException { super(conf, csm); - this.rsFatals = new MemoryBoundedLogMessageBuffer( - conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024)); + try { + this.rsFatals = new MemoryBoundedLogMessageBuffer( + conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); - LOG.info("hbase.rootdir=" + getRootDir() + - ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); + LOG.info("hbase.rootdir=" + getRootDir() + + ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); - // Disable usage of meta replicas in the master - this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); + // Disable usage of meta replicas in the master + 
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - Replication.decorateMasterConfiguration(this.conf); + Replication.decorateMasterConfiguration(this.conf); - // Hack! Maps DFSClient => Master for logs. HDFS made this - // config param for task trackers, but we can piggyback off of it. - if (this.conf.get("mapreduce.task.attempt.id") == null) { - this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString()); - } - - // should we check the compression codec type at master side, default true, HBASE-6370 - this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); - - // should we check encryption settings at master side, default true - this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true); - - this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this)); - - // preload table descriptor at startup - this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true); - - this.maxBlancingTime = getMaxBalancingTime(); - this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT, - HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT); - - // Do we publish the status? - - boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED, - HConstants.STATUS_PUBLISHED_DEFAULT); - Class publisherClass = - conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS, - ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS, - ClusterStatusPublisher.Publisher.class); - - if (shouldPublish) { - if (publisherClass == null) { - LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + - ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS + - " is not set - not publishing status"); - } else { - clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); - getChoreService().scheduleChore(clusterStatusPublisherChore); + // Hack! Maps DFSClient => Master for logs. 
HDFS made this + // config param for task trackers, but we can piggyback off of it. + if (this.conf.get("mapreduce.task.attempt.id") == null) { + this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString()); } - } - // Some unit tests don't need a cluster, so no zookeeper at all - if (!conf.getBoolean("hbase.testing.nocluster", false)) { - setInitLatch(new CountDownLatch(1)); - activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); - int infoPort = putUpJettyServer(); - startActiveMasterManager(infoPort); - } else { - activeMasterManager = null; + // should we check the compression codec type at master side, default true, HBASE-6370 + this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); + + // should we check encryption settings at master side, default true + this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true); + + this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this)); + + // preload table descriptor at startup + this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true); + + this.maxBlancingTime = getMaxBalancingTime(); + this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT, + HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT); + + // Do we publish the status? 
+ + boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); + Class publisherClass = + conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS, + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS, + ClusterStatusPublisher.Publisher.class); + + if (shouldPublish) { + if (publisherClass == null) { + LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS + + " is not set - not publishing status"); + } else { + clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); + getChoreService().scheduleChore(clusterStatusPublisherChore); + } + } + + // Some unit tests don't need a cluster, so no zookeeper at all + if (!conf.getBoolean("hbase.testing.nocluster", false)) { + setInitLatch(new CountDownLatch(1)); + activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); + int infoPort = putUpJettyServer(); + startActiveMasterManager(infoPort); + } else { + activeMasterManager = null; + } + } catch (Throwable t) { + // Make sure we log the exception. HMaster is often started via reflection and the + // cause of failed startup is lost. 
+ LOG.error("Failed construction of Master", t); + throw t; } } @@ -1100,15 +1106,15 @@ public class HMaster extends HRegionServer implements MasterServices { */ private void startServiceThreads() throws IOException{ // Start the executor service pools - this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, + this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt("hbase.master.executor.openregion.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, + this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, + this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, + this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.meta.serverops.threads", 5)); - this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, + this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt("hbase.master.executor.logreplayops.threads", 10)); // We depend on there being only one instance of this executor running @@ -1116,7 +1122,7 @@ public class HMaster extends HRegionServer implements MasterServices { // tables. 
// Any time changing this maxThreads to > 1, pls see the comment at // AccessController#postCompletedCreateTableAction - this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); + this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); startProcedureExecutor(); // Start log cleaner thread diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2c0bd035f59..7ed53601832 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -234,16 +233,10 @@ public class HRegionServer extends HasThread implements /** * For testing only! Set to true to skip notifying region assignment to master . */ + @VisibleForTesting @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") public static boolean TEST_SKIP_REPORTING_TRANSITION = false; - /* - * Strings to be used in forming the exception message for - * RegionsAlreadyInTransitionException. - */ - protected static final String OPEN = "OPEN"; - protected static final String CLOSE = "CLOSE"; - //RegionName vs current action in progress //true - if open region action in progress //false - if close region action in progress @@ -315,8 +308,8 @@ public class HRegionServer extends HasThread implements // Leases protected Leases leases; - // Instance of the hbase executor service. - protected ExecutorService service; + // Instance of the hbase executor service. 
+ protected ExecutorService executorService; // If false, the file system has become unavailable protected volatile boolean fsOk; @@ -380,7 +373,7 @@ public class HRegionServer extends HasThread implements /** * ChoreService used to schedule tasks that we want to run periodically */ - private final ChoreService choreService; + private ChoreService choreService; /* * Check for compactions requests. @@ -396,7 +389,7 @@ public class HRegionServer extends HasThread implements // WAL roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes - protected final LogRoller walRoller; + protected LogRoller walRoller; // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -532,6 +525,15 @@ public class HRegionServer extends HasThread implements private final NettyEventLoopGroupConfig eventLoopGroupConfig; + /** + * True if this RegionServer is coming up in a cluster where there is no Master; + * means it needs to just come up and make do without a Master to talk to: e.g. in test or + * HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose only + * purpose is as a Replication-stream sink; see HBASE-18846 for more. + */ + private final boolean masterless; + static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; + /** * Starts a HRegionServer at the default location. */ @@ -541,145 +543,156 @@ public class HRegionServer extends HasThread implements /** * Starts a HRegionServer at the default location + * * @param csm implementation of CoordinatedStateManager to be used */ + // Don't start any services or managers in here in the Constructor. + // Defer till after we register with the Master as much as possible. See #startServices. 
public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException { super("RegionServer"); // thread name - this.startcode = System.currentTimeMillis(); - this.fsOk = true; - this.conf = conf; - // initialize netty event loop group at the very beginning as we may use it to start rpc server, - // rpc client and WAL. - this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); - NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), - eventLoopGroupConfig.clientChannelClass()); - NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), - eventLoopGroupConfig.clientChannelClass()); - MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); - HFile.checkHFileVersion(this.conf); - checkCodecs(this.conf); - this.userProvider = UserProvider.instantiate(conf); - FSUtils.setupShortCircuitRead(this.conf); + try { + this.startcode = System.currentTimeMillis(); + this.conf = conf; + this.fsOk = true; + this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false); + this.eventLoopGroupConfig = setupNetty(this.conf); + MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); + HFile.checkHFileVersion(this.conf); + checkCodecs(this.conf); + this.userProvider = UserProvider.instantiate(conf); + FSUtils.setupShortCircuitRead(this.conf); + Replication.decorateRegionServerConfiguration(this.conf); - Replication.decorateRegionServerConfiguration(this.conf); + // Disable usage of meta replicas in the regionserver + this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); + // Config'ed params + this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); + this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); - // Disable usage of meta replicas in the regionserver - 
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - // Config'ed params - this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + this.sleeper = new Sleeper(this.msgInterval, this); - this.sleeper = new Sleeper(this.msgInterval, this); + boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); + this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null; - boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); - this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null; + this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10); - this.numRegionsToReport = conf.getInt( - "hbase.regionserver.numregionstoreport", 10); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.operationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); - this.shortOperationTimeout = conf.getInt( - HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); + this.abortRequested = false; + this.stopped = false; - this.abortRequested = false; - this.stopped = false; - - rpcServices = createRpcServices(); - if (this instanceof HMaster) { - useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY); - } else { - useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY); - if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { - if (shouldUseThisHostnameInstead()) { - String msg 
= RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY + - " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + - " to true while " + RS_HOSTNAME_KEY + " is used"; - throw new IOException(msg); - } else { - useThisHostnameInstead = rpcServices.isa.getHostName(); + rpcServices = createRpcServices(); + if (this instanceof HMaster) { + useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY); + } else { + useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY); + if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { + if (shouldUseThisHostnameInstead()) { + String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY + + " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + + " to true while " + RS_HOSTNAME_KEY + " is used"; + throw new IOException(msg); + } else { + useThisHostnameInstead = rpcServices.isa.getHostName(); + } } } - } - String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead : - rpcServices.isa.getHostName(); - serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode); + String hostName = shouldUseThisHostnameInstead() ? 
+ this.useThisHostnameInstead : this.rpcServices.isa.getHostName(); + serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); - rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); - rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); + rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); - // login the zookeeper client principal (if using security) - ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, - HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); - // login the server principal (if using secure Hadoop) - login(userProvider, hostName); - // init superusers and add the server principal (if using security) - // or process owner as default super user. - Superusers.initialize(conf); + // login the zookeeper client principal (if using security) + ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, + HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); + // login the server principal (if using secure Hadoop) + login(userProvider, hostName); + // init superusers and add the server principal (if using security) + // or process owner as default super user. 
+ Superusers.initialize(conf); - regionServerAccounting = new RegionServerAccounting(conf); - cacheConfig = new CacheConfig(conf); - mobCacheConfig = new MobCacheConfig(conf); - uncaughtExceptionHandler = new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - abort("Uncaught exception in service thread " + t.getName(), e); + regionServerAccounting = new RegionServerAccounting(conf); + cacheConfig = new CacheConfig(conf); + mobCacheConfig = new MobCacheConfig(conf); + uncaughtExceptionHandler = new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + abort("Uncaught exception in executorService thread " + t.getName(), e); + } + }; + + initializeFileSystem(); + spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); + + this.configurationManager = new ConfigurationManager(); + setupWindows(getConfiguration(), getConfigurationManager()); + + // Some unit tests don't need a cluster, so no zookeeper at all + if (!conf.getBoolean("hbase.testing.nocluster", false)) { + // Open connection to zookeeper and set primary watcher + zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + + rpcServices.isa.getPort(), this, canCreateBaseZNode()); + + // If no master in cluster, skip trying to track one or look for a cluster status. 
+ if (!this.masterless) { + this.csm = (BaseCoordinatedStateManager) csm; + this.csm.initialize(this); + this.csm.start(); + + masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); + masterAddressTracker.start(); + + clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); + clusterStatusTracker.start(); + } } - }; - - initializeFileSystem(); - - service = new ExecutorService(getServerName().toShortString()); - spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); - - // Some unit tests don't need a cluster, so no zookeeper at all - if (!conf.getBoolean("hbase.testing.nocluster", false)) { - // Open connection to zookeeper and set primary watcher - zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + - rpcServices.isa.getPort(), this, canCreateBaseZNode()); - - this.csm = (BaseCoordinatedStateManager) csm; - this.csm.initialize(this); - this.csm.start(); - - masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); - masterAddressTracker.start(); - - clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); - clusterStatusTracker.start(); + // This violates 'no starting stuff in Constructor' but Master depends on the below chore + // and executor being created and takes a different startup route. Lots of overlap between HRS + // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super + // Master expects Constructor to put up web servers. Ugh. + // class HRS. TODO. + this.choreService = new ChoreService(getServerName().toString(), true); + this.executorService = new ExecutorService(getServerName().toShortString()); + this.rpcServices.start(); + putUpWebUI(); + } catch (Throwable t) { + // Make sure we log the exception. HRegionServer is often started via reflection and the + // cause of failed startup is lost. 
+ LOG.error("Failed construction RegionServer", t); + throw t; } - this.configurationManager = new ConfigurationManager(); - - rpcServices.start(); - putUpWebUI(); - this.walRoller = new LogRoller(this, this); - this.choreService = new ChoreService(getServerName().toString(), true); - this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); + } + /** + * Unless running on Windows, register a HUP signal handler that reloads conf and notifies cm. + */ + private static void setupWindows(final Configuration conf, ConfigurationManager cm) { if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @Override public void handle(Signal signal) { - getConfiguration().reloadConfiguration(); - configurationManager.notifyAllObservers(getConfiguration()); + conf.reloadConfiguration(); + cm.notifyAllObservers(conf); } }); } - // Create the CompactedFileDischarger chore service. This chore helps to - // remove the compacted files - // that will no longer be used in reads. - // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to - // 2 mins so that compacted files can be archived before the TTLCleaner runs - int cleanerInterval = - conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); - this.compactedFileDischarger = - new CompactedHFilesDischarger(cleanerInterval, this, this); - choreService.scheduleChore(compactedFileDischarger); + } + + private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { + // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. 
+ NettyEventLoopGroupConfig nelgc = + new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); + NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); + NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); + return nelgc; } private void initializeFileSystem() throws IOException { @@ -723,7 +736,7 @@ public class HRegionServer extends HasThread implements "hbase.regionserver.kerberos.principal", host); } - protected void waitForMasterActive(){ + protected void waitForMasterActive() { } protected String getProcessName() { @@ -731,7 +744,7 @@ public class HRegionServer extends HasThread implements } protected boolean canCreateBaseZNode() { - return false; + return this.masterless; } protected boolean canUpdateTableDescriptor() { @@ -754,28 +767,28 @@ public class HRegionServer extends HasThread implements @Override public boolean registerService(com.google.protobuf.Service instance) { /* - * No stacking of instances is allowed for a single service name + * No stacking of instances is allowed for a single service name */ com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType(); String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); if (coprocessorServiceHandlers.containsKey(serviceName)) { - LOG.error("Coprocessor service " + serviceName + LOG.error("Coprocessor executorService " + serviceName + " already registered, rejecting request from " + instance); return false; } coprocessorServiceHandlers.put(serviceName, instance); if (LOG.isDebugEnabled()) { - LOG.debug("Registered regionserver coprocessor service: service=" + serviceName); + LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName); } return true; } /** * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to - * the local server. Safe to use going to local or remote server. 
- * Create this instance in a method can be intercepted and mocked in tests. + * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote + * server. Create this instance in a method can be intercepted and mocked in tests. * @throws IOException */ @VisibleForTesting @@ -821,28 +834,19 @@ public class HRegionServer extends HasThread implements /** * All initialization needed before we go register with Master. + * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master. + * In here we just put up the RpcServer, setup Connection, and ZooKeeper. * * @throws IOException * @throws InterruptedException */ - private void preRegistrationInitialization(){ + private void preRegistrationInitialization() { try { setupClusterConnection(); - - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); - this.secureBulkLoadManager.start(); - - // Health checker thread. - if (isHealthCheckerConfigured()) { - int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, - HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); - healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); - } - initializeZooKeeper(); - if (!isStopped() && !isAborted()) { - initializeThreads(); - } + // Setup RPC client for master communication + this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( + this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. @@ -862,6 +866,9 @@ public class HRegionServer extends HasThread implements @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", justification="cluster Id znode read would give us correct response") private void initializeZooKeeper() throws IOException, InterruptedException { + // Nothing to do in here if no Master in the mix. 
+ if (this.masterless) return; + // Create the master address tracker, register with zk, and start it. Then // block until a master is available. No point in starting up if no master // running. @@ -908,7 +915,7 @@ public class HRegionServer extends HasThread implements } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED", - justification="We don't care about the return") + justification="We don't care about the return") private void doLatch(final CountDownLatch latch) throws InterruptedException { if (latch != null) { // Result is ignored intentionally but if I remove the below, findbugs complains (the @@ -934,67 +941,11 @@ public class HRegionServer extends HasThread implements } /** - * @return False if cluster shutdown in progress + * @return True if the cluster is up. */ private boolean isClusterUp() { - return clusterStatusTracker != null && clusterStatusTracker.isClusterUp(); - } - - private void initializeThreads() throws IOException { - // Cache flushing thread. - this.cacheFlusher = new MemStoreFlusher(conf, this); - - // Compaction thread - this.compactSplitThread = new CompactSplit(this); - - // Background thread to check for compactions; needed if region has not gotten updates - // in a while. It will take care of not checking too frequently on store-by-store basis. - this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); - this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); - this.leases = new Leases(this.threadWakeFrequency); - - // Create the thread to clean the moved regions list - movedRegionsCleaner = MovedRegionsCleaner.create(this); - - if (this.nonceManager != null) { - // Create the scheduled chore that cleans up nonces. 
- nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); - } - - // Setup the Quota Manager - rsQuotaManager = new RegionServerRpcQuotaManager(this); - rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); - - if (QuotaUtil.isQuotaEnabled(conf)) { - this.fsUtilizationChore = new FileSystemUtilizationChore(this); - } - - // Setup RPC client for master communication - rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( - rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); - - boolean onlyMetaRefresh = false; - int storefileRefreshPeriod = conf.getInt( - StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD - , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); - if (storefileRefreshPeriod == 0) { - storefileRefreshPeriod = conf.getInt( - StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, - StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); - onlyMetaRefresh = true; - } - if (storefileRefreshPeriod > 0) { - this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, - onlyMetaRefresh, this, this); - } - registerConfigurationObservers(); - } - - private void registerConfigurationObservers() { - // Registering the compactSplitThread object with the ConfigurationManager. - configurationManager.registerObserver(this.compactSplitThread); - configurationManager.registerObserver(this.rpcServices); - configurationManager.registerObserver(this); + return this.masterless || + this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp(); } /** @@ -1019,6 +970,7 @@ public class HRegionServer extends HasThread implements // Try and register with the Master; tell it we are here. Break if // server is stopped or the clusterup flag is down or hdfs went wacky. + // Once registered successfully, go ahead and start up all Services. 
while (keepLooping()) { RegionServerStartupResponse w = reportForDuty(); if (w == null) { @@ -1030,14 +982,19 @@ public class HRegionServer extends HasThread implements } } - if (!isStopped() && isHealthy()){ + if (!isStopped() && isHealthy()) { // start the snapshot handler and other procedure handlers, // since the server is ready to run - rspmHost.start(); - + if (this.rspmHost != null) { + this.rspmHost.start(); + } // Start the Quota Manager - rsQuotaManager.start(getRpcServer().getScheduler()); - rsSpaceQuotaManager.start(); + if (this.rsQuotaManager != null) { + rsQuotaManager.start(getRpcServer().getScheduler()); + } + if (this.rsSpaceQuotaManager != null) { + this.rsSpaceQuotaManager.start(); + } } // We registered with the Master. Go into run mode. @@ -1176,8 +1133,7 @@ public class HRegionServer extends HasThread implements if (!this.killed && this.fsOk) { waitOnAllRegionsToClose(abortRequested); - LOG.info("stopping server " + this.serverName + - "; all regions closed."); + LOG.info("stopping server " + this.serverName + "; all regions closed."); } //fsOk flag may be changed when closing regions throws exception. 
@@ -1223,8 +1179,7 @@ public class HRegionServer extends HasThread implements if (this.zooKeeper != null) { this.zooKeeper.close(); } - LOG.info("stopping server " + this.serverName + - "; zookeeper connection closed."); + LOG.info("stopping server " + this.serverName + "; zookeeper connection closed."); LOG.info(Thread.currentThread().getName() + " exiting"); } @@ -1267,8 +1222,7 @@ public class HRegionServer extends HasThread implements ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); try { RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); - ServerName sn = ServerName.parseVersionedServerName( - this.serverName.getVersionedBytes()); + ServerName sn = ServerName.parseVersionedServerName(this.serverName.getVersionedBytes()); request.setServer(ProtobufUtil.toServerName(sn)); request.setLoad(sl); rss.regionServerReport(null, request.build()); @@ -1379,8 +1333,7 @@ public class HRegionServer extends HasThread implements maxMemory = usage.getMax(); } - ClusterStatusProtos.ServerLoad.Builder serverLoad = - ClusterStatusProtos.ServerLoad.newBuilder(); + ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount()); serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024)); @@ -1414,7 +1367,7 @@ public class HRegionServer extends HasThread implements serverLoad.setInfoServerPort(-1); } - // for the replicationLoad purpose. Only need to get from one service + // for the replicationLoad purpose. Only need to get from one service // either source or sink will get the same info ReplicationSourceService rsources = getReplicationSourceService(); @@ -1472,12 +1425,12 @@ public class HRegionServer extends HasThread implements // iterator of onlineRegions to close all user regions.
for (Map.Entry e : this.onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); - if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) - && !closedRegions.contains(hri.getEncodedName())) { + if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && + !closedRegions.contains(hri.getEncodedName())) { closedRegions.add(hri.getEncodedName()); // Don't update zk with this close transition; pass false. closeRegionIgnoreErrors(hri, abort); - } + } } // No regions in RIT, we could stop waiting now. if (this.regionsInTransitionInRS.isEmpty()) { @@ -1539,8 +1492,8 @@ public class HRegionServer extends HasThread implements // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - this.serverName = ServerName.valueOf(hostnameFromMasterPOV, - rpcServices.isa.getPort(), this.startcode); + this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(), + this.startcode); if (shouldUseThisHostnameInstead() && !hostnameFromMasterPOV.equals(useThisHostnameInstead)) { String msg = "Master passed us a different hostname to use; was=" + @@ -1580,13 +1533,13 @@ public class HRegionServer extends HasThread implements // hack! Maps DFSClient => RegionServer for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapreduce.task.attempt.id") == null) { - this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + - this.serverName.toString()); + this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); } // Save it in a file, this will allow to see if we crash ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); + // This call sets up an initialized replication and WAL. Later we start it up. 
setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); @@ -1595,14 +1548,19 @@ public class HRegionServer extends HasThread implements this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); pauseMonitor.start(); - startServiceThreads(); - startHeapMemoryManager(); - // Call it after starting HeapMemoryManager. - initializeMemStoreChunkCreator(); - LOG.info("Serving as " + this.serverName + - ", RpcServer on " + rpcServices.isa + - ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); + // There is a rare case where we do NOT want services to start. Check config. + if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { + startServices(); + } + // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. + // or make sense of it. + startReplicationService(); + + + // Set up ZK + LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); // Wake up anyone waiting for this server to online synchronized (online) { @@ -1627,15 +1585,15 @@ public class HRegionServer extends HasThread implements long globalMemStoreSize = pair.getFirst(); boolean offheap = this.regionServerAccounting.isOffheap(); // When off heap memstore in use, take full area for chunk pool. - float poolSizePercentage = offheap ? 1.0F - : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); + float poolSizePercentage = offheap? 
1.0F: + conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); // init the chunkCreator ChunkCreator chunkCreator = ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, - initialCountPercentage, this.hMemManager); + initialCountPercentage, this.hMemManager); } } @@ -1652,8 +1610,7 @@ public class HRegionServer extends HasThread implements rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); - ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, - getMyEphemeralNodePath(), data); + ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); } private void deleteMyEphemeralNode() throws KeeperException { @@ -1756,8 +1713,7 @@ public class HRegionServer extends HasThread implements // immediately upon region server startup private long iteration = 1; - CompactionChecker(final HRegionServer h, final int sleepTime, - final Stoppable stopper) { + CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { super("CompactionChecker", stopper, sleepTime); this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); @@ -1766,8 +1722,8 @@ public class HRegionServer extends HasThread implements * If not set, the compaction will use default priority. */ this.majorCompactPriority = this.instance.conf. 
- getInt("hbase.regionserver.compactionChecker.majorCompactPriority", - DEFAULT_PRIORITY); + getInt("hbase.regionserver.compactionChecker.majorCompactPriority", + DEFAULT_PRIORITY); } @Override @@ -1793,12 +1749,13 @@ public class HRegionServer extends HasThread implements if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > hr.getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(hr, s, - getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, - CompactionLifeCycleTracker.DUMMY, null); + getName() + " requests major compaction; use default priority", + Store.NO_PRIORITY, + CompactionLifeCycleTracker.DUMMY, null); } else { this.instance.compactSplitThread.requestCompaction(hr, s, - getName() + " requests major compaction; use configured priority", - this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); + getName() + " requests major compaction; use configured priority", + this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); } } } catch (IOException e) { @@ -1844,7 +1801,7 @@ public class HRegionServer extends HasThread implements /** * Report the status of the server. A server is online once all the startup is - * completed (setting up filesystem, starting service threads, etc.). This + * completed (setting up filesystem, starting service threads, etc.). This * method is designed mostly to be useful in tests. * * @return true if online, false if not. @@ -1856,6 +1813,7 @@ public class HRegionServer extends HasThread implements /** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL.
+ * * @throws IOException */ private void setupWALAndReplication() throws IOException { @@ -1867,11 +1825,11 @@ public class HRegionServer extends HasThread implements if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); if (this.walFs.exists(logDir)) { throw new RegionServerRunningException("Region server has already " + - "created directory at " + this.serverName.toString()); + "created directory at " + this.serverName.toString()); } - // Instantiate replication manager if replication enabled. Pass it the - // log directories. + // Instantiate replication if replication enabled. Pass it the log directories. + // In here we create the Replication instances. Later they are initialized and started up. createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); // listeners the wal factory will add to wals it creates. @@ -1887,8 +1845,9 @@ public class HRegionServer extends HasThread implements // We use WALActionsListener to get the newly rolled WALs, so we need to get the // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written. - // So we here we need to construct WALFactory first, and then pass it to the initialize method + // So here we need to construct WALFactory first, and then pass it to the initialize method // of ReplicationSourceHandler. + // TODO: I can't follow replication; it has initialize and then later on we start it! WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); this.walFactory = factory; if (this.replicationSourceHandler != null) { @@ -1900,6 +1859,25 @@ public class HRegionServer extends HasThread implements } } + /** + * Start up replication source and sink handlers.
+ * @throws IOException + */ + private void startReplicationService() throws IOException { + if (this.replicationSourceHandler == this.replicationSinkHandler && + this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } else { + if (this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } + if (this.replicationSinkHandler != null) { + this.replicationSinkHandler.startReplicationService(); + } + } + } + + public MetricsRegionServer getRegionServerMetrics() { return this.metricsRegionServer; } @@ -1913,6 +1891,8 @@ public class HRegionServer extends HasThread implements /* * Start maintenance Threads, Server, Worker and lease checker threads. + * Start all threads we need to run. This is called after we've successfully + * registered with the Master. * Install an UncaughtExceptionHandler that calls abort of RegionServer if we * get an unhandled exception. We cannot set the handler on all threads. * Server's internal Listener thread is off limits. For Server, if an OOME, it @@ -1923,35 +1903,62 @@ public class HRegionServer extends HasThread implements * keeps its own internal stop mechanism so needs to be stopped by this * hosting server. Worker logs the exception and exits. 
*/ - private void startServiceThreads() throws IOException { - // Start executor services - this.service.startExecutorService(ExecutorType.RS_OPEN_REGION, - conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); - this.service.startExecutorService(ExecutorType.RS_OPEN_META, - conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); - this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, - conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); - this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, - conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); - this.service.startExecutorService(ExecutorType.RS_CLOSE_META, - conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); - if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { - this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, - conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); + private void startServices() throws IOException { + if (!isStopped() && !isAborted()) { + initializeThreads(); } - this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( - "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + this.secureBulkLoadManager.start(); + + // Health checker thread. + if (isHealthCheckerConfigured()) { + int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, + HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); + healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); + } + + this.walRoller = new LogRoller(this, this); + this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); + + // Create the CompactedFileDischarger chore service. This chore helps to + // remove the compacted files + // that will no longer be used in reads. + // Default is 2 mins.
The default value for TTLCleaner is 5 mins so we set this to + // 2 mins so that compacted files can be archived before the TTLCleaner runs + int cleanerInterval = + conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); + this.compactedFileDischarger = + new CompactedHFilesDischarger(cleanerInterval, this, this); + choreService.scheduleChore(compactedFileDischarger); + + // Start executor services + this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, + conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); + this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, + conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); + this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, + conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); + this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, + conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); + this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, + conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); + if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { + this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, + conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); + } + this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); // Start the threads for compacted files discharger - this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, - conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); + this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, + conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); if 
(ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { - this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, - conf.getInt("hbase.regionserver.region.replica.flusher.threads", - conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); + this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, + conf.getInt("hbase.regionserver.region.replica.flusher.threads", + conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", - uncaughtExceptionHandler); + uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); @@ -1965,19 +1972,7 @@ public class HRegionServer extends HasThread implements // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", - uncaughtExceptionHandler); - - if (this.replicationSourceHandler == this.replicationSinkHandler && - this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } else { - if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } - if (this.replicationSinkHandler != null) { - this.replicationSinkHandler.startReplicationService(); - } - } + uncaughtExceptionHandler); // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for @@ -1985,12 +1980,77 @@ public class HRegionServer extends HasThread implements // tasks even after current task is preempted after a split task times out. 
Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds + conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds + conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); - splitLogWorker.start(); + if (this.csm != null) { + // SplitLogWorker needs csm. If none, don't start this. + this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, + this, walFactory); + splitLogWorker.start(); + } else { + LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); + } + + // Memstore services. + startHeapMemoryManager(); + // Call it after starting HeapMemoryManager. + initializeMemStoreChunkCreator(); + } + + private void initializeThreads() throws IOException { + // Cache flushing thread. + this.cacheFlusher = new MemStoreFlusher(conf, this); + + // Compaction thread + this.compactSplitThread = new CompactSplit(this); + + // Background thread to check for compactions; needed if region has not gotten updates + // in a while. It will take care of not checking too frequently on store-by-store basis. + this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); + this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); + this.leases = new Leases(this.threadWakeFrequency); + + // Create the thread to clean the moved regions list + movedRegionsCleaner = MovedRegionsCleaner.create(this); + + if (this.nonceManager != null) { + // Create the scheduled chore that cleans up nonces. 
+ nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); + } + + // Setup the Quota Manager + rsQuotaManager = new RegionServerRpcQuotaManager(this); + rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); + + if (QuotaUtil.isQuotaEnabled(conf)) { + this.fsUtilizationChore = new FileSystemUtilizationChore(this); + } + + + boolean onlyMetaRefresh = false; + int storefileRefreshPeriod = conf.getInt( + StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, + StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); + if (storefileRefreshPeriod == 0) { + storefileRefreshPeriod = conf.getInt( + StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, + StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); + onlyMetaRefresh = true; + } + if (storefileRefreshPeriod > 0) { + this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, + onlyMetaRefresh, this, this); + } + registerConfigurationObservers(); + } + + private void registerConfigurationObservers() { + // Registering the compactSplitThread object with the ConfigurationManager. 
+ configurationManager.registerObserver(this.compactSplitThread); + configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this); } /** @@ -2058,14 +2118,15 @@ public class HRegionServer extends HasThread implements return false; } // Verify that all threads are alive - if (!(leases.isAlive() - && cacheFlusher.isAlive() && walRoller.isAlive() - && this.compactionChecker.isScheduled() - && this.periodicFlusher.isScheduled())) { + boolean healthy = (this.leases == null || this.leases.isAlive()) + && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) + && (this.walRoller == null || this.walRoller.isAlive()) + && (this.compactionChecker == null || this.compactionChecker.isScheduled()) + && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); + if (!healthy) { stop("One or more threads are no longer alive -- stop"); - return false; } - return true; + return healthy; } private static final byte[] UNSPECIFIED_REGION = new byte[]{}; @@ -2088,7 +2149,9 @@ public class HRegionServer extends HasThread implements byte[] namespace = regionInfo.getTable().getNamespace(); wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace); } - walRoller.addWAL(wal); + if (this.walRoller != null) { + this.walRoller.addWAL(wal); + } return wal; } @@ -2307,9 +2370,10 @@ public class HRegionServer extends HasThread implements // RegionReplicaFlushHandler might reset this. 
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler - this.service.submit( - new RegionReplicaFlushHandler(this, clusterConnection, - rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + if (this.executorService != null) { + this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, + rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + } } @Override @@ -2432,7 +2496,7 @@ public class HRegionServer extends HasThread implements if (this.compactSplitThread != null) { this.compactSplitThread.join(); } - if (this.service != null) this.service.shutdown(); + if (this.executorService != null) this.executorService.shutdown(); if (this.replicationSourceHandler != null && this.replicationSourceHandler == this.replicationSinkHandler) { this.replicationSourceHandler.stopReplicationService(); @@ -2448,7 +2512,7 @@ public class HRegionServer extends HasThread implements /** * @return Return the object that implements the replication - * source service. + * source service. */ @VisibleForTesting public ReplicationSourceService getReplicationSourceService() { @@ -2457,7 +2521,7 @@ public class HRegionServer extends HasThread implements /** * @return Return the object that implements the replication - * sink service. + * sink service.
*/ ReplicationSinkService getReplicationSinkService() { return replicationSinkHandler; @@ -2569,6 +2633,7 @@ public class HRegionServer extends HasThread implements * @throws IOException */ private RegionServerStartupResponse reportForDuty() throws IOException { + if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); ServerName masterServerName = createRegionServerStatusStub(true); if (masterServerName == null) return null; RegionServerStartupResponse result = null; @@ -2880,7 +2945,7 @@ public class HRegionServer extends HasThread implements @Override public ExecutorService getExecutorService() { - return service; + return executorService; } @Override @@ -2898,10 +2963,10 @@ public class HRegionServer extends HasThread implements // /** - * Load the replication service objects, if any + * Load the replication service objects, if any */ - private static void createNewReplicationInstance(Configuration conf, - HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{ + private static void createNewReplicationInstance(Configuration conf, HRegionServer server, + FileSystem walFs, Path walDir, Path oldWALDir) throws IOException { if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { @@ -2971,7 +3036,7 @@ public class HRegionServer extends HasThread implements * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ public static void main(String[] args) throws Exception { - LOG.info("STARTING service " + HRegionServer.class.getSimpleName()); + LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); @SuppressWarnings("unchecked") @@ -3145,7 +3210,7 @@ public class HRegionServer extends HasThread implements } else { crh = new CloseRegionHandler(this, this, hri, abort, sn); } - this.service.submit(crh); +
this.executorService.submit(crh); return true; } @@ -3395,8 +3460,7 @@ public class HRegionServer extends HasThread implements // This map will contains all the regions that we closed for a move. // We add the time it was moved as we don't want to keep too old information - protected Map movedRegions = - new ConcurrentHashMap<>(3000); + protected Map movedRegions = new ConcurrentHashMap<>(3000); // We need a timeout. If not there is a risk of giving a wrong information: this would double // the number of network calls instead of reducing them. @@ -3457,7 +3521,6 @@ public class HRegionServer extends HasThread implements /** * Creates a Chore thread to clean the moved region cache. */ - protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { private HRegionServer regionServer; Stoppable stoppable; @@ -3606,7 +3669,7 @@ public class HRegionServer extends HasThread implements String serviceName = call.getServiceName(); com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); if (service == null) { - throw new UnknownProtocolException(null, "No registered coprocessor service found for " + + throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " + serviceName); } com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = @@ -3617,7 +3680,7 @@ public class HRegionServer extends HasThread implements serviceDesc.findMethodByName(methodName); if (methodDesc == null) { throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + - " called on service " + serviceName); + " called on executorService " + serviceName); } com.google.protobuf.Message request = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 045838a6507..28f73aaf7c2 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -314,6 +314,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); + /** + * Services launched in RSRpcServices. By default they are on but you can use the below + * booleans to selectively enable/disable either Admin or Client Service (Rare is the case + * where you would ever turn off one or the other). + */ + public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = + "hbase.regionserver.admin.executorService"; + public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = + "hbase.regionserver.client.executorService"; + /** * An Rpc callback for closing a RegionScanner. */ @@ -591,13 +601,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Mutate a list of rows atomically. * - * @param region - * @param actions * @param cellScanner if non-null, the mutation data -- the Cell content. - * @param row - * @param family - * @param qualifier - * @param compareOp * @param comparator @throws IOException */ private boolean checkAndRowMutate(final HRegion region, final List actions, @@ -649,12 +653,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Execute an append mutation. * - * @param region - * @param m - * @param cellScanner * @return result to return to client if default operation should be * bypassed as indicated by RegionObserver, null otherwise - * @throws IOException */ private Result append(final HRegion region, final OperationQuota quota, final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, @@ -1426,17 +1426,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } /** - * @return list of blocking services and their security info classes that this server supports + * By default, put up an Admin and a Client Service. 
+ * Set booleans hbase.regionserver.admin.executorService and + * hbase.regionserver.client.executorService if you want to enable/disable services. + * Default is that both are enabled. + * @return immutable list of blocking services and the security info classes that this server + * supports */ protected List getServices() { - List bssi = new ArrayList<>(2); - bssi.add(new BlockingServiceAndInterface( + boolean admin = + getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); + boolean client = + getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); + List bssi = new ArrayList<>(); + if (client) { + bssi.add(new BlockingServiceAndInterface( ClientService.newReflectiveBlockingService(this), ClientService.BlockingInterface.class)); - bssi.add(new BlockingServiceAndInterface( + } + if (admin) { + bssi.add(new BlockingServiceAndInterface( AdminService.newReflectiveBlockingService(this), AdminService.BlockingInterface.class)); - return bssi; + } + return new org.apache.hadoop.hbase.shaded.com.google.common.collect. + ImmutableList.Builder().addAll(bssi).build(); } public InetSocketAddress getSocketAddress() { @@ -1943,20 +1957,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // If there is no action in progress, we can submit a specific handler. // Need to pass the expected version in the constructor. 
- if (region.isMetaRegion()) { - regionServer.service.submit(new OpenMetaHandler( - regionServer, regionServer, region, htd, masterSystemTime)); + if (regionServer.executorService == null) { + LOG.info("No executor executorService; skipping open request"); } else { - if (regionOpenInfo.getFavoredNodesCount() > 0) { - regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), - regionOpenInfo.getFavoredNodesList()); - } - if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { - regionServer.service.submit(new OpenPriorityRegionHandler( - regionServer, regionServer, region, htd, masterSystemTime)); + if (region.isMetaRegion()) { + regionServer.executorService.submit(new OpenMetaHandler( + regionServer, regionServer, region, htd, masterSystemTime)); } else { - regionServer.service.submit(new OpenRegionHandler( + if (regionOpenInfo.getFavoredNodesCount() > 0) { + regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), + regionOpenInfo.getFavoredNodesList()); + } + if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { + regionServer.executorService.submit(new OpenPriorityRegionHandler( regionServer, regionServer, region, htd, masterSystemTime)); + } else { + regionServer.executorService.submit(new OpenRegionHandler( + regionServer, regionServer, region, htd, masterSystemTime)); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index ffda9640607..f8b9f6eeaa3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -263,7 +263,7 @@ public class TestRegionServerNoMaster { // Let's start the open handler TableDescriptor htd = getRS().tableDescriptors.get(hri.getTable()); - 
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1)); + getRS().executorService.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1)); // The open handler should have removed the region from RIT but kept the region closed checkRegionIsClosed(HTU, getRS(), hri);