HBASE-5200 AM.ProcessRegionInTransition() and AM.handleRegion() race thus leaving the region assignment inconsistent
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1290854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0e54a5f331
commit
588206f411
|
@ -170,7 +170,17 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
|
||||
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
|
||||
|
||||
|
||||
/**
|
||||
* Set when we are doing master failover processing; cleared when failover
|
||||
* completes.
|
||||
*/
|
||||
private volatile boolean failover = false;
|
||||
|
||||
// Set holding all the regions which got processed while RIT was not
|
||||
// populated during master failover.
|
||||
private Map<String, HRegionInfo> failoverProcessedRegions =
|
||||
new HashMap<String, HRegionInfo>();
|
||||
|
||||
/**
|
||||
* Constructs a new assignment manager.
|
||||
|
@ -321,8 +331,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
// Scan META to build list of existing regions, servers, and assignment
|
||||
// Returns servers who have not checked in (assumed dead) and their regions
|
||||
Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers =
|
||||
rebuildUserRegions();
|
||||
Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers = rebuildUserRegions();
|
||||
|
||||
processDeadServersAndRegionsInTransition(deadServers);
|
||||
|
||||
|
@ -362,29 +371,29 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
watcher.assignmentZNode);
|
||||
// Run through all regions. If they are not assigned and not in RIT, then
|
||||
// its a clean cluster startup, else its a failover.
|
||||
boolean regionsToProcess = false;
|
||||
for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
|
||||
if (!e.getKey().isMetaTable()
|
||||
&& e.getValue() != null) {
|
||||
LOG.debug("Found " + e + " out on cluster");
|
||||
regionsToProcess = true;
|
||||
this.failover = true;
|
||||
break;
|
||||
}
|
||||
if (nodes.contains(e.getKey().getEncodedName())) {
|
||||
LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
|
||||
// Could be a meta region.
|
||||
regionsToProcess = true;
|
||||
this.failover = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If we found user regions out on cluster, its a failover.
|
||||
if (regionsToProcess) {
|
||||
if (this.failover) {
|
||||
LOG.info("Found regions out on cluster or in RIT; failover");
|
||||
// Process list of dead servers and regions in RIT.
|
||||
// See HBASE-4580 for more information.
|
||||
processDeadServersAndRecoverLostRegions(deadServers, nodes);
|
||||
|
||||
this.failover = false;
|
||||
failoverProcessedRegions.clear();
|
||||
} else {
|
||||
// Fresh cluster startup.
|
||||
LOG.info("Clean cluster startup. Assigning userregions");
|
||||
|
@ -440,10 +449,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
if (data == null) return false;
|
||||
HRegionInfo hri = regionInfo;
|
||||
if (hri == null) {
|
||||
Pair<HRegionInfo, ServerName> p =
|
||||
MetaReader.getRegion(catalogTracker, data.getRegionName());
|
||||
if (p == null) return false;
|
||||
hri = p.getFirst();
|
||||
if ((hri = getHRegionInfo(data)) == null) return false;
|
||||
}
|
||||
processRegionsInTransition(data, hri, deadServers, stat.getVersion());
|
||||
return true;
|
||||
|
@ -458,6 +464,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
|
||||
" in state " + data.getEventType());
|
||||
synchronized (regionsInTransition) {
|
||||
RegionState regionState = regionsInTransition.get(encodedRegionName);
|
||||
if (regionState != null ||
|
||||
failoverProcessedRegions.containsKey(encodedRegionName)) {
|
||||
// Just return
|
||||
return;
|
||||
}
|
||||
switch (data.getEventType()) {
|
||||
case M_ZK_REGION_CLOSING:
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
|
@ -474,17 +486,35 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
regionInfo, RegionState.State.CLOSING,
|
||||
data.getStamp(), data.getOrigin()));
|
||||
}
|
||||
failoverProcessedRegions.put(encodedRegionName, regionInfo);
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_CLOSED:
|
||||
case RS_ZK_REGION_FAILED_OPEN:
|
||||
// Region is closed, insert into RIT and handle it
|
||||
addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
|
||||
failoverProcessedRegions.put(encodedRegionName, regionInfo);
|
||||
break;
|
||||
|
||||
case M_ZK_REGION_OFFLINE:
|
||||
// Region is offline, insert into RIT and handle it like a closed
|
||||
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
// region and just add it into RIT.
|
||||
if (isOnDeadServer(regionInfo, deadServers) &&
|
||||
(data.getOrigin() == null ||
|
||||
!serverManager.isServerOnline(data.getOrigin()))) {
|
||||
// Region is offline, insert into RIT and handle it like a closed
|
||||
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
|
||||
} else if (data.getOrigin() != null &&
|
||||
!serverManager.isServerOnline(data.getOrigin())) {
|
||||
// to handle cases where offline node is created but sendRegionOpen
|
||||
// RPC is not yet sent
|
||||
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
|
||||
} else {
|
||||
regionsInTransition.put(encodedRegionName, new RegionState(
|
||||
regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data
|
||||
.getOrigin()));
|
||||
}
|
||||
failoverProcessedRegions.put(encodedRegionName, regionInfo);
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENING:
|
||||
|
@ -507,6 +537,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
|
||||
RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
|
||||
failoverProcessedRegions.put(encodedRegionName, regionInfo);
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENED:
|
||||
|
@ -530,10 +561,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion)
|
||||
.process();
|
||||
}
|
||||
failoverProcessedRegions.put(encodedRegionName, regionInfo);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Put the region <code>hri</code> into an offline state up in zk.
|
||||
|
@ -609,6 +642,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
*/
|
||||
private void handleRegion(final RegionTransitionData data, int expectedVersion) {
|
||||
synchronized(regionsInTransition) {
|
||||
HRegionInfo hri = null;
|
||||
if (data == null || data.getOrigin() == null) {
|
||||
LOG.warn("Unexpected NULL input " + data);
|
||||
return;
|
||||
|
@ -686,6 +720,14 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
break;
|
||||
|
||||
case M_ZK_REGION_CLOSING:
|
||||
hri = checkIfInFailover(regionState, encodedName, data);
|
||||
if (hri != null) {
|
||||
regionState = new RegionState(hri, RegionState.State.CLOSING, data
|
||||
.getStamp(), data.getOrigin());
|
||||
regionsInTransition.put(encodedName, regionState);
|
||||
failoverProcessedRegions.put(encodedName, hri);
|
||||
break;
|
||||
}
|
||||
// Should see CLOSING after we have asked it to CLOSE or additional
|
||||
// times after already being in state of CLOSING
|
||||
if (regionState == null ||
|
||||
|
@ -702,6 +744,17 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
break;
|
||||
|
||||
case RS_ZK_REGION_CLOSED:
|
||||
hri = checkIfInFailover(regionState, encodedName, data);
|
||||
if (hri != null) {
|
||||
regionState = new RegionState(hri, RegionState.State.CLOSED, data
|
||||
.getStamp(), data.getOrigin());
|
||||
regionsInTransition.put(encodedName, regionState);
|
||||
removeClosedRegion(regionState.getRegion());
|
||||
new ClosedRegionHandler(master, this, regionState.getRegion())
|
||||
.process();
|
||||
failoverProcessedRegions.put(encodedName, hri);
|
||||
break;
|
||||
}
|
||||
// Should see CLOSED after CLOSING but possible after PENDING_CLOSE
|
||||
if (regionState == null ||
|
||||
(!regionState.isPendingClose() && !regionState.isClosing())) {
|
||||
|
@ -716,12 +769,22 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// what follows will fail because not in expected state.
|
||||
regionState.update(RegionState.State.CLOSED,
|
||||
data.getStamp(), data.getOrigin());
|
||||
removeClosedRegion(regionState.getRegion());
|
||||
removeClosedRegion(regionState.getRegion());
|
||||
this.executorService.submit(new ClosedRegionHandler(master,
|
||||
this, regionState.getRegion()));
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_FAILED_OPEN:
|
||||
hri = checkIfInFailover(regionState, encodedName, data);
|
||||
if (hri != null) {
|
||||
regionState = new RegionState(hri, RegionState.State.CLOSED, data
|
||||
.getStamp(), data.getOrigin());
|
||||
regionsInTransition.put(encodedName, regionState);
|
||||
new ClosedRegionHandler(master, this, regionState.getRegion())
|
||||
.process();
|
||||
failoverProcessedRegions.put(encodedName, hri);
|
||||
break;
|
||||
}
|
||||
if (regionState == null ||
|
||||
(!regionState.isPendingOpen() && !regionState.isOpening())) {
|
||||
LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
|
||||
|
@ -737,6 +800,14 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENING:
|
||||
hri = checkIfInFailover(regionState, encodedName, data);
|
||||
if (hri != null) {
|
||||
regionState = new RegionState(hri, RegionState.State.OPENING, data
|
||||
.getStamp(), data.getOrigin());
|
||||
regionsInTransition.put(encodedName, regionState);
|
||||
failoverProcessedRegions.put(encodedName, hri);
|
||||
break;
|
||||
}
|
||||
// Should see OPENING after we have asked it to OPEN or additional
|
||||
// times after already being in state of OPENING
|
||||
if (regionState == null ||
|
||||
|
@ -754,6 +825,16 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENED:
|
||||
hri = checkIfInFailover(regionState, encodedName, data);
|
||||
if (hri != null) {
|
||||
regionState = new RegionState(hri, RegionState.State.OPEN, data
|
||||
.getStamp(), data.getOrigin());
|
||||
regionsInTransition.put(encodedName, regionState);
|
||||
new OpenedRegionHandler(master, this, regionState.getRegion(), data
|
||||
.getOrigin(), expectedVersion).process();
|
||||
failoverProcessedRegions.put(encodedName, hri);
|
||||
break;
|
||||
}
|
||||
// Should see OPENED after OPENING but possible after PENDING_OPEN
|
||||
if (regionState == null ||
|
||||
(!regionState.isPendingOpen() && !regionState.isOpening())) {
|
||||
|
@ -775,6 +856,44 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the callback came while RIT was not yet populated during
|
||||
* master failover.
|
||||
* @param regionState
|
||||
* @param encodedName
|
||||
* @param data
|
||||
* @return hri
|
||||
*/
|
||||
private HRegionInfo checkIfInFailover(RegionState regionState,
|
||||
String encodedName, RegionTransitionData data) {
|
||||
if (regionState == null && this.failover &&
|
||||
(failoverProcessedRegions.containsKey(encodedName) == false ||
|
||||
failoverProcessedRegions.get(encodedName) == null)) {
|
||||
HRegionInfo hri = this.failoverProcessedRegions.get(encodedName);
|
||||
if (hri == null) hri = getHRegionInfo(data);
|
||||
return hri;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the HRegionInfo from the META table
|
||||
* @param data
|
||||
* @return HRegionInfo hri for the region
|
||||
*/
|
||||
private HRegionInfo getHRegionInfo(RegionTransitionData data) {
|
||||
Pair<HRegionInfo, ServerName> p = null;
|
||||
try {
|
||||
p = MetaReader.getRegion(catalogTracker, data.getRegionName());
|
||||
if (p == null) return null;
|
||||
return p.getFirst();
|
||||
} catch (IOException e) {
|
||||
master.abort("Aborting because error occoured while reading "
|
||||
+ data.getRegionName() + " from .META.", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns true if this RegionState is splittable; i.e. the
|
||||
* RegionState is currently in splitting state or pending_close or
|
||||
|
|
|
@ -59,6 +59,10 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -127,10 +131,162 @@ public class TestAssignmentManager {
|
|||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
if (this.watcher != null) this.watcher.close();
|
||||
public void after() throws KeeperException {
|
||||
if (this.watcher != null) {
|
||||
// Clean up all znodes
|
||||
ZKAssign.deleteAllNodes(this.watcher);
|
||||
this.watcher.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a balance going on at same time as a master failover
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
|
||||
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
|
||||
while (!am.processRITInvoked) Thread.sleep(1);
|
||||
// Now fake the region closing successfully over on the regionserver; the
|
||||
// regionserver will have set the region in CLOSED state. This will
|
||||
// trigger callback into AM. The below zk close call is from the RS close
|
||||
// region handler duplicated here because its down deep in a private
|
||||
// method hard to expose.
|
||||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
assertNotSame(-1, versionid);
|
||||
// This uglyness below is what the openregionhandler on RS side does.
|
||||
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
|
||||
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
|
||||
EventType.RS_ZK_REGION_OPENING, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Move znode from OPENING to OPENED as RS does on successful open.
|
||||
versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
|
||||
SERVERNAME_B, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
am.gate.set(false);
|
||||
// Block here until our znode is cleared or until this test times out.
|
||||
ZKAssign.blockUntilNoRIT(watcher);
|
||||
} finally {
|
||||
am.getExecutorService().shutdown();
|
||||
am.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithClosedNode()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
|
||||
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
|
||||
while (!am.processRITInvoked) Thread.sleep(1);
|
||||
// Now fake the region closing successfully over on the regionserver; the
|
||||
// regionserver will have set the region in CLOSED state. This will
|
||||
// trigger callback into AM. The below zk close call is from the RS close
|
||||
// region handler duplicated here because its down deep in a private
|
||||
// method hard to expose.
|
||||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
am.gate.set(false);
|
||||
while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
assertNotSame(-1, versionid);
|
||||
// This uglyness below is what the openregionhandler on RS side does.
|
||||
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
|
||||
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
|
||||
EventType.RS_ZK_REGION_OPENING, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Move znode from OPENING to OPENED as RS does on successful open.
|
||||
versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
|
||||
SERVERNAME_B, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
|
||||
// Block here until our znode is cleared or until this test timesout.
|
||||
ZKAssign.blockUntilNoRIT(watcher);
|
||||
} finally {
|
||||
am.getExecutorService().shutdown();
|
||||
am.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithOfflineNode()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
|
||||
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
|
||||
while (!am.processRITInvoked) Thread.sleep(1);
|
||||
// Now fake the region closing successfully over on the regionserver; the
|
||||
// regionserver will have set the region in CLOSED state. This will
|
||||
// trigger callback into AM. The below zk close call is from the RS close
|
||||
// region handler duplicated here because its down deep in a private
|
||||
// method hard to expose.
|
||||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
am.gate.set(false);
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
assertNotSame(-1, versionid);
|
||||
// This uglyness below is what the openregionhandler on RS side does.
|
||||
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
|
||||
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
|
||||
EventType.RS_ZK_REGION_OPENING, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Move znode from OPENING to OPENED as RS does on successful open.
|
||||
versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
|
||||
SERVERNAME_B, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Block here until our znode is cleared or until this test timesout.
|
||||
ZKAssign.blockUntilNoRIT(watcher);
|
||||
} finally {
|
||||
am.getExecutorService().shutdown();
|
||||
am.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void createRegionPlanAndBalance(final AssignmentManager am,
|
||||
final ServerName from, final ServerName to, final HRegionInfo hri) {
|
||||
// Call the balance function but fake the region being online first at
|
||||
// servername from.
|
||||
am.regionOnline(hri, from);
|
||||
// Balance region from 'from' to 'to'. It calls unassign setting CLOSING state
|
||||
// up in zk. Create a plan and balance
|
||||
am.balance(new RegionPlan(hri, from, to));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests AssignmentManager balance function. Runs a balance moving a region
|
||||
* from one server to another mocking regionserver responding over zk.
|
||||
|
@ -386,6 +542,137 @@ public class TestAssignmentManager {
|
|||
am.unassign(hri);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an {@link AssignmentManagerWithExtrasForTesting} that has mocked
|
||||
* {@link CatalogTracker} etc.
|
||||
* @param server
|
||||
* @param manager
|
||||
* @return An AssignmentManagerWithExtras with mock connections, etc.
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(final Server server,
|
||||
final ServerManager manager)
|
||||
throws IOException, KeeperException {
|
||||
// We need a mocked catalog tracker. Its used by our AM instance.
|
||||
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
||||
// Make an RS Interface implementation. Make it so a scanner can go against
|
||||
// it and a get to return the single region, REGIONINFO, this test is
|
||||
// messing with. Needed when "new master" joins cluster. AM will try and
|
||||
// rebuild its list of user regions and it will also get the HRI that goes
|
||||
// with an encoded name by doing a Get on .META.
|
||||
HRegionInterface ri = Mockito.mock(HRegionInterface.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A for REGIONINFO
|
||||
Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Mockito.when(ri .openScanner((byte[]) Mockito.any(), (Scan) Mockito.any())).
|
||||
thenReturn(System.currentTimeMillis());
|
||||
// Return good result 'r' first and then return null to indicate end of scan
|
||||
Mockito.when(ri.next(Mockito.anyLong(), Mockito.anyInt())).
|
||||
thenReturn(new Result[] { r }, (Result[]) null);
|
||||
// If a get, return the above result too for REGIONINFO
|
||||
Mockito.when(ri.get((byte[]) Mockito.any(), (Get) Mockito.any())).
|
||||
thenReturn(r);
|
||||
// Get a connection w/ mocked up common methods.
|
||||
HConnection connection = HConnectionTestingUtility.
|
||||
getMockedConnectionAndDecorate(HTU.getConfiguration(), ri, SERVERNAME_B,
|
||||
REGIONINFO);
|
||||
// Make it so we can get the connection from our mocked catalogtracker
|
||||
Mockito.when(ct.getConnection()).thenReturn(connection);
|
||||
// Create and startup an executor. Used by AM handling zk callbacks.
|
||||
ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
new AssignmentManagerWithExtrasForTesting(server, manager, ct, executor);
|
||||
return am;
|
||||
}
|
||||
|
||||
/**
|
||||
* An {@link AssignmentManager} with some extra facility used testing
|
||||
*/
|
||||
class AssignmentManagerWithExtrasForTesting extends AssignmentManager {
|
||||
// Keep a reference so can give it out below in {@link #getExecutorService}
|
||||
private final ExecutorService es;
|
||||
// Ditto for ct
|
||||
private final CatalogTracker ct;
|
||||
boolean processRITInvoked = false;
|
||||
AtomicBoolean gate = new AtomicBoolean(true);
|
||||
|
||||
public AssignmentManagerWithExtrasForTesting(final Server master,
|
||||
final ServerManager serverManager,
|
||||
final CatalogTracker catalogTracker, final ExecutorService service)
|
||||
throws KeeperException, IOException {
|
||||
super(master, serverManager, catalogTracker, service);
|
||||
this.es = service;
|
||||
this.ct = catalogTracker;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean processRegionInTransition(String encodedRegionName,
|
||||
HRegionInfo regionInfo,
|
||||
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
|
||||
throws KeeperException, IOException {
|
||||
this.processRITInvoked = true;
|
||||
return super.processRegionInTransition(encodedRegionName, regionInfo,
|
||||
deadServers);
|
||||
}
|
||||
@Override
|
||||
void processRegionsInTransition(final RegionTransitionData data,
|
||||
final HRegionInfo regionInfo,
|
||||
final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
|
||||
final int expectedVersion) throws KeeperException {
|
||||
while (this.gate.get()) Threads.sleep(1);
|
||||
super.processRegionsInTransition(data, regionInfo, deadServers, expectedVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ExecutorService used by this instance.
|
||||
*/
|
||||
ExecutorService getExecutorService() {
|
||||
return this.es;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return CatalogTracker used by this AM (Its a mock).
|
||||
*/
|
||||
CatalogTracker getCatalogTracker() {
|
||||
return this.ct;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call joinCluster on the passed AssignmentManager. Do it in a thread
|
||||
* so it runs independent of what all else is going on. Try to simulate
|
||||
* an AM running insided a failed over master by clearing all in-memory
|
||||
* AM state first.
|
||||
*/
|
||||
private void startFakeFailedOverMasterAssignmentManager(final AssignmentManager am,
|
||||
final ZooKeeperWatcher watcher) {
|
||||
// Make sure our new AM gets callbacks; once registered, we can't unregister.
|
||||
// Thats ok because we make a new zk watcher for each test.
|
||||
watcher.registerListenerFirst(am);
|
||||
Thread t = new Thread("RunAmJoinCluster") {
|
||||
public void run() {
|
||||
// Call the joinCluster function as though we were doing a master
|
||||
// failover at this point. It will stall just before we go to add
|
||||
// the RIT region to our RIT Map in AM at processRegionsInTransition.
|
||||
// First clear any inmemory state from AM so it acts like a new master
|
||||
// coming on line.
|
||||
am.regionsInTransition.clear();
|
||||
am.regionPlans.clear();
|
||||
try {
|
||||
am.joinCluster();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (KeeperException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
};
|
||||
t.start();
|
||||
while (!t.isAlive()) Threads.sleep(1);
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
|
|
Loading…
Reference in New Issue