HBASE-7701 Opening regions on dead server are not reassigned quickly
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1441565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6657449eb4
commit
157f4eb83d
|
@ -2705,7 +2705,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* @param sn Server that went down.
|
* @param sn Server that went down.
|
||||||
* @return list of regions in transition on this server
|
* @return list of regions in transition on this server
|
||||||
*/
|
*/
|
||||||
public List<RegionState> processServerShutdown(final ServerName sn) {
|
public List<HRegionInfo> processServerShutdown(final ServerName sn) {
|
||||||
// Clean out any existing assignment plans for this server
|
// Clean out any existing assignment plans for this server
|
||||||
synchronized (this.regionPlans) {
|
synchronized (this.regionPlans) {
|
||||||
for (Iterator <Map.Entry<String, RegionPlan>> i =
|
for (Iterator <Map.Entry<String, RegionPlan>> i =
|
||||||
|
@ -2719,7 +2719,30 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return regionStates.serverOffline(sn);
|
List<HRegionInfo> regions = regionStates.serverOffline(sn);
|
||||||
|
for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
|
||||||
|
HRegionInfo hri = it.next();
|
||||||
|
String encodedName = hri.getEncodedName();
|
||||||
|
|
||||||
|
// We need a lock on the region as we could update it
|
||||||
|
Lock lock = locker.acquireLock(encodedName);
|
||||||
|
try {
|
||||||
|
RegionState regionState =
|
||||||
|
regionStates.getRegionTransitionState(encodedName);
|
||||||
|
if (regionState == null
|
||||||
|
|| !regionState.isPendingOpenOrOpeningOnServer(sn)) {
|
||||||
|
LOG.info("Skip region " + hri
|
||||||
|
+ " since it is not opening on the dead server any more: " + sn);
|
||||||
|
it.remove();
|
||||||
|
} else {
|
||||||
|
// Mark the region closed and assign it again by SSH
|
||||||
|
regionStates.updateRegionState(hri, RegionState.State.CLOSED);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return regions;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -300,7 +300,7 @@ public class RegionStates {
|
||||||
State state = oldState.getState();
|
State state = oldState.getState();
|
||||||
ServerName sn = oldState.getServerName();
|
ServerName sn = oldState.getServerName();
|
||||||
if (state != State.OFFLINE || sn != null) {
|
if (state != State.OFFLINE || sn != null) {
|
||||||
LOG.debug("Online a region with current state=" + state + ", expected state=OFFLINE"
|
LOG.debug("Offline a region with current state=" + state + ", expected state=OFFLINE"
|
||||||
+ ", assigned to server: " + sn + ", expected null");
|
+ ", assigned to server: " + sn + ", expected null");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -317,10 +317,10 @@ public class RegionStates {
|
||||||
/**
|
/**
|
||||||
* A server is offline, all regions on it are dead.
|
* A server is offline, all regions on it are dead.
|
||||||
*/
|
*/
|
||||||
public synchronized List<RegionState> serverOffline(final ServerName sn) {
|
public synchronized List<HRegionInfo> serverOffline(final ServerName sn) {
|
||||||
// Clean up this server from map of servers to regions, and remove all regions
|
// Clean up this server from map of servers to regions, and remove all regions
|
||||||
// of this server from online map of regions.
|
// of this server from online map of regions.
|
||||||
List<RegionState> rits = new ArrayList<RegionState>();
|
List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
|
||||||
Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
|
Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
|
||||||
if (assignedRegions == null) {
|
if (assignedRegions == null) {
|
||||||
assignedRegions = new HashSet<HRegionInfo>();
|
assignedRegions = new HashSet<HRegionInfo>();
|
||||||
|
@ -330,19 +330,23 @@ public class RegionStates {
|
||||||
regionAssignments.remove(region);
|
regionAssignments.remove(region);
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if any of the regions that were online on this server were in RIT
|
|
||||||
// If they are, normal timeouts will deal with them appropriately so
|
|
||||||
// let's skip a manual re-assignment.
|
|
||||||
for (RegionState state : regionsInTransition.values()) {
|
for (RegionState state : regionsInTransition.values()) {
|
||||||
if (assignedRegions.contains(state.getRegion())) {
|
HRegionInfo hri = state.getRegion();
|
||||||
rits.add(state);
|
if (assignedRegions.contains(hri)) {
|
||||||
|
// Region is open on this region server, but in transition.
|
||||||
|
// This region must be moving away from this server.
|
||||||
|
// SSH will handle it, either skip assigning, or re-assign.
|
||||||
|
LOG.info("Transitioning region "
|
||||||
|
+ state + " will be handled by SSH for " + sn);
|
||||||
} else if (sn.equals(state.getServerName())) {
|
} else if (sn.equals(state.getServerName())) {
|
||||||
// Region is in transition on this region server, and this
|
// Region is in transition on this region server, and this
|
||||||
// region is not open on this server. So the region must be
|
// region is not open on this server. So the region must be
|
||||||
// moving to this server from another one (i.e. opening or
|
// moving to this server from another one (i.e. opening or
|
||||||
// pending open on this server, was open on another one).
|
// pending open on this server, was open on another one).
|
||||||
if (state.isPendingOpen() || state.isOpening()) {
|
if (state.isPendingOpen() || state.isOpening()) {
|
||||||
state.setTimestamp(0); // timeout it, let timeout monitor reassign
|
LOG.info("Found opening region "
|
||||||
|
+ state + " to be reassigned by SSH for " + sn);
|
||||||
|
rits.add(hri);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected state "
|
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected state "
|
||||||
+ state + " of region in transition on server " + sn);
|
+ state + " of region in transition on server " + sn);
|
||||||
|
|
|
@ -198,25 +198,30 @@ public class ServerShutdownHandler extends EventHandler {
|
||||||
// OFFLINE? -- and then others after like CLOSING that depend on log
|
// OFFLINE? -- and then others after like CLOSING that depend on log
|
||||||
// splitting.
|
// splitting.
|
||||||
AssignmentManager am = services.getAssignmentManager();
|
AssignmentManager am = services.getAssignmentManager();
|
||||||
List<RegionState> regionsInTransition = am.processServerShutdown(serverName);
|
List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);
|
||||||
LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) +
|
LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) +
|
||||||
" region(s) that " + (serverName == null? "null": serverName) +
|
" region(s) that " + (serverName == null? "null": serverName) +
|
||||||
" was carrying (skipping " + regionsInTransition.size() +
|
" was carrying (and " + regionsInTransition.size() +
|
||||||
" regions(s) that are already in transition)");
|
" regions(s) that were opening on this server)");
|
||||||
|
|
||||||
|
List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
|
||||||
|
toAssignRegions.addAll(regionsInTransition);
|
||||||
|
|
||||||
// Iterate regions that were on this server and assign them
|
// Iterate regions that were on this server and assign them
|
||||||
if (hris != null) {
|
if (hris != null) {
|
||||||
RegionStates regionStates = am.getRegionStates();
|
RegionStates regionStates = am.getRegionStates();
|
||||||
List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
|
|
||||||
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
|
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
|
||||||
HRegionInfo hri = e.getKey();
|
HRegionInfo hri = e.getKey();
|
||||||
|
if (regionsInTransition.contains(hri)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
RegionState rit = regionStates.getRegionTransitionState(hri);
|
RegionState rit = regionStates.getRegionTransitionState(hri);
|
||||||
if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
|
if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
|
||||||
ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
|
ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
|
||||||
if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
|
if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
|
||||||
// If this region is in transition on the dead server, it must be
|
// If this region is in transition on the dead server, it must be
|
||||||
// opening or pending_open, which is covered by AM#processServerShutdown
|
// opening or pending_open, which should have been covered by AM#processServerShutdown
|
||||||
LOG.debug("Skip assigning region " + hri.getRegionNameAsString()
|
LOG.info("Skip assigning region " + hri.getRegionNameAsString()
|
||||||
+ " because it has been opened in " + addressFromAM.getServerName());
|
+ " because it has been opened in " + addressFromAM.getServerName());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -262,12 +267,12 @@ public class ServerShutdownHandler extends EventHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
}
|
||||||
am.assign(toAssignRegions);
|
try {
|
||||||
} catch (InterruptedException ie) {
|
am.assign(toAssignRegions);
|
||||||
LOG.error("Caught " + ie + " during round-robin assignment");
|
} catch (InterruptedException ie) {
|
||||||
throw new IOException(ie);
|
LOG.error("Caught " + ie + " during round-robin assignment");
|
||||||
}
|
throw new IOException(ie);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.deadServers.finish(serverName);
|
this.deadServers.finish(serverName);
|
||||||
|
|
|
@ -912,7 +912,7 @@ public class TestAssignmentManager {
|
||||||
/**
|
/**
|
||||||
* When a region is in transition, if the region server opening the region goes down,
|
* When a region is in transition, if the region server opening the region goes down,
|
||||||
* the region assignment takes a long time normally (waiting for timeout monitor to trigger assign).
|
* the region assignment takes a long time normally (waiting for timeout monitor to trigger assign).
|
||||||
* This test is to make sure SSH times out the transition right away.
|
* This test is to make sure SSH reassigns it right away.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSSHTimesOutOpeningRegionTransition()
|
public void testSSHTimesOutOpeningRegionTransition()
|
||||||
|
@ -925,6 +925,7 @@ public class TestAssignmentManager {
|
||||||
// adding region in transition, in OPENING state on SERVERNAME_A.
|
// adding region in transition, in OPENING state on SERVERNAME_A.
|
||||||
RegionState state = new RegionState(REGIONINFO,
|
RegionState state = new RegionState(REGIONINFO,
|
||||||
State.OPENING, System.currentTimeMillis(), SERVERNAME_A);
|
State.OPENING, System.currentTimeMillis(), SERVERNAME_A);
|
||||||
|
am.getRegionStates().regionOnline(REGIONINFO, SERVERNAME_B);
|
||||||
am.getRegionStates().regionsInTransition.put(REGIONINFO.getEncodedName(), state);
|
am.getRegionStates().regionsInTransition.put(REGIONINFO.getEncodedName(), state);
|
||||||
// adding region plan
|
// adding region plan
|
||||||
am.regionPlans.put(REGIONINFO.getEncodedName(),
|
am.regionPlans.put(REGIONINFO.getEncodedName(),
|
||||||
|
@ -932,8 +933,9 @@ public class TestAssignmentManager {
|
||||||
am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString());
|
am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
am.assignInvoked = false;
|
||||||
processServerShutdownHandler(ct, am, false);
|
processServerShutdownHandler(ct, am, false);
|
||||||
assertTrue("Transtion is timed out", state.getStamp() == 0);
|
assertTrue(am.assignInvoked);
|
||||||
} finally {
|
} finally {
|
||||||
am.getRegionStates().regionsInTransition.remove(REGIONINFO.getEncodedName());
|
am.getRegionStates().regionsInTransition.remove(REGIONINFO.getEncodedName());
|
||||||
am.regionPlans.remove(REGIONINFO.getEncodedName());
|
am.regionPlans.remove(REGIONINFO.getEncodedName());
|
||||||
|
@ -1084,7 +1086,7 @@ public class TestAssignmentManager {
|
||||||
@Override
|
@Override
|
||||||
public void assign(List<HRegionInfo> regions)
|
public void assign(List<HRegionInfo> regions)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
assignInvoked = true;
|
assignInvoked = (regions != null && regions.size() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** reset the watcher */
|
/** reset the watcher */
|
||||||
|
|
Loading…
Reference in New Issue