HBASE-11370 SSH doesn't need to scan meta if not using ZK for assignment
parent 200579f5fd
commit f0a32dff9b
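
In short: when ZK-less region assignment is in use, ServerShutdownHandler (SSH) can take a dead server's region list straight from the master's in-memory RegionStates instead of scanning hbase:meta. A minimal sketch of that decision, assuming a hypothetical helper wired to the objects the diff below touches; it is an illustration, not code from the patch:

    // Hypothetical helper (not part of the patch) showing where SSH now gets the
    // regions a dead server was carrying. Uses: java.util.Set,
    // org.apache.hadoop.hbase.{HRegionInfo, ServerName},
    // org.apache.hadoop.hbase.catalog.{CatalogTracker, MetaReader},
    // org.apache.hadoop.hbase.master.AssignmentManager.
    static Set<HRegionInfo> regionsOfDeadServer(boolean useZKForAssignment,
        CatalogTracker catalogTracker, AssignmentManager am, ServerName serverName)
        throws IOException {
      if (useZKForAssignment) {
        // Old path: full scan of hbase:meta for the dead server's user regions.
        return MetaReader.getServerUserRegions(catalogTracker, serverName).keySet();
      }
      // New path: RegionStates already tracks server holdings; take a copy
      // and drop the meta region, which is handled separately.
      Set<HRegionInfo> hris = am.getRegionStates().getServerRegions(serverName);
      if (hris != null) {
        hris.remove(HRegionInfo.FIRST_META_REGIONINFO);
      }
      return hris;
    }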

AssignmentManager.java

@@ -458,10 +458,10 @@ public class AssignmentManager extends ZooKeeperListener {
     // need to be handled.

     // Scan hbase:meta to build list of existing regions, servers, and assignment
-    // Returns servers who have not checked in (assumed dead) and their regions
-    Map<ServerName, List<HRegionInfo>> deadServers;
+    // Returns servers who have not checked in (assumed dead) that some regions
+    // were assigned to (according to the meta)
+    Set<ServerName> deadServers = rebuildUserRegions();

-    deadServers = rebuildUserRegions();
     // This method will assign all user regions if a clean server startup or
     // it will reconstruct master state and cleanup any leftovers from
     // previous master process.
@@ -489,8 +489,8 @@ public class AssignmentManager extends ZooKeeperListener {
    * @throws InterruptedException
    */
   boolean processDeadServersAndRegionsInTransition(
-      final Map<ServerName, List<HRegionInfo>> deadServers)
-          throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
+      final Set<ServerName> deadServers) throws KeeperException,
+        IOException, InterruptedException, CoordinatedStateException {
     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
       watcher.assignmentZNode);

@@ -2702,13 +2702,12 @@ public class AssignmentManager extends ZooKeeperListener {
   /**
    * Rebuild the list of user regions and assignment information.
    * <p>
-   * Returns a map of servers that are not found to be online and the regions
-   * they were hosting.
-   * @return map of servers not online to their assigned regions, as stored
-   *         in META
+   * Returns a set of servers that are not found to be online that hosted
+   * some regions.
+   * @return set of servers not online that hosted some regions per meta
    * @throws IOException
    */
-  Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws
+  Set<ServerName> rebuildUserRegions() throws
       IOException, KeeperException, CoordinatedStateException {
     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
@@ -2722,16 +2721,16 @@ public class AssignmentManager extends ZooKeeperListener {
     List<Result> results = MetaReader.fullScan(this.catalogTracker);
     // Get any new but slow to checkin region server that joined the cluster
     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
-    // Map of offline servers and their regions to be returned
-    Map<ServerName, List<HRegionInfo>> offlineServers =
-      new TreeMap<ServerName, List<HRegionInfo>>();
+    // Set of offline servers to be returned
+    Set<ServerName> offlineServers = new HashSet<ServerName>();
     // Iterate regions in META
     for (Result result : results) {
       HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
       if (regionInfo == null) continue;
       State state = RegionStateStore.getRegionState(result);
+      ServerName lastHost = HRegionInfo.getServerName(result);
       ServerName regionLocation = RegionStateStore.getRegionServer(result);
-      regionStates.createRegionState(regionInfo, state, regionLocation);
+      regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
       if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
         // Region is not open (either offline or in transition), skip
         continue;
@@ -2739,13 +2738,10 @@ public class AssignmentManager extends ZooKeeperListener {
       TableName tableName = regionInfo.getTable();
       if (!onlineServers.contains(regionLocation)) {
         // Region is located on a server that isn't online
-        List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
-        if (offlineRegions == null) {
-          offlineRegions = new ArrayList<HRegionInfo>(1);
-          offlineServers.put(regionLocation, offlineRegions);
+        offlineServers.add(regionLocation);
+        if (useZKForAssignment) {
+          regionStates.regionOffline(regionInfo);
         }
-        regionStates.regionOffline(regionInfo);
-        offlineRegions.add(regionInfo);
       } else if (!disabledOrEnablingTables.contains(tableName)) {
         // Region is being served and on an active server
         // add only if region not in disabled or enabling table
@@ -2838,13 +2834,9 @@ public class AssignmentManager extends ZooKeeperListener {
    * @throws KeeperException
    */
   private void processDeadServersAndRecoverLostRegions(
-      Map<ServerName, List<HRegionInfo>> deadServers)
-          throws IOException, KeeperException {
-    if (deadServers != null) {
-      for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
-        ServerName serverName = server.getKey();
-        // We need to keep such info even if the server is known dead
-        regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
+      Set<ServerName> deadServers) throws IOException, KeeperException {
+    if (deadServers != null && !deadServers.isEmpty()) {
+      for (ServerName serverName: deadServers) {
         if (!serverManager.isServerDead(serverName)) {
           serverManager.expireServer(serverName); // Let SSH do region re-assign
         }
@@ -3420,7 +3412,7 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     } else if (code == TransitionCode.SPLIT_PONR) {
       try {
-        regionStateStore.splitRegion(p, a, b, sn);
+        regionStates.splitRegion(p, a, b, sn);
       } catch (IOException ioe) {
         LOG.info("Failed to record split region " + p.getShortNameToLog());
         return "Failed to record the splitting in meta";
@@ -3469,7 +3461,7 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     } else if (code == TransitionCode.MERGE_PONR) {
       try {
-        regionStateStore.mergeRegions(p, a, b, sn);
+        regionStates.mergeRegions(p, a, b, sn);
       } catch (IOException ioe) {
         LOG.info("Failed to record merged region " + p.getShortNameToLog());
         return "Failed to record the merging in meta";
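
Net effect of the AssignmentManager changes above: rebuildUserRegions() still scans hbase:meta to populate RegionStates, but it now returns only the set of servers that never checked in; their region lists are no longer carried around, because expiring the server is enough for SSH to recover the regions from RegionStates. A condensed sketch of the resulting startup flow, pieced together from the hunks above rather than quoted verbatim:

    // Condensed from the hunks above; ZK branches and error handling omitted.
    Set<ServerName> deadServers = rebuildUserRegions();      // meta scan fills RegionStates
    processDeadServersAndRegionsInTransition(deadServers);

    // ...which, per dead server, simply hands the work to SSH:
    for (ServerName serverName : deadServers) {
      if (!serverManager.isServerDead(serverName)) {
        serverManager.expireServer(serverName);              // SSH re-assigns its regions
      }
    }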

MasterFileSystem.java

@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,14 +46,11 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
-import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -340,30 +336,6 @@ public class MasterFileSystem {
     return logDirs;
   }

-  /**
-   * Mark regions in recovering state when distributedLogReplay are set true
-   * @param serverNames Set of ServerNames to be replayed wals in order to recover changes contained
-   *          in them
-   * @throws IOException
-   */
-  public void prepareLogReplay(Set<ServerName> serverNames) throws IOException {
-    if (!this.distributedLogReplay) {
-      return;
-    }
-    // mark regions in recovering state
-    for (ServerName serverName : serverNames) {
-      NavigableMap<HRegionInfo, Result> regions = this.getServerUserRegions(serverName);
-      if (regions == null) {
-        continue;
-      }
-      try {
-        this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet());
-      } catch (KeeperException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
   /**
    * Mark regions in recovering state when distributedLogReplay are set true
    * @param serverName Failed region server whose wals to be replayed
@@ -675,19 +647,6 @@ public class MasterFileSystem {
     return htd;
   }

-  private NavigableMap<HRegionInfo, Result> getServerUserRegions(ServerName serverName)
-      throws IOException {
-    if (!this.master.isStopped()) {
-      try {
-        this.master.getCatalogTracker().waitForMeta();
-        return MetaReader.getServerUserRegions(this.master.getCatalogTracker(), serverName);
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-      }
-    }
-    return null;
-  }
-
   /**
    * The function is used in SSH to set recovery mode based on configuration after all outstanding
    * log split tasks drained.
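
The deleted prepareLogReplay(Set<ServerName>) overload was the only caller of getServerUserRegions, which is why that helper and the NavigableMap/MetaReader/Result imports go away with it. The per-server overload that remains (its javadoc is the trailing context of the first removal hunk) no longer needs a meta lookup because the caller already knows the failed server's regions. A hedged sketch of that surviving overload; its exact signature, in particular the regions parameter, is assumed here and not shown in this diff:

    // Assumed shape of the surviving overload: SSH passes in the regions it already
    // collected, so no hbase:meta scan happens here.
    public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
      if (!this.distributedLogReplay || regions == null || regions.isEmpty()) {
        return;
      }
      try {
        // mark regions in recovering state, as the removed overload did per server
        this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
      } catch (KeeperException e) {
        throw new IOException(e);
      }
    }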

RegionStates.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;

 /**
@@ -248,16 +249,22 @@ public class RegionStates {
    * no effect, and the original state is returned.
    */
   public RegionState createRegionState(final HRegionInfo hri) {
-    return createRegionState(hri, null, null);
+    return createRegionState(hri, null, null, null);
   }

   /**
    * Add a region to RegionStates with the specified state.
    * If the region is already in RegionStates, this call has
    * no effect, and the original state is returned.
+   *
+   * @param hri the region info to create a state for
+   * @param newState the state to the region in set to
+   * @param serverName the server the region is transitioning on
+   * @param lastHost the last server that hosts the region
+   * @return the current state
    */
-  public synchronized RegionState createRegionState(
-      final HRegionInfo hri, State newState, ServerName serverName) {
+  public synchronized RegionState createRegionState(final HRegionInfo hri,
+      State newState, ServerName serverName, ServerName lastHost) {
     if (newState == null || (newState == State.OPEN && serverName == null)) {
       newState = State.OFFLINE;
     }
@@ -274,17 +281,25 @@ public class RegionStates {
       regionState = new RegionState(hri, newState, serverName);
       regionStates.put(encodedName, regionState);
       if (newState == State.OPEN) {
-        regionAssignments.put(hri, serverName);
-        lastAssignments.put(encodedName, serverName);
-        Set<HRegionInfo> regions = serverHoldings.get(serverName);
-        if (regions == null) {
-          regions = new HashSet<HRegionInfo>();
-          serverHoldings.put(serverName, regions);
+        if (!serverName.equals(lastHost)) {
+          LOG.warn("Open region's last host " + lastHost
+            + " should be the same as the current one " + serverName
+            + ", ignored the last and used the current one");
+          lastHost = serverName;
         }
-        regions.add(hri);
+        lastAssignments.put(encodedName, lastHost);
+        regionAssignments.put(hri, lastHost);
       } else if (!regionState.isUnassignable()) {
         regionsInTransition.put(encodedName, regionState);
       }
+      if (lastHost != null && newState != State.SPLIT) {
+        Set<HRegionInfo> regions = serverHoldings.get(lastHost);
+        if (regions == null) {
+          regions = new HashSet<HRegionInfo>();
+          serverHoldings.put(lastHost, regions);
+        }
+        regions.add(hri);
+      }
     }
     return regionState;
   }
@@ -590,6 +605,31 @@ public class RegionStates {
     }
   }

+  /**
+   * Get a copy of all regions assigned to a server
+   */
+  public synchronized Set<HRegionInfo> getServerRegions(ServerName serverName) {
+    Set<HRegionInfo> regions = serverHoldings.get(serverName);
+    if (regions == null) return null;
+    return new HashSet<HRegionInfo>(regions);
+  }
+
+  /**
+   * Remove a region from all state maps.
+   */
+  @VisibleForTesting
+  public synchronized void deleteRegion(final HRegionInfo hri) {
+    String encodedName = hri.getEncodedName();
+    regionsInTransition.remove(encodedName);
+    regionStates.remove(encodedName);
+    lastAssignments.remove(encodedName);
+    ServerName sn = regionAssignments.remove(hri);
+    if (sn != null) {
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      regions.remove(hri);
+    }
+  }
+
   /**
    * Checking if a region was assigned to a server which is not online now.
    * If so, we should hold re-assign this region till SSH has split its hlogs.
@@ -651,6 +691,38 @@ public class RegionStates {
     lastAssignments.put(encodedName, serverName);
   }

+  void splitRegion(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+    regionStateStore.splitRegion(p, a, b, sn);
+    synchronized (this) {
+      // After PONR, split is considered to be done.
+      // Update server holdings to be aligned with the meta.
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      if (regions == null) {
+        throw new IllegalStateException(sn + " should host some regions");
+      }
+      regions.remove(p);
+      regions.add(a);
+      regions.add(b);
+    }
+  }
+
+  void mergeRegions(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+    regionStateStore.mergeRegions(p, a, b, sn);
+    synchronized (this) {
+      // After PONR, merge is considered to be done.
+      // Update server holdings to be aligned with the meta.
+      Set<HRegionInfo> regions = serverHoldings.get(sn);
+      if (regions == null) {
+        throw new IllegalStateException(sn + " should host some regions");
+      }
+      regions.remove(a);
+      regions.remove(b);
+      regions.add(p);
+    }
+  }
+
   /**
    * At cluster clean re/start, mark all user regions closed except those of tables
    * that are excluded, such as disabled/disabling/enabling tables. All user regions
@@ -661,8 +733,11 @@ public class RegionStates {
     Set<HRegionInfo> toBeClosed = new HashSet<HRegionInfo>(regionStates.size());
     for(RegionState state: regionStates.values()) {
       HRegionInfo hri = state.getRegion();
+      if (state.isSplit() || hri.isSplit()) {
+        continue;
+      }
       TableName tableName = hri.getTable();
-      if (!TableName.META_TABLE_NAME.equals(tableName) && !hri.isSplit()
+      if (!TableName.META_TABLE_NAME.equals(tableName)
         && (noExcludeTables || !excludedTables.contains(tableName))) {
         toBeClosed.add(hri);
       }
@@ -859,19 +934,4 @@ public class RegionStates {
     }
     return regionState;
   }
-
-  /**
-   * Remove a region from all state maps.
-   */
-  private synchronized void deleteRegion(final HRegionInfo hri) {
-    String encodedName = hri.getEncodedName();
-    regionsInTransition.remove(encodedName);
-    regionStates.remove(encodedName);
-    lastAssignments.remove(encodedName);
-    ServerName sn = regionAssignments.remove(hri);
-    if (sn != null) {
-      Set<HRegionInfo> regions = serverHoldings.get(sn);
-      regions.remove(hri);
-    }
-  }
 }
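
The new RegionStates surface can be read as three small additions: createRegionState now also records the last host taken from meta, getServerRegions hands out a defensive copy of a server's holdings, and splitRegion/mergeRegions persist the change through RegionStateStore and then realign serverHoldings under the RegionStates lock. A short usage sketch (hypothetical caller in the same package, like AssignmentManager; hri, parent, a, b and sn are assumed to exist):

    // Rebuild state from a meta row: the region is OPEN on sn, which is also its last host.
    regionStates.createRegionState(hri, State.OPEN, sn, sn);

    // Defensive copy of the server's holdings: safe for the caller to mutate,
    // e.g. SSH strips the meta region before re-assigning.
    Set<HRegionInfo> held = regionStates.getServerRegions(sn);
    if (held != null) {
      held.remove(HRegionInfo.FIRST_META_REGIONINFO);
    }

    // After the split point of no return: write to meta via RegionStateStore,
    // then swap the parent for its daughters in serverHoldings.
    regionStates.splitRegion(parent, a, b, sn);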

ServerShutdownHandler.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;

@@ -162,8 +163,16 @@ public class ServerShutdownHandler extends EventHandler {
         this.server.getCatalogTracker().waitForMeta();
         // Skip getting user regions if the server is stopped.
         if (!this.server.isStopped()) {
-          hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
-            this.serverName).keySet();
+          if (ConfigUtil.useZKForAssignment(server.getConfiguration())) {
+            hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
+              this.serverName).keySet();
+          } else {
+            // Not using ZK for assignment, regionStates has everything we want
+            hris = am.getRegionStates().getServerRegions(serverName);
+            if (hris != null) {
+              hris.remove(HRegionInfo.FIRST_META_REGIONINFO);
+            }
+          }
         }
         break;
       } catch (InterruptedException e) {

TestSplitTransactionOnCluster.java

@@ -418,7 +418,7 @@ public class TestSplitTransactionOnCluster {
       AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
       // Now try splitting and it should work.
       split(hri, server, regionCount);
-      // Assert the ephemeral node is up in zk.
+
       String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
         hri.getEncodedName());
       RegionTransition rt = null;
@@ -437,7 +437,7 @@ public class TestSplitTransactionOnCluster {
       }
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
       assertTrue(rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
-      // Now crash the server
+      // Now crash the server, for ZK-less assignment, the server is auto aborted
       cluster.abortRegionServer(tableRegionIndex);
     }
     waitUntilRegionServerDead();
@@ -1329,12 +1329,12 @@ public class TestSplitTransactionOnCluster {
   private void waitUntilRegionServerDead() throws InterruptedException, InterruptedIOException {
     // Wait until the master processes the RS shutdown
     for (int i=0; cluster.getMaster().getClusterStatus().
-        getServers().size() == NB_SERVERS && i<100; i++) {
+        getServers().size() > NB_SERVERS && i<100; i++) {
       LOG.info("Waiting on server to go down");
       Thread.sleep(100);
     }
     assertFalse("Waited too long for RS to die", cluster.getMaster().getClusterStatus().
-      getServers().size() == NB_SERVERS);
+      getServers().size() > NB_SERVERS);
   }

   private void awaitDaughters(byte[] tableName, int numDaughters) throws InterruptedException {

TestHBaseFsck.java

@@ -1426,6 +1426,12 @@ public class TestHBaseFsck {
     meta.delete(new Delete(daughters.getSecond().getRegionName()));
     meta.flushCommits();

+    // Remove daughters from regionStates
+    RegionStates regionStates = TEST_UTIL.getMiniHBaseCluster().getMaster().
+      getAssignmentManager().getRegionStates();
+    regionStates.deleteRegion(daughters.getFirst());
+    regionStates.deleteRegion(daughters.getSecond());
+
     HBaseFsck hbck = doFsck(conf, false);
     assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
       ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN}); //no LINGERING_SPLIT_PARENT