HBASE-19840 Flakey TestMetaWithReplicas

Fix two issues:

 # Meta replicas can all be assigned to the same server. This
 will cause the test to hang when we kill the server
 hosting meta, because there will be no replicas left to read from
 as the test intends. The fix is to check for this condition on
 startup and adjust if we come across it. Replicas cross-cut
 assignment. They need work.
 # The other issue was shutdown. The master started toward the
 end of the test may not have come up fully by the time
 shutdown is called, so we could be stuck assigning the
 meta replicas. Have shutdown stop the procedure
 executor engine (sketched just below).
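
 A minimal sketch of the shutdown-side fix, condensed from the HMaster#shutdown
 hunk below (field names as in the diff; surrounding shutdown steps elided):

   // Stop the ProcedureExecutor so any in-flight meta-replica assigns
   // (which block their caller via ProcedureSyncWait) cannot hold up shutdown.
   if (this.procedureExecutor != null) {
     this.procedureExecutor.stop();
   }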

There are other cleanups and notes below.

M HMaster
 Remove the silly stops in startup now that we have a real
 means of shutting down the Master during init.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
 This replica stuff was doing things it shouldn't have been doing,
 like setting core Master state flags. It may have made
 sense once, but now meta is assigned by a Pv2 Procedure,
 so the flag setting in here is meddlesome. Clear out
 methods no longer needed.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
 Remove unused methods.
 Change local variable names so they align w/ our naming elsewhere in
 the code base.

M hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
 Check for all replicas on the one server.
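
 A condensed sketch of that new setup-time check, taken from the
 TestMetaWithReplicas hunk below (am is the master's AssignmentManager;
 exception handling elided):

   Set<ServerName> sns = new HashSet<>();
   for (int replicaId = 0; replicaId < 3; replicaId++) {
     RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
         RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId);
     am.waitForAssignment(h);
     sns.add(am.getRegionStates().getRegionServerOfRegion(h));
   }
   if (sns.size() == 1) {
     // All replicas landed on the one server; move the default hbase:meta replica to
     // another RegionServer so killing the meta host still leaves replicas to read from.
     int metaServerIndex = TEST_UTIL.getHBaseCluster().getServerWithMeta();
     int newServerIndex = (metaServerIndex + 1) % REGIONSERVERS_COUNT;
     ServerName destination =
         TEST_UTIL.getHBaseCluster().getRegionServer(newServerIndex).getServerName();
     TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
         Bytes.toBytes(destination.toString()));
   }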
Michael Stack 2018-01-24 21:36:57 -08:00
parent 1efa050ebf
commit 77607e4961
7 changed files with 142 additions and 143 deletions

HasThread.java

@ -35,10 +35,12 @@ public abstract class HasThread implements Runnable {
public HasThread() {
this.thread = new Thread(this);
this.thread.setDaemon(true);
}
public HasThread(String name) {
this.thread = new Thread(this, name);
this.thread.setDaemon(true);
}
public Thread getThread() {

HMaster.java

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -814,13 +813,13 @@ public class HMaster extends HRegionServer implements MasterServices {
this.serverManager = createServerManager(this);
// This manager is started AFTER hbase:meta is confirmed on line.
// See inside metaBootstrap.recoverMeta(); below. Shouldn't be so cryptic!
this.tableStateManager = new TableStateManager(this);
status.setStatus("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
// Set Master as active now after we've setup zk with stuff like whether cluster is up or not.
// RegionServers won't come up if the cluster status is not up.
// Set ourselves as active Master now our claim has succeeded up in zk.
this.activeMaster = true;
// This is for backwards compatibility
@ -863,10 +862,6 @@ public class HMaster extends HRegionServer implements MasterServices {
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.initialize();
// Check if master is shutting down because of some issue
// in initializing the regionserver or the balancer.
if (isStopped()) return;
// Make sure meta assigned before proceeding.
status.setStatus("Recovering Meta Region");
@ -875,9 +870,6 @@ public class HMaster extends HRegionServer implements MasterServices {
MasterMetaBootstrap metaBootstrap = createMetaBootstrap(this, status);
metaBootstrap.recoverMeta();
// check if master is shutting down because above assignMeta could return even hbase:meta isn't
// assigned when master is shutting down
if (isStopped()) return;
//Initialize after meta as it scans meta
if (favoredNodesManager != null) {
@ -935,15 +927,11 @@ public class HMaster extends HRegionServer implements MasterServices {
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
// Set master as 'initialized'.
setInitialized(true);
assignmentManager.checkIfShouldMoveSystemRegionAsync();
status.setStatus("Assign meta replicas");
metaBootstrap.assignMetaReplicas();
status.setStatus("Starting quota manager");
initQuotaManager();
if (QuotaUtil.isQuotaEnabled(conf)) {
@ -2686,18 +2674,25 @@ public class HMaster extends HRegionServer implements MasterServices {
if (cpHost != null) {
cpHost.preShutdown();
}
// Tell the servermanager cluster is down.
// Tell the servermanager cluster shutdown has been called. This makes it so when Master is
// last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
// the cluster status as down. RegionServers will notice this change in state and will start
// shutting themselves down. When last has exited, Master can go down.
if (this.serverManager != null) {
this.serverManager.shutdownCluster();
}
// Set the cluster down flag; broadcast across the cluster.
if (this.clusterStatusTracker != null){
if (this.clusterStatusTracker != null) {
try {
this.clusterStatusTracker.setClusterDown();
} catch (KeeperException e) {
LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
}
}
// Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
// processing so we can go down.
if (this.procedureExecutor != null) {
this.procedureExecutor.stop();
}
// Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
// this is what we want especially if the Master is in startup phase doing call outs to
// hbase:meta, etc. when cluster is down. Without this connection close, we'd have to wait on

MasterMetaBootstrap.java

@ -20,15 +20,18 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -74,12 +77,45 @@ public class MasterMetaBootstrap {
}
}
public void assignMetaReplicas()
/**
* For assigning hbase:meta replicas only.
* TODO: The way this assign runs, nothing but chance to stop all replicas showing up on same
* server as the hbase:meta region.
*/
protected void assignMetaReplicas()
throws IOException, InterruptedException, KeeperException {
int numReplicas = master.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
if (numReplicas <= 1) {
// No replicas to assign. Return.
return;
}
final AssignmentManager assignmentManager = master.getAssignmentManager();
if (!assignmentManager.isMetaInitialized()) {
throw new IllegalStateException("hbase:meta must be initialized first before we can " +
"assign out its replicas");
}
ServerName metaServername =
this.master.getMetaTableLocator().getMetaRegionLocation(this.master.getZooKeeper());
for (int i = 1; i < numReplicas; i++) {
assignMeta(i);
// Get current meta state for replica from zk.
RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), i);
RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, i);
LOG.debug(hri.getRegionNameAsString() + " replica region state from zookeeper=" + metaState);
if (metaServername.equals(metaState.getServerName())) {
metaState = null;
LOG.info(hri.getRegionNameAsString() +
" old location is same as current hbase:meta location; setting location as null...");
}
// These assigns run inline. All is blocked till they complete. Only interrupt is shutting
// down hosting server which calls AM#stop.
if (metaState != null && metaState.getServerName() != null) {
// Try to retain old assignment.
assignmentManager.assign(hri, metaState.getServerName());
} else {
assignmentManager.assign(hri);
}
}
unassignExcessMetaReplica(numReplicas);
}
@ -108,49 +144,6 @@ public class MasterMetaBootstrap {
}
}
/**
* Check <code>hbase:meta</code> is assigned. If not, assign it.
*/
protected void assignMeta(int replicaId)
throws InterruptedException, IOException, KeeperException {
final AssignmentManager assignmentManager = master.getAssignmentManager();
// Work on meta region
// TODO: Unimplemented
// long timeout =
// master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
status.setStatus("Assigning hbase:meta region");
} else {
status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
}
// Get current meta state from zk.
RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), replicaId);
LOG.debug("meta state from zookeeper: " + metaState);
RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId);
assignmentManager.assignMeta(hri, metaState.getServerName());
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
// TODO: should we prevent from using state manager before meta was initialized?
// tableStateManager.start();
master.getTableStateManager()
.setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
}
master.getTableStateManager().start();
// Make sure a hbase:meta location is set. We need to enable SSH here since
// if the meta region server is died at this time, we need it to be re-assigned
// by SSH so that system tables can be assigned.
// No need to wait for meta is assigned = 0 when meta is just verified.
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(false);
LOG.info("hbase:meta with replicaId " + replicaId + ", location="
+ master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper(), replicaId));
status.setStatus("META assigned.");
}
private void enableCrashedServerProcessing(final boolean waitForMeta)
throws InterruptedException {
// If crashed server processing is disabled, we enable it and expire those dead but not expired

ServerManager.java

@ -92,9 +92,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* A server is fully processed only after the handler is fully enabled
* and has completed the handling.
*/
/**
*
*/
@InterfaceAudience.Private
public class ServerManager {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
@ -587,7 +584,7 @@ public class ServerManager {
// If cluster is going down, yes, servers are going to be expiring; don't
// process as a dead server
if (this.clusterShutdown.get()) {
if (isClusterShutdown()) {
LOG.info("Cluster shutdown set; " + serverName +
" expired; onlineServers=" + this.onlineServers.size());
if (this.onlineServers.isEmpty()) {
@ -961,7 +958,7 @@ public class ServerManager {
this.clusterShutdown.set(true);
}
public boolean isClusterShutdown() {
boolean isClusterShutdown() {
return this.clusterShutdown.get();
}

AssignmentManager.java

@ -449,27 +449,6 @@ public class AssignmentManager implements ServerListener {
return metaLoadEvent.isReady();
}
// ============================================================================================
// TODO: Sync helpers
// ============================================================================================
public void assignMeta(final RegionInfo metaRegionInfo) throws IOException {
assignMeta(metaRegionInfo, null);
}
public void assignMeta(final RegionInfo metaRegionInfo, final ServerName serverName)
throws IOException {
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
AssignProcedure proc;
if (serverName != null) {
LOG.debug("Try assigning Meta " + metaRegionInfo + " to " + serverName);
proc = createAssignProcedure(metaRegionInfo, serverName);
} else {
LOG.debug("Assigning " + metaRegionInfo.getRegionNameAsString());
proc = createAssignProcedure(metaRegionInfo);
}
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
}
/**
* Start a new thread to check if there are region servers whose versions are higher than others.
* If so, move all system table regions to RS with the highest version to keep compatibility.
@ -526,6 +505,11 @@ public class AssignmentManager implements ServerListener {
.collect(Collectors.toList());
}
public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
AssignProcedure proc = createAssignProcedure(regionInfo, sn);
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
}
public void assign(final RegionInfo regionInfo) throws IOException {
AssignProcedure proc = createAssignProcedure(regionInfo);
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
@ -1649,6 +1633,7 @@ public class AssignmentManager implements ServerListener {
pendingAssignQueue.clear();
}
};
assignThread.setDaemon(true);
assignThread.start();
}
@ -1712,17 +1697,16 @@ public class AssignmentManager implements ServerListener {
// TODO: Optimize balancer. pass a RegionPlan?
final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
final List<RegionInfo> userRRList = new ArrayList<>();
// regions for system tables requiring reassignment
final List<RegionInfo> sysRRList = new ArrayList<>();
for (RegionStateNode regionNode : regions.values()) {
boolean sysTable = regionNode.isSystemTable();
final List<RegionInfo> rrList = sysTable ? sysRRList : userRRList;
if (regionNode.getRegionLocation() != null) {
retainMap.put(regionNode.getRegionInfo(), regionNode.getRegionLocation());
final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
// Regions for system tables requiring reassignment
final List<RegionInfo> systemHRIs = new ArrayList<>();
for (RegionStateNode regionStateNode: regions.values()) {
boolean sysTable = regionStateNode.isSystemTable();
final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
if (regionStateNode.getRegionLocation() != null) {
retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
} else {
rrList.add(regionNode.getRegionInfo());
hris.add(regionStateNode.getRegionInfo());
}
}
@ -1731,45 +1715,44 @@ public class AssignmentManager implements ServerListener {
// TODO use events
List<ServerName> servers = master.getServerManager().createDestinationServersList();
for (int i = 0; servers.size() < 1; ++i) {
// Report every fourth time around this loop; try not to flood log.
if (i % 4 == 0) {
LOG.warn("no server available, unable to find a location for " + regions.size() +
" unassigned regions. waiting");
LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
}
// Was the AM killed?
if (!isRunning()) {
LOG.debug("aborting assignment-queue with " + regions.size() + " not assigned");
LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
return;
}
Threads.sleep(250);
servers = master.getServerManager().createDestinationServersList();
}
if (!sysRRList.isEmpty()) {
// system table regions requiring reassignment are present, get region servers
if (!systemHRIs.isEmpty()) {
// System table regions requiring reassignment are present, get region servers
// not available for system table regions
final List<ServerName> excludeServers = getExcludedServersForSystemTable();
List<ServerName> serversForSysTables = servers.stream()
.filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
if (serversForSysTables.isEmpty()) {
LOG.warn("No servers available for system table regions, considering all servers!");
LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
"instead considering all candidate servers!");
}
LOG.debug("Processing assignment plans for System tables sysServersCount=" +
serversForSysTables.size() + ", allServersCount=" + servers.size());
processAssignmentPlans(regions, null, sysRRList,
serversForSysTables.isEmpty() ? servers : serversForSysTables);
LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
", allServersCount=" + servers.size());
processAssignmentPlans(regions, null, systemHRIs,
serversForSysTables.isEmpty()? servers: serversForSysTables);
}
processAssignmentPlans(regions, retainMap, userRRList, servers);
processAssignmentPlans(regions, retainMap, userHRIs, servers);
}
private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> rrList,
final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
final List<ServerName> servers) {
boolean isTraceEnabled = LOG.isTraceEnabled();
if (isTraceEnabled) {
LOG.trace("available servers count=" + servers.size() + ": " + servers);
LOG.trace("Available servers count=" + servers.size() + ": " + servers);
}
final LoadBalancer balancer = getBalancer();
@ -1788,16 +1771,16 @@ public class AssignmentManager implements ServerListener {
// TODO: Do we need to split retain and round-robin?
// the retain seems to fallback to round-robin/random if the region is not in the map.
if (!rrList.isEmpty()) {
Collections.sort(rrList, RegionInfo.COMPARATOR);
if (!hris.isEmpty()) {
Collections.sort(hris, RegionInfo.COMPARATOR);
if (isTraceEnabled) {
LOG.trace("round robin regions=" + rrList);
LOG.trace("round robin regions=" + hris);
}
try {
acceptPlan(regions, balancer.roundRobinAssignment(rrList, servers));
acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
} catch (HBaseIOException e) {
LOG.warn("unable to round-robin assignment", e);
addToPendingAssignment(regions, rrList);
addToPendingAssignment(regions, hris);
}
}
}
@ -1844,16 +1827,19 @@ public class AssignmentManager implements ServerListener {
}
/**
* Get a list of servers that this region can not assign to.
* For system table, we must assign them to a server with highest version.
* Get a list of servers that this region cannot be assigned to.
* For system tables, we must assign them to a server with highest version.
*/
public List<ServerName> getExcludedServersForSystemTable() {
// TODO: This should be a cached list kept by the ServerManager rather than calculated on each
// move or system region assign. The RegionServerTracker keeps list of online Servers with
// RegionServerInfo that includes Version.
List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
.stream()
.map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
.collect(Collectors.toList());
if (serverList.isEmpty()) {
return new ArrayList<>();
return Collections.EMPTY_LIST;
}
String highestVersion = Collections.max(serverList,
(o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();

TestMetaWithReplicas.java

@ -28,7 +28,9 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.NoSuchProcedureException;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -81,6 +84,7 @@ public class TestMetaWithReplicas {
build();
private static final Logger LOG = LoggerFactory.getLogger(TestMetaWithReplicas.class);
private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int REGIONSERVERS_COUNT = 3;
@Rule
public TestName name = new TestName();
@ -91,30 +95,47 @@ public class TestMetaWithReplicas {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
TEST_UTIL.startMiniCluster(3);
// disable the balancer
LoadBalancerTracker l = new LoadBalancerTracker(TEST_UTIL.getZooKeeperWatcher(),
new Abortable() {
AtomicBoolean aborted = new AtomicBoolean(false);
@Override
public boolean isAborted() {
return aborted.get();
}
@Override
public void abort(String why, Throwable e) {
aborted.set(true);
}
});
l.setBalancerOn(false);
for (int replicaId = 1; replicaId < 3; replicaId ++) {
RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
replicaId);
TEST_UTIL.startMiniCluster(REGIONSERVERS_COUNT);
AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
Set<ServerName> sns = new HashSet<ServerName>();
for (int replicaId = 0; replicaId < 3; replicaId ++) {
RegionInfo h =
RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
replicaId);
try {
TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().waitForAssignment(h);
am.waitForAssignment(h);
ServerName sn = am.getRegionStates().getRegionServerOfRegion(h);
LOG.info(h.getRegionNameAsString() + " on " + sn);
sns.add(sn);
} catch (NoSuchProcedureException e) {
LOG.info("Presume the procedure has been cleaned up so just proceed: " + e.toString());
}
}
// Fun. All meta region replicas have ended up on the one server. This will cause this test
// to fail ... sometimes.
if (sns.size() == 1) {
LOG.warn("All hbase:meta replicas are on the one server; moving hbase:meta");
int metaServerIndex = TEST_UTIL.getHBaseCluster().getServerWithMeta();
int newServerIndex = (metaServerIndex + 1) % REGIONSERVERS_COUNT;
ServerName destinationServerName =
TEST_UTIL.getHBaseCluster().getRegionServer(newServerIndex).getServerName();
TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
Bytes.toBytes(destinationServerName.toString()));
}
// Disable the balancer
LoadBalancerTracker l = new LoadBalancerTracker(TEST_UTIL.getZooKeeperWatcher(),
new Abortable() {
AtomicBoolean aborted = new AtomicBoolean(false);
@Override
public boolean isAborted() {
return aborted.get();
}
@Override
public void abort(String why, Throwable e) {
aborted.set(true);
}
});
l.setBalancerOn(false);
LOG.debug("All meta replicas assigned");
}
@ -208,13 +229,15 @@ public class TestMetaWithReplicas {
Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
30000) * 3);
}
// Ensure all metas are not on same hbase:meta replica=0 server!
master = util.getHBaseClusterInterface().getClusterMetrics().getMasterName();
// kill the master so that regionserver recovery is not triggered at all
// for the meta server
LOG.info("Stopping master=" + master.toString());
util.getHBaseClusterInterface().stopMaster(master);
util.getHBaseClusterInterface().waitForMasterToStop(master, 60000);
LOG.info("Master stopped!");
LOG.info("Master " + master + " stopped!");
if (!master.equals(primary)) {
util.getHBaseClusterInterface().killRegionServer(primary);
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);

TestMasterNoCluster.java

@ -262,7 +262,10 @@ public class TestMasterNoCluster {
MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
return new MasterMetaBootstrap(this, status) {
@Override
protected void assignMeta(int replicaId) { }
protected void assignMetaReplicas()
throws IOException, InterruptedException, KeeperException {
// Nothing to do.
}
};
}