HBASE-19793 Minor improvements on Master/RS startup

This commit is contained in:
zhangduo 2018-01-14 20:45:31 +08:00
parent e546034f09
commit 3b638f7ea1
4 changed files with 74 additions and 78 deletions

View File

@ -52,6 +52,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterId;
@ -540,6 +541,11 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
} }
@Override
protected String getUseThisHostnameInstead(Configuration conf) {
return conf.get(MASTER_HOSTNAME_KEY);
}
// Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
// block in here until then. // block in here until then.
@Override @Override
@ -612,7 +618,8 @@ public class HMaster extends HRegionServer implements MasterServices {
masterJettyServer.addConnector(connector); masterJettyServer.addConnector(connector);
masterJettyServer.setStopAtShutdown(true); masterJettyServer.setStopAtShutdown(true);
final String redirectHostname = shouldUseThisHostnameInstead() ? useThisHostnameInstead : null; final String redirectHostname =
StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;
final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname); final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
final WebAppContext context = new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS); final WebAppContext context = new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
@ -788,7 +795,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// Initialize the chunkCreator // Initialize the chunkCreator
initializeMemStoreChunkCreator(); initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(this); this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this); this.walManager = new MasterWalManager(this);
// enable table descriptors cache // enable table descriptors cache
@ -806,7 +813,7 @@ public class HMaster extends HRegionServer implements MasterServices {
ClusterId clusterId = fileSystemManager.getClusterId(); ClusterId clusterId = fileSystemManager.getClusterId();
status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper"); status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId()); ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
this.clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); this.clusterId = clusterId.toString();
this.serverManager = createServerManager(this); this.serverManager = createServerManager(this);

View File

@ -98,11 +98,8 @@ public class MasterFileSystem {
private boolean isSecurityEnabled; private boolean isSecurityEnabled;
private final MasterServices services; public MasterFileSystem(Configuration conf) throws IOException {
this.conf = conf;
public MasterFileSystem(MasterServices services) throws IOException {
this.conf = services.getConfiguration();
this.services = services;
// Set filesystem to be that of this.rootdir else we get complaints about // Set filesystem to be that of this.rootdir else we get complaints about
// mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
// default localfs. Presumption is that rootdir is fully-qualified before // default localfs. Presumption is that rootdir is fully-qualified before

View File

@ -25,6 +25,7 @@ import java.lang.reflect.Constructor;
import java.net.BindException; import java.net.BindException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -42,8 +43,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
@ -51,6 +50,7 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -169,7 +169,6 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -179,6 +178,7 @@ import sun.misc.SignalHandler;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@ -385,13 +385,13 @@ public class HRegionServer extends HasThread implements
final AtomicBoolean online = new AtomicBoolean(false); final AtomicBoolean online = new AtomicBoolean(false);
// zookeeper connection and watcher // zookeeper connection and watcher
protected ZKWatcher zooKeeper; protected final ZKWatcher zooKeeper;
// master address tracker // master address tracker
private MasterAddressTracker masterAddressTracker; private final MasterAddressTracker masterAddressTracker;
// Cluster Status Tracker // Cluster Status Tracker
protected ClusterStatusTracker clusterStatusTracker; protected final ClusterStatusTracker clusterStatusTracker;
// Log Splitting Worker // Log Splitting Worker
private SplitLogWorker splitLogWorker; private SplitLogWorker splitLogWorker;
@ -524,7 +524,6 @@ public class HRegionServer extends HasThread implements
private final boolean masterless; private final boolean masterless;
static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
/** /**
* Starts a HRegionServer at the default location * Starts a HRegionServer at the default location
*/ */
@ -571,23 +570,10 @@ public class HRegionServer extends HasThread implements
this.stopped = false; this.stopped = false;
rpcServices = createRpcServices(); rpcServices = createRpcServices();
if (this instanceof HMaster) { useThisHostnameInstead = getUseThisHostnameInstead(conf);
useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY); String hostName =
} else { StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY); : this.useThisHostnameInstead;
if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
if (shouldUseThisHostnameInstead()) {
String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
" are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
" to true while " + RS_HOSTNAME_KEY + " is used";
throw new IOException(msg);
} else {
useThisHostnameInstead = rpcServices.isa.getHostName();
}
}
}
String hostName = shouldUseThisHostnameInstead() ?
this.useThisHostnameInstead : this.rpcServices.isa.getHostName();
serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
@ -623,7 +609,6 @@ public class HRegionServer extends HasThread implements
// Open connection to zookeeper and set primary watcher // Open connection to zookeeper and set primary watcher
zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, canCreateBaseZNode()); rpcServices.isa.getPort(), this, canCreateBaseZNode());
// If no master in cluster, skip trying to track one or look for a cluster status. // If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) { if (!this.masterless) {
this.csm = new ZkCoordinatedStateManager(this); this.csm = new ZkCoordinatedStateManager(this);
@ -633,7 +618,14 @@ public class HRegionServer extends HasThread implements
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start(); clusterStatusTracker.start();
} else {
masterAddressTracker = null;
clusterStatusTracker = null;
} }
} else {
zooKeeper = null;
masterAddressTracker = null;
clusterStatusTracker = null;
} }
// This violates 'no starting stuff in Constructor' but Master depends on the below chore // This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS // and executor being created and takes a different startup route. Lots of overlap between HRS
@ -652,6 +644,23 @@ public class HRegionServer extends HasThread implements
} }
} }
// HMaster should override this method to load the specific config for master
protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
String hostname = conf.get(RS_HOSTNAME_KEY);
if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
if (!StringUtils.isBlank(hostname)) {
String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
" are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
" to true while " + RS_HOSTNAME_KEY + " is used";
throw new IOException(msg);
} else {
return rpcServices.isa.getHostName();
}
} else {
return hostname;
}
}
/** /**
* If running on Windows, do windows-specific setup. * If running on Windows, do windows-specific setup.
*/ */
@ -701,13 +710,6 @@ public class HRegionServer extends HasThread implements
return null; return null;
} }
/*
* Returns true if configured hostname should be used
*/
protected boolean shouldUseThisHostnameInstead() {
return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
}
protected void login(UserProvider user, String host) throws IOException { protected void login(UserProvider user, String host) throws IOException {
user.login("hbase.regionserver.keytab.file", user.login("hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", host); "hbase.regionserver.kerberos.principal", host);
@ -810,17 +812,14 @@ public class HRegionServer extends HasThread implements
} }
/** /**
* All initialization needed before we go register with Master. * All initialization needed before we go register with Master.<br>
* Do bare minimum. Do bulk of initializations AFTER we've connected to the Master. * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
* In here we just put up the RpcServer, setup Connection, and ZooKeeper. * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
*
* @throws IOException
* @throws InterruptedException
*/ */
private void preRegistrationInitialization() { private void preRegistrationInitialization() {
try { try {
setupClusterConnection();
initializeZooKeeper(); initializeZooKeeper();
setupClusterConnection();
// Setup RPC client for master communication // Setup RPC client for master communication
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
@ -833,18 +832,18 @@ public class HRegionServer extends HasThread implements
} }
/** /**
* Bring up connection to zk ensemble and then wait until a master for this * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
* cluster and then after that, wait until cluster 'up' flag has been set. * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
* This is the order in which master does things. * <p>
* Finally open long-living server short-circuit connection. * Finally open long-living server short-circuit connection.
* @throws IOException
* @throws InterruptedException
*/ */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
justification="cluster Id znode read would give us correct response") justification="cluster Id znode read would give us correct response")
private void initializeZooKeeper() throws IOException, InterruptedException { private void initializeZooKeeper() throws IOException, InterruptedException {
// Nothing to do in here if no Master in the mix. // Nothing to do in here if no Master in the mix.
if (this.masterless) return; if (this.masterless) {
return;
}
// Create the master address tracker, register with zk, and start it. Then // Create the master address tracker, register with zk, and start it. Then
// block until a master is available. No point in starting up if no master // block until a master is available. No point in starting up if no master
@ -855,17 +854,20 @@ public class HRegionServer extends HasThread implements
// when ready. // when ready.
blockAndCheckIfStopped(this.clusterStatusTracker); blockAndCheckIfStopped(this.clusterStatusTracker);
// Retrieve clusterId // If we are HMaster then the cluster id should have already been set.
// Since cluster status is now up if (clusterId == null) {
// ID should have already been set by HMaster // Retrieve clusterId
try { // Since cluster status is now up
clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); // ID should have already been set by HMaster
if (clusterId == null) { try {
this.abort("Cluster ID has not been set"); clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
if (clusterId == null) {
this.abort("Cluster ID has not been set");
}
LOG.info("ClusterId : " + clusterId);
} catch (KeeperException e) {
this.abort("Failed to retrieve Cluster ID", e);
} }
LOG.info("ClusterId : "+clusterId);
} catch (KeeperException e) {
this.abort("Failed to retrieve Cluster ID",e);
} }
// In case colocated master, wait here till it's active. // In case colocated master, wait here till it's active.
@ -887,16 +889,6 @@ public class HRegionServer extends HasThread implements
} }
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
justification="We don't care about the return")
private void doLatch(final CountDownLatch latch) throws InterruptedException {
if (latch != null) {
// Result is ignored intentionally but if I remove the below, findbugs complains (the
// above justification on this method doesn't seem to suppress it).
boolean result = latch.await(20, TimeUnit.SECONDS);
}
}
/** /**
* Utilty method to wait indefinitely on a znode availability while checking * Utilty method to wait indefinitely on a znode availability while checking
* if the region server is shut down * if the region server is shut down
@ -1467,14 +1459,14 @@ public class HRegionServer extends HasThread implements
String hostnameFromMasterPOV = e.getValue(); String hostnameFromMasterPOV = e.getValue();
this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(), this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
this.startcode); this.startcode);
if (shouldUseThisHostnameInstead() && if (!StringUtils.isBlank(useThisHostnameInstead) &&
!hostnameFromMasterPOV.equals(useThisHostnameInstead)) { !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
String msg = "Master passed us a different hostname to use; was=" + String msg = "Master passed us a different hostname to use; was=" +
this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV; this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
LOG.error(msg); LOG.error(msg);
throw new IOException(msg); throw new IOException(msg);
} }
if (!shouldUseThisHostnameInstead() && if (StringUtils.isBlank(useThisHostnameInstead) &&
!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) { !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
String msg = "Master passed us a different hostname to use; was=" + String msg = "Master passed us a different hostname to use; was=" +
rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV; rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
@ -1691,7 +1683,7 @@ public class HRegionServer extends HasThread implements
CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
super("CompactionChecker", stopper, sleepTime); super("CompactionChecker", stopper, sleepTime);
this.instance = h; this.instance = h;
LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
/* MajorCompactPriority is configurable. /* MajorCompactPriority is configurable.
* If not set, the compaction will use default priority. * If not set, the compaction will use default priority.
@ -2386,7 +2378,7 @@ public class HRegionServer extends HasThread implements
// Do our best to report our abort to the master, but this may not work // Do our best to report our abort to the master, but this may not work
try { try {
if (cause != null) { if (cause != null) {
msg += "\nCause:\n" + StringUtils.stringifyException(cause); msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
} }
// Report to the master but only if we have already registered with the master. // Report to the master but only if we have already registered with the master.
if (rssStub != null && this.serverName != null) { if (rssStub != null && this.serverName != null) {
@ -2614,7 +2606,7 @@ public class HRegionServer extends HasThread implements
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
int port = rpcServices.isa.getPort(); int port = rpcServices.isa.getPort();
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
if (shouldUseThisHostnameInstead()) { if (!StringUtils.isBlank(useThisHostnameInstead)) {
request.setUseThisHostnameInstead(useThisHostnameInstead); request.setUseThisHostnameInstead(useThisHostnameInstead);
} }
request.setPort(port); request.setPort(port);

View File

@ -103,7 +103,7 @@ public class MockMasterServices extends MockNoopMasterServices {
super(conf); super(conf);
this.regionsToRegionServers = regionsToRegionServers; this.regionsToRegionServers = regionsToRegionServers;
Superusers.initialize(conf); Superusers.initialize(conf);
this.fileSystemManager = new MasterFileSystem(this); this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this); this.walManager = new MasterWalManager(this);
// Mock an AM. // Mock an AM.
this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {