diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java index c1723e2736e..5a29a5ff985 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java @@ -18,294 +18,278 @@ package org.apache.hadoop.hbase; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * Test the draining servers feature. - * - * This is typically an integration test: a unit test would be to check that the - * master does no assign regions to a regionserver marked as drained. - * - * @see HBASE-4298 */ @Category(MediumTests.class) public class TestDrainingServer { private static final Log LOG = LogFactory.getLog(TestDrainingServer.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final int NB_SLAVES = 5; - private static final int COUNT_OF_REGIONS = NB_SLAVES * 2; - - /** - * Spin up a cluster with a bunch of regions on it. - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(NB_SLAVES); - TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); - TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); - - final List families = new ArrayList(1); - families.add("family"); - TEST_UTIL.createRandomTable("table", families, 1, 0, 0, COUNT_OF_REGIONS, 0); - // Ensure a stable env - TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); - - boolean ready = false; - while (!ready){ - waitForAllRegionsOnline(); - - // Assert that every regionserver has some regions on it. - int i = 0; - ready = true; - while (i < NB_SLAVES && ready){ - HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i); - if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){ - ready = false; - } - i++; - } - - if (!ready){ - TEST_UTIL.getHBaseAdmin().setBalancerRunning(true, true); - Assert.assertTrue("Can't start a balance!", TEST_UTIL.getHBaseAdmin().balancer()); - TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); - Thread.sleep(100); - } + private Abortable abortable = new Abortable() { + @Override + public boolean isAborted() { + return false; } - } - private static HRegionServer setDrainingServer(final HRegionServer hrs) - throws KeeperException { - LOG.info("Making " + hrs.getServerName() + " the draining server; " + - "it has " + hrs.getNumberOfOnlineRegions() + " online regions"); - ZooKeeperWatcher zkw = hrs.getZooKeeper(); - String hrsDrainingZnode = - ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString()); - ZKUtil.createWithParents(zkw, hrsDrainingZnode); - return hrs; - } - - private static HRegionServer unsetDrainingServer(final HRegionServer hrs) - throws KeeperException { - ZooKeeperWatcher zkw = hrs.getZooKeeper(); - String hrsDrainingZnode = - ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString()); - ZKUtil.deleteNode(zkw, hrsDrainingZnode); - return hrs; - } + @Override + public void abort(String why, Throwable e) { + } + }; @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); } - /** - * Test adding server to draining servers and then move regions off it. - * Make sure that no regions are moved back to the draining server. - * @throws IOException - * @throws KeeperException - */ - @Test // (timeout=30000) - public void testDrainingServerOffloading() - throws Exception { - // I need master in the below. - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - HRegionInfo hriToMoveBack = null; - // Set first server as draining server. - HRegionServer drainingServer = - setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)); - try { - final int regionsOnDrainingServer = - drainingServer.getNumberOfOnlineRegions(); - Assert.assertTrue(regionsOnDrainingServer > 0); - List hris = ProtobufUtil.getOnlineRegions(drainingServer); - for (HRegionInfo hri : hris) { - // Pass null and AssignmentManager will chose a random server BUT it - // should exclude draining servers. - master.moveRegion(null, - RequestConverter.buildMoveRegionRequest(hri.getEncodedNameAsBytes(), null)); - // Save off region to move back. - hriToMoveBack = hri; - } - // Wait for regions to come back on line again. - waitForAllRegionsOnline(); - Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions()); - } finally { - unsetDrainingServer(drainingServer); + @Test + public void testAssignmentManagerDoesntUseDrainingServer() throws Exception { + AssignmentManager am; + Configuration conf = TEST_UTIL.getConfiguration(); + final HMaster master = Mockito.mock(HMaster.class); + final Server server = Mockito.mock(Server.class); + final ServerManager serverManager = Mockito.mock(ServerManager.class); + final ServerName SERVERNAME_A = new ServerName("mockserver_a.org", 1000, 8000); + final ServerName SERVERNAME_B = new ServerName("mockserver_b.org", 1001, 8000); + LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); + CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); + final HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("table_test"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); + + ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "zkWatcher-Test", abortable, true); + + Map onlineServers = new HashMap(); + + onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); + onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); + + Mockito.when(server.getConfiguration()).thenReturn(conf); + Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); + Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); + + Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); + Mockito.when(serverManager.getOnlineServersList()) + .thenReturn(new ArrayList(onlineServers.keySet())); + + Mockito.when(serverManager.createDestinationServersList()) + .thenReturn(new ArrayList(onlineServers.keySet())); + Mockito.when(serverManager.createDestinationServersList(null)) + .thenReturn(new ArrayList(onlineServers.keySet())); + + for (ServerName sn : onlineServers.keySet()) { + Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true); + Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true); + Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true); + Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList())) + .thenReturn(RegionOpeningState.OPENED); + Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null)) + .thenReturn(RegionOpeningState.OPENED); + Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true); } - // Now we've unset the draining server, we should be able to move a region - // to what was the draining server. - master.moveRegion(null, - RequestConverter.buildMoveRegionRequest(hriToMoveBack.getEncodedNameAsBytes(), - Bytes.toBytes(drainingServer.getServerName().toString()))); - // Wait for regions to come back on line again. - waitForAllRegionsOnline(); - Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions()); + + Mockito.when(master.getServerManager()).thenReturn(serverManager); + + am = new AssignmentManager(server, serverManager, catalogTracker, + balancer, startupMasterExecutor("mockExecutorService"), null, null); + + Mockito.when(master.getAssignmentManager()).thenReturn(am); + Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher); + Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher); + + am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A)); + + zkWatcher.registerListenerFirst(am); + + addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager); + + am.assign(REGIONINFO, true); + + setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO); + setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO); + + am.waitForAssignment(REGIONINFO); + + assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO)); + assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A); } - /** - * Test that draining servers are ignored even after killing regionserver(s). - * Verify that the draining server is not given any of the dead servers regions. - * @throws KeeperException - * @throws IOException - */ - @Test (timeout=30000) - public void testDrainingServerWithAbort() throws KeeperException, Exception { - HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + @Test + public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); + CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); + AssignmentManager am; + final HMaster master = Mockito.mock(HMaster.class); + final Server server = Mockito.mock(Server.class); + final ServerManager serverManager = Mockito.mock(ServerManager.class); + final ServerName SERVERNAME_A = new ServerName("mockserverbulk_a.org", 1000, 8000); + final ServerName SERVERNAME_B = new ServerName("mockserverbulk_b.org", 1001, 8000); + final ServerName SERVERNAME_C = new ServerName("mockserverbulk_c.org", 1002, 8000); + final ServerName SERVERNAME_D = new ServerName("mockserverbulk_d.org", 1003, 8000); + final ServerName SERVERNAME_E = new ServerName("mockserverbulk_e.org", 1004, 8000); + final Map bulk = new HashMap(); - waitForAllRegionsOnline(); + Set bunchServersAssigned = new HashSet(); + + HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); + HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); + HRegionInfo REGIONINFO_C = new HRegionInfo(TableName.valueOf("table_C"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); + HRegionInfo REGIONINFO_D = new HRegionInfo(TableName.valueOf("table_D"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); + HRegionInfo REGIONINFO_E = new HRegionInfo(TableName.valueOf("table_E"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); - final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions(); + Map onlineServers = new HashMap(); + List drainedServers = new ArrayList(); - // Let's get a copy of the regions today. - Collection regions = getRegions(); - LOG.info("All regions: " + regions); + onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); + onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); + onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD); + onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD); + onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD); - // Choose the draining server - HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); - final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions(); - Assert.assertTrue(regionsOnDrainingServer > 0); + bulk.put(REGIONINFO_A, SERVERNAME_A); + bulk.put(REGIONINFO_B, SERVERNAME_B); + bulk.put(REGIONINFO_C, SERVERNAME_C); + bulk.put(REGIONINFO_D, SERVERNAME_D); + bulk.put(REGIONINFO_E, SERVERNAME_E); - ServerManager sm = master.getServerManager(); + ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "zkWatcher-BulkAssignTest", abortable, true); - Collection regionsBefore = drainingServer.getOnlineRegionsLocalContext(); - LOG.info("Regions of drained server are: "+ regionsBefore ); + Mockito.when(server.getConfiguration()).thenReturn(conf); + Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); + Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); - try { - // Add first server to draining servers up in zk. - setDrainingServer(drainingServer); + Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); + Mockito.when(serverManager.getOnlineServersList()).thenReturn( + new ArrayList(onlineServers.keySet())); + + Mockito.when(serverManager.createDestinationServersList()).thenReturn( + new ArrayList(onlineServers.keySet())); + Mockito.when(serverManager.createDestinationServersList(null)).thenReturn( + new ArrayList(onlineServers.keySet())); + + for (Entry entry : bulk.entrySet()) { + Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true); + Mockito.when(serverManager.sendRegionClose(entry.getValue(), + entry.getKey(), -1)).thenReturn(true); + Mockito.when(serverManager.sendRegionOpen(entry.getValue(), + entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED); + Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true); + } + + Mockito.when(master.getServerManager()).thenReturn(serverManager); - //wait for the master to receive and manage the event - while (sm.createDestinationServersList().contains(drainingServer.getServerName())) { - Thread.sleep(1); - } + drainedServers.add(SERVERNAME_A); + drainedServers.add(SERVERNAME_B); + drainedServers.add(SERVERNAME_C); + drainedServers.add(SERVERNAME_D); - LOG.info("The available servers are: "+ sm.createDestinationServersList()); + am = new AssignmentManager(server, serverManager, catalogTracker, + balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null); + + Mockito.when(master.getAssignmentManager()).thenReturn(am); - Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer, - drainingServer.getNumberOfOnlineRegions()); - Assert.assertFalse("We should not have regions in transition here. List is: " + - master.getAssignmentManager().getRegionStates().getRegionsInTransition(), - master.getAssignmentManager().getRegionStates().isRegionsInTransition()); + zkWatcher.registerListener(am); + + for (ServerName drained : drainedServers) { + addServerToDrainedList(drained, onlineServers, serverManager); + } + + am.assign(bulk); - // Kill a few regionservers. - for (int aborted = 0; aborted <= 2; aborted++) { - HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted + 1); - hrs.abort("Aborting"); - } - - // Wait for regions to come back online again. waitForAllRegionsOnline can come back before - // we've assigned out regions on the cluster so retry if we are shy the wanted number - Collection regionsAfter = null; - for (int i = 0; i < 1000; i++) { - waitForAllRegionsOnline(); - regionsAfter = getRegions(); - if (regionsAfter.size() >= regionCount) break; - LOG.info("Expecting " + regionCount + " but only " + regionsAfter); - Threads.sleep(10); - } - LOG.info("Regions of drained server: " + regionsAfter + ", all regions: " + getRegions()); - Assert.assertEquals("Test conditions are not met: regions were" + - " created/deleted during the test. ", - regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions()); - - // Assert the draining server still has the same regions. - regionsAfter = drainingServer.getOnlineRegionsLocalContext(); - StringBuilder result = new StringBuilder(); - for (HRegion r: regionsAfter){ - if (!regionsBefore.contains(r)){ - result.append(r).append(" was added after the drain"); - if (regions.contains(r)){ - result.append("(existing region"); - } else { - result.append("(new region)"); - } - result.append("; "); - } - } - for (HRegion r: regionsBefore){ - if (!regionsAfter.contains(r)){ - result.append(r).append(" was removed after the drain; "); - } - } - Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0); - - } finally { - unsetDrainingServer(drainingServer); + Map regionsInTransition = am.getRegionStates().getRegionsInTransition(); + for (Entry entry : regionsInTransition.entrySet()) { + setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(), + entry.getValue().getRegion()); + } + + am.waitForAssignment(REGIONINFO_A); + am.waitForAssignment(REGIONINFO_B); + am.waitForAssignment(REGIONINFO_C); + am.waitForAssignment(REGIONINFO_D); + am.waitForAssignment(REGIONINFO_E); + + Map regionAssignments = am.getRegionStates().getRegionAssignments(); + for (Entry entry : regionAssignments.entrySet()) { + LOG.info("Region Assignment: " + + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue()); + bunchServersAssigned.add(entry.getValue()); + } + + for (ServerName sn : drainedServers) { + assertFalse(bunchServersAssigned.contains(sn)); } } - private Collection getRegions() { - Collection regions = new ArrayList(); - List rsthreads = - TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); - for (RegionServerThread t: rsthreads) { - HRegionServer rs = t.getRegionServer(); - Collection lr = rs.getOnlineRegionsLocalContext(); - LOG.info("Found " + lr + " on " + rs); - regions.addAll(lr); - } - return regions; + private void addServerToDrainedList(ServerName serverName, + Map onlineServers, ServerManager serverManager) { + onlineServers.remove(serverName); + List availableServers = new ArrayList(onlineServers.keySet()); + Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers); + Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers); } - private static void waitForAllRegionsOnline() throws Exception { - // Wait for regions to come back on line again. - boolean done = false; - while (!done) { - Thread.sleep(1); - - // Nothing in ZK RIT for a start - ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher()); - - // Then we want all the regions to be marked as available... - if (!isAllRegionsOnline()) continue; - - // And without any work in progress on the master side - if (TEST_UTIL.getMiniHBaseCluster().getMaster(). - getAssignmentManager().getRegionStates().isRegionsInTransition()) continue; - - // nor on the region server side - done = true; - for (JVMClusterUtil.RegionServerThread rs : - TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { - if (!rs.getRegionServer().getRegionsInTransitionInRS().isEmpty()) { - done = false; - } - // Sleep some else we spam the log w/ notice that servers are not yet alive. - Threads.sleep(10); - } - } + private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName, + HRegionInfo hregionInfo) throws Exception { + int version = ZKAssign.getVersion(zkWatcher, hregionInfo); + int versionTransition = ZKAssign.transitionNode(zkWatcher, + hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, version); + ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition); } - private static boolean isAllRegionsOnline() { - return TEST_UTIL.getMiniHBaseCluster().countServedRegions() >= - (COUNT_OF_REGIONS + 2 /*catalog and namespace regions*/); + private ExecutorService startupMasterExecutor(final String name) { + ExecutorService executor = new ExecutorService(name); + executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3); + executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3); + executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3); + executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3); + return executor; } } \ No newline at end of file