diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 6748e5c8726..e7f237fa804 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -170,7 +170,17 @@ public class AssignmentManager extends ZooKeeperListener { private List ignoreStatesRSOffline = Arrays.asList(new EventType[]{ EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED }); - + + /** + * Set when we are doing master failover processing; cleared when failover + * completes. + */ + private volatile boolean failover = false; + + // Set holding all the regions which got processed while RIT was not + // populated during master failover. + private Map failoverProcessedRegions = + new HashMap(); /** * Constructs a new assignment manager. @@ -321,8 +331,7 @@ public class AssignmentManager extends ZooKeeperListener { // Scan META to build list of existing regions, servers, and assignment // Returns servers who have not checked in (assumed dead) and their regions - Map>> deadServers = - rebuildUserRegions(); + Map>> deadServers = rebuildUserRegions(); processDeadServersAndRegionsInTransition(deadServers); @@ -362,29 +371,29 @@ public class AssignmentManager extends ZooKeeperListener { watcher.assignmentZNode); // Run through all regions. If they are not assigned and not in RIT, then // its a clean cluster startup, else its a failover. - boolean regionsToProcess = false; for (Map.Entry e: this.regions.entrySet()) { if (!e.getKey().isMetaTable() && e.getValue() != null) { LOG.debug("Found " + e + " out on cluster"); - regionsToProcess = true; + this.failover = true; break; } if (nodes.contains(e.getKey().getEncodedName())) { LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs"); // Could be a meta region. - regionsToProcess = true; + this.failover = true; break; } } // If we found user regions out on cluster, its a failover. - if (regionsToProcess) { + if (this.failover) { LOG.info("Found regions out on cluster or in RIT; failover"); // Process list of dead servers and regions in RIT. // See HBASE-4580 for more information. processDeadServersAndRecoverLostRegions(deadServers, nodes); - + this.failover = false; + failoverProcessedRegions.clear(); } else { // Fresh cluster startup. LOG.info("Clean cluster startup. Assigning userregions"); @@ -440,10 +449,7 @@ public class AssignmentManager extends ZooKeeperListener { if (data == null) return false; HRegionInfo hri = regionInfo; if (hri == null) { - Pair p = - MetaReader.getRegion(catalogTracker, data.getRegionName()); - if (p == null) return false; - hri = p.getFirst(); + if ((hri = getHRegionInfo(data)) == null) return false; } processRegionsInTransition(data, hri, deadServers, stat.getVersion()); return true; @@ -458,6 +464,12 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + data.getEventType()); synchronized (regionsInTransition) { + RegionState regionState = regionsInTransition.get(encodedRegionName); + if (regionState != null || + failoverProcessedRegions.containsKey(encodedRegionName)) { + // Just return + return; + } switch (data.getEventType()) { case M_ZK_REGION_CLOSING: // If zk node of the region was updated by a live server skip this @@ -474,17 +486,35 @@ public class AssignmentManager extends ZooKeeperListener { regionInfo, RegionState.State.CLOSING, data.getStamp(), data.getOrigin())); } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_CLOSED: case RS_ZK_REGION_FAILED_OPEN: // Region is closed, insert into RIT and handle it addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data); + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case M_ZK_REGION_OFFLINE: - // Region is offline, insert into RIT and handle it like a closed - addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + // If zk node of the region was updated by a live server skip this + // region and just add it into RIT. + if (isOnDeadServer(regionInfo, deadServers) && + (data.getOrigin() == null || + !serverManager.isServerOnline(data.getOrigin()))) { + // Region is offline, insert into RIT and handle it like a closed + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + } else if (data.getOrigin() != null && + !serverManager.isServerOnline(data.getOrigin())) { + // to handle cases where offline node is created but sendRegionOpen + // RPC is not yet sent + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + } else { + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data + .getOrigin())); + } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_OPENING: @@ -507,6 +537,7 @@ public class AssignmentManager extends ZooKeeperListener { } regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, RegionState.State.OPENING, data.getStamp(), data.getOrigin())); + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_OPENED: @@ -530,10 +561,12 @@ public class AssignmentManager extends ZooKeeperListener { new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion) .process(); } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; } } } + /** * Put the region hri into an offline state up in zk. @@ -609,6 +642,7 @@ public class AssignmentManager extends ZooKeeperListener { */ private void handleRegion(final RegionTransitionData data, int expectedVersion) { synchronized(regionsInTransition) { + HRegionInfo hri = null; if (data == null || data.getOrigin() == null) { LOG.warn("Unexpected NULL input " + data); return; @@ -686,6 +720,14 @@ public class AssignmentManager extends ZooKeeperListener { break; case M_ZK_REGION_CLOSING: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSING, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see CLOSING after we have asked it to CLOSE or additional // times after already being in state of CLOSING if (regionState == null || @@ -702,6 +744,17 @@ public class AssignmentManager extends ZooKeeperListener { break; case RS_ZK_REGION_CLOSED: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSED, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + removeClosedRegion(regionState.getRegion()); + new ClosedRegionHandler(master, this, regionState.getRegion()) + .process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see CLOSED after CLOSING but possible after PENDING_CLOSE if (regionState == null || (!regionState.isPendingClose() && !regionState.isClosing())) { @@ -716,12 +769,22 @@ public class AssignmentManager extends ZooKeeperListener { // what follows will fail because not in expected state. regionState.update(RegionState.State.CLOSED, data.getStamp(), data.getOrigin()); - removeClosedRegion(regionState.getRegion()); + removeClosedRegion(regionState.getRegion()); this.executorService.submit(new ClosedRegionHandler(master, this, regionState.getRegion())); break; case RS_ZK_REGION_FAILED_OPEN: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSED, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + new ClosedRegionHandler(master, this, regionState.getRegion()) + .process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } if (regionState == null || (!regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName + @@ -737,6 +800,14 @@ public class AssignmentManager extends ZooKeeperListener { break; case RS_ZK_REGION_OPENING: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.OPENING, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see OPENING after we have asked it to OPEN or additional // times after already being in state of OPENING if (regionState == null || @@ -754,6 +825,16 @@ public class AssignmentManager extends ZooKeeperListener { break; case RS_ZK_REGION_OPENED: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.OPEN, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + new OpenedRegionHandler(master, this, regionState.getRegion(), data + .getOrigin(), expectedVersion).process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see OPENED after OPENING but possible after PENDING_OPEN if (regionState == null || (!regionState.isPendingOpen() && !regionState.isOpening())) { @@ -775,6 +856,44 @@ public class AssignmentManager extends ZooKeeperListener { } } + /** + * Checks whether the callback came while RIT was not yet populated during + * master failover. + * @param regionState + * @param encodedName + * @param data + * @return hri + */ + private HRegionInfo checkIfInFailover(RegionState regionState, + String encodedName, RegionTransitionData data) { + if (regionState == null && this.failover && + (failoverProcessedRegions.containsKey(encodedName) == false || + failoverProcessedRegions.get(encodedName) == null)) { + HRegionInfo hri = this.failoverProcessedRegions.get(encodedName); + if (hri == null) hri = getHRegionInfo(data); + return hri; + } + return null; + } + + /** + * Gets the HRegionInfo from the META table + * @param data + * @return HRegionInfo hri for the region + */ + private HRegionInfo getHRegionInfo(RegionTransitionData data) { + Pair p = null; + try { + p = MetaReader.getRegion(catalogTracker, data.getRegionName()); + if (p == null) return null; + return p.getFirst(); + } catch (IOException e) { + master.abort("Aborting because error occoured while reading " + + data.getRegionName() + " from .META.", e); + return null; + } + } + /** * @return Returns true if this RegionState is splittable; i.e. the * RegionState is currently in splitting state or pending_close or diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index a067bed195b..d68ce332ac6 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -59,6 +59,10 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.client.Get; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** @@ -127,10 +131,162 @@ public class TestAssignmentManager { } @After - public void after() { - if (this.watcher != null) this.watcher.close(); + public void after() throws KeeperException { + if (this.watcher != null) { + // Clean up all znodes + ZKAssign.deleteAllNodes(this.watcher); + this.watcher.close(); + } } + /** + * Test a balance going on at same time as a master failover + * + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test(timeout = 5000) + public void testBalanceOnMasterFailoverScenarioWithOpenedNode() + throws IOException, KeeperException, InterruptedException { + AssignmentManagerWithExtrasForTesting am = + setUpMockedAssignmentManager(this.server, this.serverManager); + try { + createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO); + startFakeFailedOverMasterAssignmentManager(am, this.watcher); + while (!am.processRITInvoked) Thread.sleep(1); + // Now fake the region closing successfully over on the regionserver; the + // regionserver will have set the region in CLOSED state. This will + // trigger callback into AM. The below zk close call is from the RS close + // region handler duplicated here because its down deep in a private + // method hard to expose. + int versionid = + ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); + assertNotSame(versionid, -1); + while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + EventType.M_ZK_REGION_OFFLINE)) { + Threads.sleep(1); + } + // Get current versionid else will fail on transition from OFFLINE to + // OPENING below + versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + assertNotSame(-1, versionid); + // This uglyness below is what the openregionhandler on RS side does. + versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, + SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionid); + assertNotSame(-1, versionid); + // Move znode from OPENING to OPENED as RS does on successful open. + versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, + SERVERNAME_B, versionid); + assertNotSame(-1, versionid); + am.gate.set(false); + // Block here until our znode is cleared or until this test times out. + ZKAssign.blockUntilNoRIT(watcher); + } finally { + am.getExecutorService().shutdown(); + am.shutdown(); + } + } + + @Test(timeout = 5000) + public void testBalanceOnMasterFailoverScenarioWithClosedNode() + throws IOException, KeeperException, InterruptedException { + AssignmentManagerWithExtrasForTesting am = + setUpMockedAssignmentManager(this.server, this.serverManager); + try { + createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO); + startFakeFailedOverMasterAssignmentManager(am, this.watcher); + while (!am.processRITInvoked) Thread.sleep(1); + // Now fake the region closing successfully over on the regionserver; the + // regionserver will have set the region in CLOSED state. This will + // trigger callback into AM. The below zk close call is from the RS close + // region handler duplicated here because its down deep in a private + // method hard to expose. + int versionid = + ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); + assertNotSame(versionid, -1); + am.gate.set(false); + while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + EventType.M_ZK_REGION_OFFLINE)) { + Threads.sleep(1); + } + // Get current versionid else will fail on transition from OFFLINE to + // OPENING below + versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + assertNotSame(-1, versionid); + // This uglyness below is what the openregionhandler on RS side does. + versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, + SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionid); + assertNotSame(-1, versionid); + // Move znode from OPENING to OPENED as RS does on successful open. + versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, + SERVERNAME_B, versionid); + assertNotSame(-1, versionid); + + // Block here until our znode is cleared or until this test timesout. + ZKAssign.blockUntilNoRIT(watcher); + } finally { + am.getExecutorService().shutdown(); + am.shutdown(); + } + } + + @Test(timeout = 5000) + public void testBalanceOnMasterFailoverScenarioWithOfflineNode() + throws IOException, KeeperException, InterruptedException { + AssignmentManagerWithExtrasForTesting am = + setUpMockedAssignmentManager(this.server, this.serverManager); + try { + createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO); + startFakeFailedOverMasterAssignmentManager(am, this.watcher); + while (!am.processRITInvoked) Thread.sleep(1); + // Now fake the region closing successfully over on the regionserver; the + // regionserver will have set the region in CLOSED state. This will + // trigger callback into AM. The below zk close call is from the RS close + // region handler duplicated here because its down deep in a private + // method hard to expose. + int versionid = + ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); + assertNotSame(versionid, -1); + while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + EventType.M_ZK_REGION_OFFLINE)) { + Threads.sleep(1); + } + am.gate.set(false); + // Get current versionid else will fail on transition from OFFLINE to + // OPENING below + versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + assertNotSame(-1, versionid); + // This uglyness below is what the openregionhandler on RS side does. + versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, + SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionid); + assertNotSame(-1, versionid); + // Move znode from OPENING to OPENED as RS does on successful open. + versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, + SERVERNAME_B, versionid); + assertNotSame(-1, versionid); + // Block here until our znode is cleared or until this test timesout. + ZKAssign.blockUntilNoRIT(watcher); + } finally { + am.getExecutorService().shutdown(); + am.shutdown(); + } + } + + private void createRegionPlanAndBalance(final AssignmentManager am, + final ServerName from, final ServerName to, final HRegionInfo hri) { + // Call the balance function but fake the region being online first at + // servername from. + am.regionOnline(hri, from); + // Balance region from 'from' to 'to'. It calls unassign setting CLOSING state + // up in zk. Create a plan and balance + am.balance(new RegionPlan(hri, from, to)); + } + + /** * Tests AssignmentManager balance function. Runs a balance moving a region * from one server to another mocking regionserver responding over zk. @@ -386,6 +542,137 @@ public class TestAssignmentManager { am.unassign(hri); } + /** + * Create an {@link AssignmentManagerWithExtrasForTesting} that has mocked + * {@link CatalogTracker} etc. + * @param server + * @param manager + * @return An AssignmentManagerWithExtras with mock connections, etc. + * @throws IOException + * @throws KeeperException + */ + private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(final Server server, + final ServerManager manager) + throws IOException, KeeperException { + // We need a mocked catalog tracker. Its used by our AM instance. + CatalogTracker ct = Mockito.mock(CatalogTracker.class); + // Make an RS Interface implementation. Make it so a scanner can go against + // it and a get to return the single region, REGIONINFO, this test is + // messing with. Needed when "new master" joins cluster. AM will try and + // rebuild its list of user regions and it will also get the HRI that goes + // with an encoded name by doing a Get on .META. + HRegionInterface ri = Mockito.mock(HRegionInterface.class); + // Get a meta row result that has region up on SERVERNAME_A for REGIONINFO + Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + Mockito.when(ri .openScanner((byte[]) Mockito.any(), (Scan) Mockito.any())). + thenReturn(System.currentTimeMillis()); + // Return good result 'r' first and then return null to indicate end of scan + Mockito.when(ri.next(Mockito.anyLong(), Mockito.anyInt())). + thenReturn(new Result[] { r }, (Result[]) null); + // If a get, return the above result too for REGIONINFO + Mockito.when(ri.get((byte[]) Mockito.any(), (Get) Mockito.any())). + thenReturn(r); + // Get a connection w/ mocked up common methods. + HConnection connection = HConnectionTestingUtility. + getMockedConnectionAndDecorate(HTU.getConfiguration(), ri, SERVERNAME_B, + REGIONINFO); + // Make it so we can get the connection from our mocked catalogtracker + Mockito.when(ct.getConnection()).thenReturn(connection); + // Create and startup an executor. Used by AM handling zk callbacks. + ExecutorService executor = startupMasterExecutor("mockedAMExecutor"); + AssignmentManagerWithExtrasForTesting am = + new AssignmentManagerWithExtrasForTesting(server, manager, ct, executor); + return am; + } + + /** + * An {@link AssignmentManager} with some extra facility used testing + */ + class AssignmentManagerWithExtrasForTesting extends AssignmentManager { + // Keep a reference so can give it out below in {@link #getExecutorService} + private final ExecutorService es; + // Ditto for ct + private final CatalogTracker ct; + boolean processRITInvoked = false; + AtomicBoolean gate = new AtomicBoolean(true); + + public AssignmentManagerWithExtrasForTesting(final Server master, + final ServerManager serverManager, + final CatalogTracker catalogTracker, final ExecutorService service) + throws KeeperException, IOException { + super(master, serverManager, catalogTracker, service); + this.es = service; + this.ct = catalogTracker; + } + + @Override + boolean processRegionInTransition(String encodedRegionName, + HRegionInfo regionInfo, + Map>> deadServers) + throws KeeperException, IOException { + this.processRITInvoked = true; + return super.processRegionInTransition(encodedRegionName, regionInfo, + deadServers); + } + @Override + void processRegionsInTransition(final RegionTransitionData data, + final HRegionInfo regionInfo, + final Map>> deadServers, + final int expectedVersion) throws KeeperException { + while (this.gate.get()) Threads.sleep(1); + super.processRegionsInTransition(data, regionInfo, deadServers, expectedVersion); + } + + /** + * @return ExecutorService used by this instance. + */ + ExecutorService getExecutorService() { + return this.es; + } + + /** + * @return CatalogTracker used by this AM (Its a mock). + */ + CatalogTracker getCatalogTracker() { + return this.ct; + } + } + + /** + * Call joinCluster on the passed AssignmentManager. Do it in a thread + * so it runs independent of what all else is going on. Try to simulate + * an AM running insided a failed over master by clearing all in-memory + * AM state first. + */ + private void startFakeFailedOverMasterAssignmentManager(final AssignmentManager am, + final ZooKeeperWatcher watcher) { + // Make sure our new AM gets callbacks; once registered, we can't unregister. + // Thats ok because we make a new zk watcher for each test. + watcher.registerListenerFirst(am); + Thread t = new Thread("RunAmJoinCluster") { + public void run() { + // Call the joinCluster function as though we were doing a master + // failover at this point. It will stall just before we go to add + // the RIT region to our RIT Map in AM at processRegionsInTransition. + // First clear any inmemory state from AM so it acts like a new master + // coming on line. + am.regionsInTransition.clear(); + am.regionPlans.clear(); + try { + am.joinCluster(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (KeeperException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + }; + t.start(); + while (!t.isAlive()) Threads.sleep(1); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();