HBASE-5992 Generalization of region move implementation + manage draining servers in bulk assign (N Keywal)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1337641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-05-12 20:47:25 +00:00
parent 1bbed5ab7b
commit 17a5076606
8 changed files with 219 additions and 113 deletions

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
@ -583,6 +582,7 @@ public class AssignmentManager extends ZooKeeperListener {
// So we will assign the ROOT and .META. region immediately.
processOpeningState(regionInfo);
break;
}
regionsInTransition.put(encodedRegionName,
getRegionState(regionInfo, RegionState.State.OPENING, rt));
@ -1847,29 +1847,15 @@ public class AssignmentManager extends ZooKeeperListener {
final ServerName serverToExclude, final boolean forceNewPlan) {
// Pickup existing plan or make a new one
final String encodedName = state.getRegion().getEncodedName();
final List<ServerName> servers = this.serverManager.getOnlineServersList();
final List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
final List<ServerName> destServers =
serverManager.createDestinationServersList(serverToExclude);
if (serverToExclude != null) servers.remove(serverToExclude);
// Loop through the draining server list and remove them from the server
// list.
if (!drainingServers.isEmpty()) {
for (final ServerName server: drainingServers) {
LOG.debug("Removing draining server: " + server +
" from eligible server pool.");
servers.remove(server);
}
if (destServers.isEmpty()){
LOG.warn("Can't move the region " + encodedName +
", there is no destination server available.");
return null;
}
// Remove the deadNotExpired servers from the server list.
removeDeadNotExpiredServers(servers);
if (servers.isEmpty()) return null;
RegionPlan randomPlan = null;
boolean newPlan = false;
RegionPlan existingPlan = null;
@ -1886,10 +1872,10 @@ public class AssignmentManager extends ZooKeeperListener {
if (forceNewPlan
|| existingPlan == null
|| existingPlan.getDestination() == null
|| drainingServers.contains(existingPlan.getDestination())) {
|| !destServers.contains(existingPlan.getDestination())) {
newPlan = true;
randomPlan = new RegionPlan(state.getRegion(), null,
balancer.randomAssignment(state.getRegion(), servers));
balancer.randomAssignment(state.getRegion(), destServers));
this.regionPlans.put(encodedName, randomPlan);
}
}
@ -1900,7 +1886,7 @@ public class AssignmentManager extends ZooKeeperListener {
" so generated a random one; " + randomPlan + "; " +
serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() +
", available=" + servers.size() + ") available servers");
", available=" + destServers.size() + ") available servers");
return randomPlan;
}
LOG.debug("Using pre-existing plan for region " +
@ -1912,17 +1898,11 @@ public class AssignmentManager extends ZooKeeperListener {
* Loop through the deadNotExpired server list and remove them from the
* servers.
* @param servers
* @deprecated the method is now available in ServerManager - deprecated in 0.96
*/
public void removeDeadNotExpiredServers(List<ServerName> servers) {
Set<ServerName> deadNotExpiredServers = this.serverManager
.getDeadNotExpiredServers();
if (!deadNotExpiredServers.isEmpty()) {
for (ServerName server : deadNotExpiredServers) {
LOG.debug("Removing dead but not expired server: " + server
+ " from eligible server pool.");
servers.remove(server);
}
}
@Deprecated
void removeDeadNotExpiredServers(List<ServerName> servers) {
this.serverManager.removeDeadNotExpiredServers(servers);
}
/**
@ -2246,9 +2226,8 @@ public class AssignmentManager extends ZooKeeperListener {
public void assignUserRegionsToOnlineServers(List<HRegionInfo> regions)
throws IOException,
InterruptedException {
List<ServerName> servers = this.serverManager.getOnlineServersList();
removeDeadNotExpiredServers(servers);
assignUserRegions(regions, servers);
List<ServerName> destServers = serverManager.createDestinationServersList();
assignUserRegions(regions, destServers);
}
/**
@ -2297,13 +2276,10 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public void assignAllUserRegions() throws IOException, InterruptedException {
// Get all available servers
List<ServerName> servers = serverManager.getOnlineServersList();
// Remove the deadNotExpired servers from the server list.
removeDeadNotExpiredServers(servers);
List<ServerName> destServers = serverManager.createDestinationServersList();
// If there are no servers we need not proceed with region assignment.
if(servers.isEmpty()) return;
if(destServers.isEmpty()) return;
// Scan META for all user regions, skipping any disabled tables
Map<HRegionInfo, ServerName> allRegions =
@ -2317,17 +2293,17 @@ public class AssignmentManager extends ZooKeeperListener {
Map<ServerName, List<HRegionInfo>> bulkPlan = null;
if (retainAssignment) {
// Reuse existing assignment info
bulkPlan = balancer.retainAssignment(allRegions, servers);
bulkPlan = balancer.retainAssignment(allRegions, destServers);
} else {
// assign regions in round-robin fashion
assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), destServers);
for (HRegionInfo hri : allRegions.keySet()) {
setEnabledTable(hri);
}
return;
}
LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
servers.size() + " server(s), retainAssignment=" + retainAssignment);
destServers.size() + " server(s), retainAssignment=" + retainAssignment);
// Use fixed count thread pool assigning.
BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
@ -2947,8 +2923,7 @@ public class AssignmentManager extends ZooKeeperListener {
protected void chore() {
// If bulkAssign in progress, suspend checks
if (this.bulkAssign) return;
boolean allRSsOffline = this.serverManager.getOnlineServersList().
isEmpty();
boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
synchronized (regionsInTransition) {
// Iterate all regions in transition checking for time outs
@ -2957,14 +2932,14 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionState.getStamp() + timeout <= now) {
//decide on action upon timeout
actOnTimeOut(regionState);
} else if (this.allRegionServersOffline && !allRSsOffline) {
} else if (this.allRegionServersOffline && !noRSAvailable) {
// if some RSs just came back online, we can start
// the assignment right away
actOnTimeOut(regionState);
}
}
}
setAllRegionServersOffline(allRSsOffline);
setAllRegionServersOffline(noRSAvailable);
}
private void actOnTimeOut(RegionState regionState) {

View File

@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableEventHandler;
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -1200,38 +1199,41 @@ Server {
if (p == null)
throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
HRegionInfo hri = p.getFirst();
ServerName dest = null;
ServerName dest;
if (destServerName == null || destServerName.length == 0) {
LOG.info("Passed destination servername is null/empty so " +
"choosing a server at random");
this.assignmentManager.clearRegionPlan(hri);
// Unassign will reassign it elsewhere choosing random server.
this.assignmentManager.unassign(hri);
final List<ServerName> destServers = this.serverManager.createDestinationServersList(
p.getSecond());
dest = balancer.randomAssignment(hri, destServers);
} else {
dest = new ServerName(Bytes.toString(destServerName));
if (dest.equals(p.getSecond())) {
LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + dest +".");
+ " because region already assigned to the same server " + dest + ".");
return;
}
try {
if (this.cpHost != null) {
if (this.cpHost.preMove(p.getFirst(), p.getSecond(), dest)) {
return;
}
}
// Now we can do the move
RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
try {
if (this.cpHost != null) {
if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
return;
}
RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
LOG.info("Added move plan " + rp + ", running balancer");
this.assignmentManager.balance(rp);
if (this.cpHost != null) {
this.cpHost.postMove(p.getFirst(), p.getSecond(), dest);
}
} catch (IOException ioe) {
UnknownRegionException ure = new UnknownRegionException(
Bytes.toStringBinary(encodedRegionName));
ure.initCause(ioe);
throw ure;
}
LOG.info("Added move plan " + rp + ", running balancer");
this.assignmentManager.balance(rp);
if (this.cpHost != null) {
this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
}
} catch (IOException ioe) {
UnknownRegionException ure = new UnknownRegionException(
Bytes.toStringBinary(encodedRegionName));
ure.initCause(ioe);
throw ure;
}
}

View File

@ -205,8 +205,8 @@ public class ServerManager {
}
/**
* Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold,
* Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold,
* it will log a warning but start normally.
* @param serverName Incoming server's name
* @param serverCurrentTime
@ -223,7 +223,7 @@ public class ServerManager {
throw new ClockOutOfSyncException(message);
} else if (skew > warningSkew){
String message = "Reported time for server " + serverName + " is out of sync with master " +
"by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
"by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
"error threshold is " + maxSkew + "ms)";
LOG.warn(message);
}
@ -687,4 +687,55 @@ public class ServerManager {
}
}
}
/**
* Creates a list of possible destinations for a region. It contains the online servers, but not
* the draining or dying servers.
* @param serverToExclude can be null if there is no server to exclude
*/
public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
final List<ServerName> destServers = getOnlineServersList();
if (serverToExclude != null){
destServers.remove(serverToExclude);
}
// Loop through the draining server list and remove them from the server list
final List<ServerName> drainingServersCopy = getDrainingServersList();
if (!drainingServersCopy.isEmpty()) {
for (final ServerName server: drainingServersCopy) {
destServers.remove(server);
}
}
// Remove the deadNotExpired servers from the server list.
removeDeadNotExpiredServers(destServers);
return destServers;
}
/**
* Calls {@link #createDestinationServersList} without server to exclude.
*/
public List<ServerName> createDestinationServersList(){
return createDestinationServersList(null);
}
/**
* Loop through the deadNotExpired server list and remove them from the
* servers.
* This function should be used carefully outside of this class. You should use a high level
* method such as {@link #createDestinationServersList()} instead of managing you own list.
*/
void removeDeadNotExpiredServers(List<ServerName> servers) {
Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
if (!deadNotExpiredServersCopy.isEmpty()) {
for (ServerName server : deadNotExpiredServersCopy) {
LOG.debug("Removing dead but not expired server: " + server
+ " from eligible server pool.");
servers.remove(server);
}
}
}
}

View File

@ -181,9 +181,7 @@ public class CreateTableHandler extends EventHandler {
}
// 4. Trigger immediate assignment of the regions in round-robin fashion
List<ServerName> servers = serverManager.getOnlineServersList();
// Remove the deadNotExpired servers from the server list.
assignmentManager.removeDeadNotExpiredServers(servers);
List<ServerName> servers = serverManager.createDestinationServersList();
try {
this.assignmentManager.assignUserRegions(Arrays.asList(newRegions),
servers);

View File

@ -311,7 +311,7 @@ public class ServerShutdownHandler extends EventHandler {
}
// Get all available servers
List<ServerName> availableServers = services.getServerManager()
.getOnlineServersList();
.createDestinationServersList();
this.services.getAssignmentManager().assign(toAssignRegions,
availableServers);
}

View File

@ -3426,7 +3426,7 @@ public class HRegionServer implements ClientProtocol,
}
}
LOG.info("Received request to open region: "
+ region.getRegionNameAsString());
+ region.getRegionNameAsString() + " on "+this.serverNameFromMasterPOV);
HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true);
// Need to pass the expected version in the constructor.

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import junit.framework.Assert;
@ -27,13 +29,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -55,13 +59,14 @@ public class TestDrainingServer {
private static final byte [] TABLENAME = Bytes.toBytes("t");
private static final byte [] FAMILY = Bytes.toBytes("f");
private static final int COUNT_OF_REGIONS = HBaseTestingUtility.KEYS.length;
private static final int NB_SLAVES = 5;
/**
* Spin up a cluster with a bunch of regions on it.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(5);
TEST_UTIL.startMiniCluster(NB_SLAVES);
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
@ -74,14 +79,29 @@ public class TestDrainingServer {
createTableDescriptor(fs, FSUtils.getRootDir(TEST_UTIL.getConfiguration()), htd);
// Assign out the regions we just created.
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
admin.disableTable(TABLENAME);
admin.enableTable(TABLENAME);
ZKAssign.blockUntilNoRIT(zkw);
// Assert that every regionserver has some regions on it.
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = cluster.getRegionServer(i);
Assert.assertFalse(ProtobufUtil.getOnlineRegions(hrs).isEmpty());
boolean ready = false;
while (!ready){
ZKAssign.blockUntilNoRIT(zkw);
// Assert that every regionserver has some regions on it.
int i = 0;
ready = true;
while (i < NB_SLAVES && ready){
HRegionServer hrs = cluster.getRegionServer(i);
if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){
ready = false;
}
i++;
}
if (!ready){
admin.balancer();
Thread.sleep(100);
}
}
}
@ -159,42 +179,96 @@ public class TestDrainingServer {
* @throws IOException
*/
@Test (timeout=30000)
public void testDrainingServerWithAbort() throws KeeperException, IOException {
// Add first server to draining servers up in zk.
HRegionServer drainingServer =
setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
public void testDrainingServerWithAbort() throws KeeperException, Exception {
// Ensure a stable env
TEST_UTIL.getHBaseAdmin().balanceSwitch(false);
waitForAllRegionsOnline();
final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions();
// Let's get a copy of the regions today.
Collection<HRegion> regions = new ArrayList<HRegion>();
for (int i = 0; i < NB_SLAVES; i++) {
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
regions.addAll( hrs.getCopyOfOnlineRegionsSortedBySize().values() );
}
// Choose the draining server
HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
ServerManager sm = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
Collection<HRegion> regionsBefore = drainingServer.
getCopyOfOnlineRegionsSortedBySize().values();
LOG.info("Regions of drained server are: "+ regionsBefore );
try {
final int regionsOnDrainingServer =
drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
// Kill a few regionservers.
int aborted = 0;
final int numberToAbort = 2;
for (int i = 1; i < TEST_UTIL.getMiniHBaseCluster().countServedRegions(); i++) {
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
if (hrs.getServerName().equals(drainingServer.getServerName())) continue;
hrs.abort("Aborting");
aborted++;
if (aborted >= numberToAbort) break;
}
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
// Assert the draining server still has the same number of regions.
Assert.assertEquals(regionsOnDrainingServer,
// Add first server to draining servers up in zk.
setDrainingServer(drainingServer);
//wait for the master to receive and manage the event
while (sm.createDestinationServersList().contains(drainingServer.getServerName())) ;
LOG.info("The available servers are: "+ sm.createDestinationServersList());
Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer,
drainingServer.getNumberOfOnlineRegions());
Assert.assertTrue("We should not have regions in transition here.",
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
getRegionsInTransition().isEmpty() );
// Kill a few regionservers.
for (int aborted = 0; aborted <= 2; aborted++) {
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted+1);
hrs.abort("Aborting");
}
// Wait for regions to come back online again.
waitForAllRegionsOnline();
Collection<HRegion> regionsAfter =
drainingServer.getCopyOfOnlineRegionsSortedBySize().values();
LOG.info("Regions of drained server are: "+ regionsAfter );
Assert.assertEquals("Test conditions are not met: regions were" +
" created/deleted during the test. ",
regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions());
// Assert the draining server still has the same regions.
StringBuilder result = new StringBuilder();
for (HRegion r: regionsAfter){
if (!regionsBefore.contains(r)){
result.append(r).append(" was added after the drain");
if (regions.contains(r)){
result.append("(existing region");
} else {
result.append("(new region)");
}
result.append("; ");
}
}
for (HRegion r: regionsBefore){
if (!regionsAfter.contains(r)){
result.append(r).append(" was removed after the drain; ");
}
}
Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0);
} finally {
unsetDrainingServer(drainingServer);
}
}
private void waitForAllRegionsOnline() {
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().isRegionsInTransition()) {
Threads.sleep(10);
}
// Wait for regions to come back on line again.
while (!isAllRegionsOnline()) {
Threads.sleep(10);
}
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().isRegionsInTransition()) {
}
}

View File

@ -134,6 +134,12 @@ public class TestAssignmentManager {
Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);
List<ServerName> avServers = new ArrayList<ServerName>();
avServers.addAll(onlineServers.keySet());
Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(avServers);
Mockito.when(this.serverManager.createDestinationServersList(null)).thenReturn(avServers);
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, -1)).
thenReturn(true);
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_B, REGIONINFO, -1)).