HBASE-11319 No need to use favored node mapping initialization to find all regions
This commit is contained in:
parent
3ed3c5513c
commit
6834c929cc
|
@ -572,6 +572,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
|
||||
Set<TableName> disabledOrDisablingOrEnabling = null;
|
||||
Map<HRegionInfo, ServerName> allRegions = null;
|
||||
|
||||
if (!failover) {
|
||||
disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
|
||||
|
@ -579,7 +580,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
ZooKeeperProtos.Table.State.ENABLING);
|
||||
|
||||
// Clean re/start, mark all user regions closed before reassignment
|
||||
regionStates.closeAllUserRegions(disabledOrDisablingOrEnabling);
|
||||
allRegions = regionStates.closeAllUserRegions(
|
||||
disabledOrDisablingOrEnabling);
|
||||
}
|
||||
|
||||
// Now region states are restored
|
||||
|
@ -608,7 +610,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
if (!failover) {
|
||||
// Fresh cluster startup.
|
||||
LOG.info("Clean cluster startup. Assigning user regions");
|
||||
assignAllUserRegions(disabledOrDisablingOrEnabling);
|
||||
assignAllUserRegions(allRegions);
|
||||
}
|
||||
return failover;
|
||||
}
|
||||
|
@ -2648,22 +2650,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* should be shutdown.
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private void assignAllUserRegions(Set<TableName> disabledOrDisablingOrEnabling)
|
||||
throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
|
||||
// Skip assignment for regions of tables in DISABLING state because during clean cluster startup
|
||||
// no RS is alive and regions map also doesn't have any information about the regions.
|
||||
// See HBASE-6281.
|
||||
// Scan hbase:meta for all user regions, skipping any disabled tables
|
||||
Map<HRegionInfo, ServerName> allRegions;
|
||||
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
|
||||
new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
|
||||
snapshotOfRegionAssignment.initialize();
|
||||
allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
|
||||
if (allRegions == null || allRegions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
|
||||
throws IOException, InterruptedException {
|
||||
if (allRegions == null || allRegions.isEmpty()) return;
|
||||
|
||||
// Determine what type of assignment to do on startup
|
||||
boolean retainAssignment = server.getConfiguration().
|
||||
|
|
|
@ -73,8 +73,6 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.http.InfoServer;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
|
|
|
@ -651,7 +651,12 @@ public class RegionStates {
|
|||
lastAssignments.put(encodedName, serverName);
|
||||
}
|
||||
|
||||
synchronized void closeAllUserRegions(Set<TableName> excludedTables) {
|
||||
/**
|
||||
* At cluster clean re/start, mark all user regions closed except those of tables
|
||||
* that are excluded, such as disabled/disabling/enabling tables. All user regions
|
||||
* and their previous locations are returned.
|
||||
*/
|
||||
synchronized Map<HRegionInfo, ServerName> closeAllUserRegions(Set<TableName> excludedTables) {
|
||||
boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
|
||||
Set<HRegionInfo> toBeClosed = new HashSet<HRegionInfo>(regionStates.size());
|
||||
for(RegionState state: regionStates.values()) {
|
||||
|
@ -662,9 +667,13 @@ public class RegionStates {
|
|||
toBeClosed.add(hri);
|
||||
}
|
||||
}
|
||||
Map<HRegionInfo, ServerName> allUserRegions =
|
||||
new HashMap<HRegionInfo, ServerName>(toBeClosed.size());
|
||||
for (HRegionInfo hri: toBeClosed) {
|
||||
updateRegionState(hri, State.CLOSED);
|
||||
RegionState regionState = updateRegionState(hri, State.CLOSED);
|
||||
allUserRegions.put(hri, regionState.getServerName());
|
||||
}
|
||||
return allUserRegions;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -926,8 +925,8 @@ public class ServerManager {
|
|||
int oldCount = 0;
|
||||
ServerName masterSn = master.getServerName();
|
||||
boolean selfCheckedIn = isServerOnline(masterSn);
|
||||
while (!this.master.isStopped() && !selfCheckedIn && count < maxToStart
|
||||
&& (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
|
||||
while (!this.master.isStopped() && (!selfCheckedIn || (count < maxToStart
|
||||
&& (lastCountChange+interval > now || timeout > slept || count < minToStart)))) {
|
||||
// Log some info at every interval time or if there is a change
|
||||
if (oldCount != count || lastLogTime+interval < now){
|
||||
lastLogTime = now;
|
||||
|
@ -935,7 +934,8 @@ public class ServerManager {
|
|||
"Waiting for region servers count to settle; currently"+
|
||||
" checked in " + count + ", slept for " + slept + " ms," +
|
||||
" expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
|
||||
", timeout of "+timeout+" ms, interval of "+interval+" ms.";
|
||||
", timeout of "+timeout+" ms, interval of "+interval+" ms," +
|
||||
" selfCheckedIn " + selfCheckedIn;
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
}
|
||||
|
@ -958,7 +958,8 @@ public class ServerManager {
|
|||
LOG.info("Finished waiting for region servers count to settle;" +
|
||||
" checked in " + count + ", slept for " + slept + " ms," +
|
||||
" expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
|
||||
" master is "+ (this.master.isStopped() ? "stopped.": "running.")
|
||||
" master is "+ (this.master.isStopped() ? "stopped.": "running," +
|
||||
" selfCheckedIn " + selfCheckedIn)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -834,10 +834,17 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
numRetainedAssigments++;
|
||||
} else {
|
||||
// multiple new servers in the cluster on this same host
|
||||
int size = localServers.size();
|
||||
ServerName target =
|
||||
localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
|
||||
.nextInt(size));
|
||||
ServerName target = null;
|
||||
for (ServerName tmp: localServers) {
|
||||
if (tmp.getPort() == oldServerName.getPort()) {
|
||||
target = tmp;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (target == null) {
|
||||
int size = localServers.size();
|
||||
target = localServers.get(RANDOM.nextInt(size));
|
||||
}
|
||||
assignments.get(target).add(region);
|
||||
numRetainedAssigments++;
|
||||
}
|
||||
|
|
|
@ -118,7 +118,6 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
|||
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
|
|
|
@ -102,6 +102,7 @@ public class TestMasterShutdown {
|
|||
// Create config to use for this cluster
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt("hbase.ipc.client.failed.servers.expiry", 200);
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||
|
||||
// Start the cluster
|
||||
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
|
|
|
@ -19,17 +19,21 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -128,4 +132,112 @@ public class TestRestartCluster {
|
|||
UTIL.waitTableAvailable(TABLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests retaining assignments on a cluster restart
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
public void testRetainAssignmentOnRestart() throws Exception {
|
||||
UTIL.startMiniCluster(2);
|
||||
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
// Turn off balancer
|
||||
UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterRpcServices().synchronousBalanceSwitch(false);
|
||||
LOG.info("\n\nCreating tables");
|
||||
for(byte [] TABLE : TABLES) {
|
||||
UTIL.createTable(TABLE, FAMILY);
|
||||
}
|
||||
for(byte [] TABLE : TABLES) {
|
||||
UTIL.waitTableEnabled(TABLE);
|
||||
}
|
||||
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
AssignmentManager am = master.getAssignmentManager();
|
||||
am.waitUntilNoRegionsInTransition(120000);
|
||||
|
||||
// We don't have to use SnapshotOfRegionAssignmentFromMeta.
|
||||
// We use it here because AM used to use it to load all user region placements
|
||||
CatalogTracker ct = new CatalogTracker(UTIL.getConfiguration());
|
||||
SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct);
|
||||
snapshot.initialize();
|
||||
Map<HRegionInfo, ServerName> regionToRegionServerMap
|
||||
= snapshot.getRegionToRegionServerMap();
|
||||
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
|
||||
assertEquals(2, threads.size());
|
||||
int[] rsPorts = new int[3];
|
||||
for (int i = 0; i < 2; i++) {
|
||||
rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
|
||||
}
|
||||
rsPorts[2] = cluster.getMaster().getServerName().getPort();
|
||||
for (ServerName serverName: regionToRegionServerMap.values()) {
|
||||
boolean found = false; // Test only, no need to optimize
|
||||
for (int k = 0; k < 3 && !found; k++) {
|
||||
found = serverName.getPort() == rsPorts[k];
|
||||
}
|
||||
assertTrue(found);
|
||||
}
|
||||
|
||||
LOG.info("\n\nShutting down HBase cluster");
|
||||
cluster.shutdown();
|
||||
cluster.waitUntilShutDown();
|
||||
|
||||
LOG.info("\n\nSleeping a bit");
|
||||
Thread.sleep(2000);
|
||||
|
||||
LOG.info("\n\nStarting cluster the second time with the same ports");
|
||||
try {
|
||||
cluster.getConf().setInt(
|
||||
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 4);
|
||||
master = cluster.startMaster().getMaster();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
|
||||
cluster.startRegionServer();
|
||||
}
|
||||
} finally {
|
||||
// Reset region server port so as not to conflict with other tests
|
||||
cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, 0);
|
||||
cluster.getConf().setInt(
|
||||
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
|
||||
}
|
||||
|
||||
// Make sure live regionservers are on the same host/port
|
||||
List<ServerName> localServers = master.getServerManager().getOnlineServersList();
|
||||
assertEquals(4, localServers.size());
|
||||
for (int i = 0; i < 3; i++) {
|
||||
boolean found = false;
|
||||
for (ServerName serverName: localServers) {
|
||||
if (serverName.getPort() == rsPorts[i]) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(found);
|
||||
}
|
||||
|
||||
// Wait till master is initialized and all regions are assigned
|
||||
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
|
||||
int expectedRegions = regionToRegionServerMap.size() + 1;
|
||||
while (!master.isInitialized()
|
||||
|| regionStates.getRegionAssignments().size() != expectedRegions) {
|
||||
Threads.sleep(100);
|
||||
}
|
||||
|
||||
ct = new CatalogTracker(UTIL.getConfiguration());
|
||||
snapshot =new SnapshotOfRegionAssignmentFromMeta(ct);
|
||||
snapshot.initialize();
|
||||
Map<HRegionInfo, ServerName> newRegionToRegionServerMap =
|
||||
snapshot.getRegionToRegionServerMap();
|
||||
assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
|
||||
for (Map.Entry<HRegionInfo, ServerName> entry: newRegionToRegionServerMap.entrySet()) {
|
||||
if (TableName.NAMESPACE_TABLE_NAME.equals(entry.getKey().getTable())) continue;
|
||||
ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
|
||||
ServerName currentServer = entry.getValue();
|
||||
assertEquals(oldServer.getHostAndPort(), currentServer.getHostAndPort());
|
||||
assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.LocalHBaseCluster;
|
|||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
|
@ -62,6 +63,7 @@ public class TestRSKilledWhenInitializing {
|
|||
firstRS.set(true);
|
||||
// Create config to use for this cluster
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||
|
||||
// Start the cluster
|
||||
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
|
|
Loading…
Reference in New Issue