HBASE-7462 TestDrainingServer is an integration test. It should be a unit test instead (Gustavo Anatoly)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1524995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-09-20 13:22:17 +00:00
parent 1eb261ab42
commit 015df40389
1 changed files with 227 additions and 243 deletions

View File

@ -18,294 +18,278 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
* Test the draining servers feature.
*
* This is typically an integration test: a unit test would be to check that the
* master does no assign regions to a regionserver marked as drained.
*
* @see <a href="https://issues.apache.org/jira/browse/HBASE-4298">HBASE-4298</a>
*/
@Category(MediumTests.class)
public class TestDrainingServer {
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int NB_SLAVES = 5;
private static final int COUNT_OF_REGIONS = NB_SLAVES * 2;
/**
* Spin up a cluster with a bunch of regions on it.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(NB_SLAVES);
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
final List<String> families = new ArrayList<String>(1);
families.add("family");
TEST_UTIL.createRandomTable("table", families, 1, 0, 0, COUNT_OF_REGIONS, 0);
// Ensure a stable env
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
boolean ready = false;
while (!ready){
waitForAllRegionsOnline();
// Assert that every regionserver has some regions on it.
int i = 0;
ready = true;
while (i < NB_SLAVES && ready){
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){
ready = false;
}
i++;
}
if (!ready){
TEST_UTIL.getHBaseAdmin().setBalancerRunning(true, true);
Assert.assertTrue("Can't start a balance!", TEST_UTIL.getHBaseAdmin().balancer());
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
Thread.sleep(100);
}
private Abortable abortable = new Abortable() {
@Override
public boolean isAborted() {
return false;
}
}
private static HRegionServer setDrainingServer(final HRegionServer hrs)
throws KeeperException {
LOG.info("Making " + hrs.getServerName() + " the draining server; " +
"it has " + hrs.getNumberOfOnlineRegions() + " online regions");
ZooKeeperWatcher zkw = hrs.getZooKeeper();
String hrsDrainingZnode =
ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
ZKUtil.createWithParents(zkw, hrsDrainingZnode);
return hrs;
}
private static HRegionServer unsetDrainingServer(final HRegionServer hrs)
throws KeeperException {
ZooKeeperWatcher zkw = hrs.getZooKeeper();
String hrsDrainingZnode =
ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
ZKUtil.deleteNode(zkw, hrsDrainingZnode);
return hrs;
}
@Override
public void abort(String why, Throwable e) {
}
};
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
/**
* Test adding server to draining servers and then move regions off it.
* Make sure that no regions are moved back to the draining server.
* @throws IOException
* @throws KeeperException
*/
@Test // (timeout=30000)
public void testDrainingServerOffloading()
throws Exception {
// I need master in the below.
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
HRegionInfo hriToMoveBack = null;
// Set first server as draining server.
HRegionServer drainingServer =
setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
try {
final int regionsOnDrainingServer =
drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(drainingServer);
for (HRegionInfo hri : hris) {
// Pass null and AssignmentManager will chose a random server BUT it
// should exclude draining servers.
master.moveRegion(null,
RequestConverter.buildMoveRegionRequest(hri.getEncodedNameAsBytes(), null));
// Save off region to move back.
hriToMoveBack = hri;
}
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions());
} finally {
unsetDrainingServer(drainingServer);
@Test
public void testAssignmentManagerDoesntUseDrainingServer() throws Exception {
AssignmentManager am;
Configuration conf = TEST_UTIL.getConfiguration();
final HMaster master = Mockito.mock(HMaster.class);
final Server server = Mockito.mock(Server.class);
final ServerManager serverManager = Mockito.mock(ServerManager.class);
final ServerName SERVERNAME_A = new ServerName("mockserver_a.org", 1000, 8000);
final ServerName SERVERNAME_B = new ServerName("mockserver_b.org", 1001, 8000);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
final HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("table_test"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"zkWatcher-Test", abortable, true);
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
Mockito.when(server.getConfiguration()).thenReturn(conf);
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList())
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList())
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList(null))
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
for (ServerName sn : onlineServers.keySet()) {
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true);
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>()))
.thenReturn(RegionOpeningState.OPENED);
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null))
.thenReturn(RegionOpeningState.OPENED);
Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true);
}
// Now we've unset the draining server, we should be able to move a region
// to what was the draining server.
master.moveRegion(null,
RequestConverter.buildMoveRegionRequest(hriToMoveBack.getEncodedNameAsBytes(),
Bytes.toBytes(drainingServer.getServerName().toString())));
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions());
Mockito.when(master.getServerManager()).thenReturn(serverManager);
am = new AssignmentManager(server, serverManager, catalogTracker,
balancer, startupMasterExecutor("mockExecutorService"), null, null);
Mockito.when(master.getAssignmentManager()).thenReturn(am);
Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher);
Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher);
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A));
zkWatcher.registerListenerFirst(am);
addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager);
am.assign(REGIONINFO, true);
setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO);
setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO);
am.waitForAssignment(REGIONINFO);
assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO));
assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A);
}
/**
* Test that draining servers are ignored even after killing regionserver(s).
* Verify that the draining server is not given any of the dead servers regions.
* @throws KeeperException
* @throws IOException
*/
@Test (timeout=30000)
public void testDrainingServerWithAbort() throws KeeperException, Exception {
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
@Test
public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
AssignmentManager am;
final HMaster master = Mockito.mock(HMaster.class);
final Server server = Mockito.mock(Server.class);
final ServerManager serverManager = Mockito.mock(ServerManager.class);
final ServerName SERVERNAME_A = new ServerName("mockserverbulk_a.org", 1000, 8000);
final ServerName SERVERNAME_B = new ServerName("mockserverbulk_b.org", 1001, 8000);
final ServerName SERVERNAME_C = new ServerName("mockserverbulk_c.org", 1002, 8000);
final ServerName SERVERNAME_D = new ServerName("mockserverbulk_d.org", 1003, 8000);
final ServerName SERVERNAME_E = new ServerName("mockserverbulk_e.org", 1004, 8000);
final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>();
waitForAllRegionsOnline();
Set<ServerName> bunchServersAssigned = new HashSet<ServerName>();
HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_C = new HRegionInfo(TableName.valueOf("table_C"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_D = new HRegionInfo(TableName.valueOf("table_D"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_E = new HRegionInfo(TableName.valueOf("table_E"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions();
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
List<ServerName> drainedServers = new ArrayList<ServerName>();
// Let's get a copy of the regions today.
Collection<HRegion> regions = getRegions();
LOG.info("All regions: " + regions);
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD);
// Choose the draining server
HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
bulk.put(REGIONINFO_A, SERVERNAME_A);
bulk.put(REGIONINFO_B, SERVERNAME_B);
bulk.put(REGIONINFO_C, SERVERNAME_C);
bulk.put(REGIONINFO_D, SERVERNAME_D);
bulk.put(REGIONINFO_E, SERVERNAME_E);
ServerManager sm = master.getServerManager();
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"zkWatcher-BulkAssignTest", abortable, true);
Collection<HRegion> regionsBefore = drainingServer.getOnlineRegionsLocalContext();
LOG.info("Regions of drained server are: "+ regionsBefore );
Mockito.when(server.getConfiguration()).thenReturn(conf);
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
try {
// Add first server to draining servers up in zk.
setDrainingServer(drainingServer);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(entry.getValue(),
entry.getKey(), -1)).thenReturn(true);
Mockito.when(serverManager.sendRegionOpen(entry.getValue(),
entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED);
Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true);
}
Mockito.when(master.getServerManager()).thenReturn(serverManager);
//wait for the master to receive and manage the event
while (sm.createDestinationServersList().contains(drainingServer.getServerName())) {
Thread.sleep(1);
}
drainedServers.add(SERVERNAME_A);
drainedServers.add(SERVERNAME_B);
drainedServers.add(SERVERNAME_C);
drainedServers.add(SERVERNAME_D);
LOG.info("The available servers are: "+ sm.createDestinationServersList());
am = new AssignmentManager(server, serverManager, catalogTracker,
balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null);
Mockito.when(master.getAssignmentManager()).thenReturn(am);
Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer,
drainingServer.getNumberOfOnlineRegions());
Assert.assertFalse("We should not have regions in transition here. List is: " +
master.getAssignmentManager().getRegionStates().getRegionsInTransition(),
master.getAssignmentManager().getRegionStates().isRegionsInTransition());
zkWatcher.registerListener(am);
for (ServerName drained : drainedServers) {
addServerToDrainedList(drained, onlineServers, serverManager);
}
am.assign(bulk);
// Kill a few regionservers.
for (int aborted = 0; aborted <= 2; aborted++) {
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted + 1);
hrs.abort("Aborting");
}
// Wait for regions to come back online again. waitForAllRegionsOnline can come back before
// we've assigned out regions on the cluster so retry if we are shy the wanted number
Collection<HRegion> regionsAfter = null;
for (int i = 0; i < 1000; i++) {
waitForAllRegionsOnline();
regionsAfter = getRegions();
if (regionsAfter.size() >= regionCount) break;
LOG.info("Expecting " + regionCount + " but only " + regionsAfter);
Threads.sleep(10);
}
LOG.info("Regions of drained server: " + regionsAfter + ", all regions: " + getRegions());
Assert.assertEquals("Test conditions are not met: regions were" +
" created/deleted during the test. ",
regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions());
// Assert the draining server still has the same regions.
regionsAfter = drainingServer.getOnlineRegionsLocalContext();
StringBuilder result = new StringBuilder();
for (HRegion r: regionsAfter){
if (!regionsBefore.contains(r)){
result.append(r).append(" was added after the drain");
if (regions.contains(r)){
result.append("(existing region");
} else {
result.append("(new region)");
}
result.append("; ");
}
}
for (HRegion r: regionsBefore){
if (!regionsAfter.contains(r)){
result.append(r).append(" was removed after the drain; ");
}
}
Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0);
} finally {
unsetDrainingServer(drainingServer);
Map<String, RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
for (Entry<String, RegionState> entry : regionsInTransition.entrySet()) {
setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(),
entry.getValue().getRegion());
}
am.waitForAssignment(REGIONINFO_A);
am.waitForAssignment(REGIONINFO_B);
am.waitForAssignment(REGIONINFO_C);
am.waitForAssignment(REGIONINFO_D);
am.waitForAssignment(REGIONINFO_E);
Map<HRegionInfo, ServerName> regionAssignments = am.getRegionStates().getRegionAssignments();
for (Entry<HRegionInfo, ServerName> entry : regionAssignments.entrySet()) {
LOG.info("Region Assignment: "
+ entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue());
bunchServersAssigned.add(entry.getValue());
}
for (ServerName sn : drainedServers) {
assertFalse(bunchServersAssigned.contains(sn));
}
}
private Collection<HRegion> getRegions() {
Collection<HRegion> regions = new ArrayList<HRegion>();
List<RegionServerThread> rsthreads =
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
for (RegionServerThread t: rsthreads) {
HRegionServer rs = t.getRegionServer();
Collection<HRegion> lr = rs.getOnlineRegionsLocalContext();
LOG.info("Found " + lr + " on " + rs);
regions.addAll(lr);
}
return regions;
private void addServerToDrainedList(ServerName serverName,
Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) {
onlineServers.remove(serverName);
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
}
private static void waitForAllRegionsOnline() throws Exception {
// Wait for regions to come back on line again.
boolean done = false;
while (!done) {
Thread.sleep(1);
// Nothing in ZK RIT for a start
ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher());
// Then we want all the regions to be marked as available...
if (!isAllRegionsOnline()) continue;
// And without any work in progress on the master side
if (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition()) continue;
// nor on the region server side
done = true;
for (JVMClusterUtil.RegionServerThread rs :
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
if (!rs.getRegionServer().getRegionsInTransitionInRS().isEmpty()) {
done = false;
}
// Sleep some else we spam the log w/ notice that servers are not yet alive.
Threads.sleep(10);
}
}
private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName,
HRegionInfo hregionInfo) throws Exception {
int version = ZKAssign.getVersion(zkWatcher, hregionInfo);
int versionTransition = ZKAssign.transitionNode(zkWatcher,
hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, version);
ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition);
}
private static boolean isAllRegionsOnline() {
return TEST_UTIL.getMiniHBaseCluster().countServedRegions() >=
(COUNT_OF_REGIONS + 2 /*catalog and namespace regions*/);
private ExecutorService startupMasterExecutor(final String name) {
ExecutorService executor = new ExecutorService(name);
executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
return executor;
}
}