HBASE-9792 Region states should update last assignments when a region is opened

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539728 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-11-07 17:34:08 +00:00
parent ed1705d366
commit 698650982d
4 changed files with 120 additions and 151 deletions

View File

@ -579,8 +579,8 @@ public class AssignmentManager extends ZooKeeperListener {
return false; return false;
} }
} }
processRegionsInTransition(rt, hri, stat.getVersion()); return processRegionsInTransition(
return true; rt, hri, stat.getVersion());
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -589,11 +589,12 @@ public class AssignmentManager extends ZooKeeperListener {
/** /**
* This call is invoked only (1) master assign meta; * This call is invoked only (1) master assign meta;
* (2) during failover mode startup, zk assignment node processing. * (2) during failover mode startup, zk assignment node processing.
* The locker is set in the caller. * The locker is set in the caller. It returns true if the region
* is in transition for sure, false otherwise.
* *
* It should be private but it is used by some test too. * It should be private but it is used by some test too.
*/ */
void processRegionsInTransition( boolean processRegionsInTransition(
final RegionTransition rt, final HRegionInfo regionInfo, final RegionTransition rt, final HRegionInfo regionInfo,
final int expectedVersion) throws KeeperException { final int expectedVersion) throws KeeperException {
EventType et = rt.getEventType(); EventType et = rt.getEventType();
@ -609,101 +610,87 @@ public class AssignmentManager extends ZooKeeperListener {
+ et + ", does nothing since the region is already in transition " + et + ", does nothing since the region is already in transition "
+ regionStates.getRegionTransitionState(encodedName)); + regionStates.getRegionTransitionState(encodedName));
// Just return // Just return
return; return true;
}
if (!serverManager.isServerOnline(sn)) {
// It was on a dead server, it's closed now. Force to OFFLINE and put
// it in transition. Try to re-assign it, but it will fail most likely,
// since we have not done log splitting for the dead server yet.
LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
" was on deadserver; forcing offline");
ZKAssign.createOrForceNodeOffline(this.watcher, regionInfo, sn);
regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
invokeAssign(regionInfo);
return false;
} }
switch (et) { switch (et) {
case M_ZK_REGION_CLOSING: case M_ZK_REGION_CLOSING:
// If zk node of the region was updated by a live server skip this // Insert into RIT & resend the query to the region server: may be the previous master
// region and just add it into RIT. // died before sending the query the first time.
if (!serverManager.isServerOnline(sn)) { final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
// If was not online, its closed now. Force to OFFLINE and this this.executorService.submit(
// will get it reassigned if appropriate new EventHandler(server, EventType.M_MASTER_RECOVERY) {
forceOffline(regionInfo, rt); @Override
} else { public void process() throws IOException {
// Insert into RIT & resend the query to the region server: may be the previous master ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
// died before sending the query the first time. try {
final RegionState rs = regionStates.updateRegionState(rt, State.CLOSING); unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
this.executorService.submit( if (regionStates.isRegionOffline(regionInfo)) {
new EventHandler(server, EventType.M_MASTER_RECOVERY) { assign(regionInfo, true);
@Override
public void process() throws IOException {
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
try {
unassign(regionInfo, rs, expectedVersion, null, true, null);
if (regionStates.isRegionOffline(regionInfo)) {
assign(regionInfo, true);
}
} finally {
lock.unlock();
}
} }
}); } finally {
} lock.unlock();
}
}
});
break; break;
case RS_ZK_REGION_CLOSED: case RS_ZK_REGION_CLOSED:
case RS_ZK_REGION_FAILED_OPEN: case RS_ZK_REGION_FAILED_OPEN:
// Region is closed, insert into RIT and handle it // Region is closed, insert into RIT and handle it
addToRITandInvokeAssign(regionInfo, State.CLOSED, rt); regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
invokeAssign(regionInfo);
break; break;
case M_ZK_REGION_OFFLINE: case M_ZK_REGION_OFFLINE:
// If zk node of the region was updated by a live server skip this // Insert in RIT and resend to the regionserver
// region and just add it into RIT. regionStates.updateRegionState(rt, State.PENDING_OPEN);
if (!serverManager.isServerOnline(sn)) { final RegionState rsOffline = regionStates.getRegionState(regionInfo);
// Region is offline, insert into RIT and invoke assign this.executorService.submit(
addToRITandInvokeAssign(regionInfo, State.OFFLINE, rt); new EventHandler(server, EventType.M_MASTER_RECOVERY) {
} else { @Override
// Insert in RIT and resend to the regionserver public void process() throws IOException {
regionStates.updateRegionState(rt, State.PENDING_OPEN); ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
final RegionState rs = regionStates.getRegionState(regionInfo); try {
this.executorService.submit( RegionPlan plan = new RegionPlan(regionInfo, null, sn);
new EventHandler(server, EventType.M_MASTER_RECOVERY) { addPlan(encodedName, plan);
@Override assign(rsOffline, false, false);
public void process() throws IOException { } finally {
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName()); lock.unlock();
try { }
RegionPlan plan = new RegionPlan(regionInfo, null, sn); }
addPlan(encodedName, plan); });
assign(rs, false, false);
} finally {
lock.unlock();
}
}
});
}
break; break;
case RS_ZK_REGION_OPENING: case RS_ZK_REGION_OPENING:
if (!serverManager.isServerOnline(sn)) { regionStates.updateRegionState(rt, State.OPENING);
forceOffline(regionInfo, rt);
} else {
regionStates.updateRegionState(rt, State.OPENING);
}
break; break;
case RS_ZK_REGION_OPENED: case RS_ZK_REGION_OPENED:
if (!serverManager.isServerOnline(sn)) { // Region is opened, insert into RIT and handle it
forceOffline(regionInfo, rt); // This could be done asynchronously, we would need then to acquire the lock in the
} else { // handler.
// Region is opened, insert into RIT and handle it regionStates.updateRegionState(rt, State.OPEN);
// This could be done asynchronously, we would need then to acquire the lock in the new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
// handler.
regionStates.updateRegionState(rt, State.OPEN);
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
}
break; break;
case RS_ZK_REQUEST_REGION_SPLIT: case RS_ZK_REQUEST_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING: case RS_ZK_REGION_SPLITTING:
case RS_ZK_REGION_SPLIT: case RS_ZK_REGION_SPLIT:
if (serverManager.isServerOnline(sn)) { // Splitting region should be online. We could have skipped it during
// Splitting region should be online. We could have skipped it during // user region rebuilding since we may consider the split is completed.
// user region rebuilding since we may consider the split is completed. // Put it in SPLITTING state to avoid complications.
// Put it in SPLITTING state to avoid complications. regionStates.regionOnline(regionInfo, sn);
regionStates.updateRegionState(regionInfo, State.OPEN, sn); regionStates.updateRegionState(rt, State.SPLITTING);
regionStates.regionOnline(regionInfo, sn);
regionStates.updateRegionState(rt, State.SPLITTING);
}
if (!handleRegionSplitting( if (!handleRegionSplitting(
rt, encodedName, prettyPrintedRegionName, sn)) { rt, encodedName, prettyPrintedRegionName, sn)) {
deleteSplittingNode(encodedName, sn); deleteSplittingNode(encodedName, sn);
@ -723,38 +710,7 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Processed region " + prettyPrintedRegionName + " in state " LOG.info("Processed region " + prettyPrintedRegionName + " in state "
+ et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ") + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
+ "server: " + sn); + "server: " + sn);
} return true;
/**
* Put the region <code>hri</code> into an offline state up in zk.
*
* You need to have lock on the region before calling this method.
*
* @param hri
* @param oldRt
* @throws KeeperException
*/
private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
throws KeeperException {
// If was on dead server, its closed now. Force to OFFLINE and then
// invoke assign; this will get it reassigned if appropriate
LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
" was on deadserver; forcing offline");
ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
addToRITandInvokeAssign(hri, State.OFFLINE, oldRt);
}
/**
* Add to the in-memory copy of regions in transition and then invoke
* assign on passed region <code>hri</code>
* @param hri
* @param state
* @param oldData
*/
private void addToRITandInvokeAssign(final HRegionInfo hri,
final State state, final RegionTransition oldData) {
regionStates.updateRegionState(oldData, state);
invokeAssign(hri);
} }
/** /**
@ -1770,6 +1726,7 @@ public class AssignmentManager extends ZooKeeperListener {
state = regionStates.createRegionState(region); state = regionStates.createRegionState(region);
} }
ServerName sn = state.getServerName();
if (forceNewPlan && LOG.isDebugEnabled()) { if (forceNewPlan && LOG.isDebugEnabled()) {
LOG.debug("Force region state offline " + state); LOG.debug("Force region state offline " + state);
} }
@ -1788,23 +1745,27 @@ public class AssignmentManager extends ZooKeeperListener {
case FAILED_CLOSE: case FAILED_CLOSE:
case FAILED_OPEN: case FAILED_OPEN:
unassign(region, state, -1, null, false, null); unassign(region, state, -1, null, false, null);
RegionState oldState = state;
state = regionStates.getRegionState(region); state = regionStates.getRegionState(region);
if (state.isFailedClose()) { if (state.isFailedClose()) {
// If we can't close the region, we can't re-assign
// it so as to avoid possible double assignment/data loss.
LOG.info("Skip assigning " + LOG.info("Skip assigning " +
region + ", we couldn't close it: " + state); region + ", we couldn't close it: " + state);
return null; return null;
} }
// In these cases, we need to confirm with meta case OFFLINE:
// the region was not on a dead server if it's open/pending. // This region could have been open on this server
if ((oldState.isOpened() || oldState.isPendingOpenOrOpening()) // for a while. If the server is dead and not processed
&& wasRegionOnDeadServerByMeta(region, oldState.getServerName())) { // yet, we can move on only if the meta shows the
// region is not on this server actually, or on a server
// not dead, or dead and processed already.
if (regionStates.isServerDeadAndNotProcessed(sn)
&& wasRegionOnDeadServerByMeta(region, sn)) {
LOG.info("Skip assigning " + region.getRegionNameAsString() LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", it is on a dead but not processed yet server"); + ", it is on a dead but not processed yet server");
return null; return null;
} }
case CLOSED: case CLOSED:
case OFFLINE:
break; break;
default: default:
LOG.error("Trying to assign region " + region LOG.error("Trying to assign region " + region

View File

@ -268,9 +268,8 @@ public class RegionStates {
public synchronized RegionState updateRegionState( public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state) { final HRegionInfo hri, final State state) {
RegionState regionState = regionStates.get(hri.getEncodedName()); RegionState regionState = regionStates.get(hri.getEncodedName());
ServerName serverName = (regionState == null || state == State.CLOSED return updateRegionState(hri, state,
|| state == State.OFFLINE) ? null : regionState.getServerName(); regionState == null ? null : regionState.getServerName());
return updateRegionState(hri, state, serverName);
} }
/** /**
@ -300,42 +299,46 @@ public class RegionStates {
*/ */
public synchronized RegionState updateRegionState( public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) { final HRegionInfo hri, final State state, final ServerName serverName) {
ServerName newServerName = serverName;
if (serverName != null &&
(state == State.CLOSED || state == State.OFFLINE)) {
LOG.info(hri.getShortNameToLog() + " is " + state
+ ", reset server info from " + serverName + " to null");
newServerName = null;
}
if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) { if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
LOG.warn("Failed to transition " + hri.getShortNameToLog() LOG.warn("Failed to open/close " + hri.getShortNameToLog()
+ " on " + serverName + ", set to " + state); + " on " + serverName + ", set to " + state);
} }
String encodedName = hri.getEncodedName(); String encodedName = hri.getEncodedName();
RegionState regionState = new RegionState( RegionState regionState = new RegionState(
hri, state, System.currentTimeMillis(), newServerName); hri, state, System.currentTimeMillis(), serverName);
regionsInTransition.put(encodedName, regionState);
RegionState oldState = regionStates.put(encodedName, regionState); RegionState oldState = regionStates.put(encodedName, regionState);
if (oldState == null || oldState.getState() != regionState.getState()) { ServerName oldServerName = oldState == null ? null : oldState.getServerName();
if (oldState == null || oldState.getState() != regionState.getState()
|| (oldServerName == null && serverName != null)
|| (oldServerName != null && !oldServerName.equals(serverName))) {
LOG.info("Transitioned " + oldState + " to " + regionState); LOG.info("Transitioned " + oldState + " to " + regionState);
} }
if (newServerName != null || (
state != State.PENDING_CLOSE && state != State.CLOSING)) {
regionsInTransition.put(encodedName, regionState);
}
// For these states, region should be properly closed. // For these states, region should be properly closed.
// There should be no log splitting issue. // There should be no log splitting issue.
if ((state == State.CLOSED || state == State.MERGED if ((state == State.CLOSED || state == State.MERGED
|| state == State.SPLIT) && lastAssignments.containsKey(encodedName)) { || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
ServerName oldServerName = oldState == null ? null : oldState.getServerName();
ServerName last = lastAssignments.get(encodedName); ServerName last = lastAssignments.get(encodedName);
if (last.equals(oldServerName)) { if (last.equals(serverName)) {
lastAssignments.remove(encodedName); lastAssignments.remove(encodedName);
} else { } else {
LOG.warn(encodedName + " moved to " + state + " on " LOG.warn(encodedName + " moved to " + state + " on "
+ oldServerName + ", expected " + last); + serverName + ", expected " + last);
}
}
// Once a region is opened, record its last assignment right away.
if (serverName != null && state == State.OPEN) {
ServerName last = lastAssignments.get(encodedName);
if (!serverName.equals(last)) {
lastAssignments.put(encodedName, serverName);
if (last != null && isServerDeadAndNotProcessed(last)) {
LOG.warn(encodedName + " moved to " + serverName
+ ", while it's previous host " + last
+ " is dead but not processed yet");
}
} }
} }
@ -365,18 +368,10 @@ public class RegionStates {
RegionState oldState = regionStates.get(encodedName); RegionState oldState = regionStates.get(encodedName);
if (oldState == null) { if (oldState == null) {
LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog()); LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog());
} else {
ServerName sn = oldState.getServerName();
if (!oldState.isReadyToOnline() || sn == null || !sn.equals(serverName)) {
LOG.debug("Online " + hri.getShortNameToLog() + " with current state="
+ oldState.getState() + ", expected state=OPEN/MERGING_NEW/SPLITTING_NEW"
+ ", assigned to server: " + sn + " expected " + serverName);
}
} }
updateRegionState(hri, State.OPEN, serverName); updateRegionState(hri, State.OPEN, serverName);
regionsInTransition.remove(encodedName); regionsInTransition.remove(encodedName);
lastAssignments.put(encodedName, serverName);
ServerName oldServerName = regionAssignments.put(hri, serverName); ServerName oldServerName = regionAssignments.put(hri, serverName);
if (!serverName.equals(oldServerName)) { if (!serverName.equals(oldServerName)) {
LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName); LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);

View File

@ -848,6 +848,7 @@ public class TestAssignmentManager {
REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY); REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
version = ZKAssign.getVersion(this.watcher, REGIONINFO); version = ZKAssign.getVersion(this.watcher, REGIONINFO);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false);
am.getRegionStates().logSplit(SERVERNAME_A); // Assume log splitting is done
am.getRegionStates().createRegionState(REGIONINFO); am.getRegionStates().createRegionState(REGIONINFO);
am.gate.set(false); am.gate.set(false);
am.processRegionsInTransition(rt, REGIONINFO, version); am.processRegionsInTransition(rt, REGIONINFO, version);

View File

@ -661,18 +661,30 @@ public class TestAssignmentManagerOnCluster {
// Hold SSH before killing the hosting server // Hold SSH before killing the hosting server
master.enableSSH(false); master.enableSSH(false);
// Kill the hosting server
AssignmentManager am = master.getAssignmentManager(); AssignmentManager am = master.getAssignmentManager();
RegionStates regionStates = am.getRegionStates(); RegionStates regionStates = am.getRegionStates();
assertTrue(am.waitForAssignment(hri)); ServerName metaServer = regionStates.getRegionServerOfRegion(
RegionState state = regionStates.getRegionState(hri); HRegionInfo.FIRST_META_REGIONINFO);
ServerName oldServerName = state.getServerName(); while (true) {
cluster.killRegionServer(oldServerName); assertTrue(am.waitForAssignment(hri));
cluster.waitForRegionServerToStop(oldServerName, -1); RegionState state = regionStates.getRegionState(hri);
ServerName oldServerName = state.getServerName();
if (!ServerName.isSameHostnameAndPort(oldServerName, metaServer)) {
// Kill the hosting server, which doesn't have meta on it.
cluster.killRegionServer(oldServerName);
cluster.waitForRegionServerToStop(oldServerName, -1);
break;
}
int i = cluster.getServerWithMeta();
HRegionServer rs = cluster.getRegionServer(i == 0 ? 1 : 0);
oldServerName = rs.getServerName();
master.move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(oldServerName.getServerName()));
}
// You can't assign a dead region before SSH // You can't assign a dead region before SSH
am.assign(hri, true, true); am.assign(hri, true, true);
state = regionStates.getRegionState(hri); RegionState state = regionStates.getRegionState(hri);
assertTrue(state.isFailedClose()); assertTrue(state.isFailedClose());
// You can't unassign a dead region before SSH either // You can't unassign a dead region before SSH either