HBASE-5914 Bulk assign regions in the process of ServerShutdownHandler (Chunhui)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1336308 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-05-09 17:29:20 +00:00
parent d6c82520df
commit 26d737fbc5
3 changed files with 57 additions and 5 deletions

View File

@ -1372,11 +1372,12 @@ public class AssignmentManager extends ZooKeeperListener {
* Bulk assign regions to <code>destination</code>. * Bulk assign regions to <code>destination</code>.
* @param destination * @param destination
* @param regions Regions to assign. * @param regions Regions to assign.
* @return true if successful
*/ */
void assign(final ServerName destination, boolean assign(final ServerName destination,
final List<HRegionInfo> regions) { final List<HRegionInfo> regions) {
if (regions.size() == 0) { if (regions.size() == 0) {
return; return true;
} }
LOG.debug("Bulk assigning " + regions.size() + " region(s) to " + LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
destination.toString()); destination.toString());
@ -1403,7 +1404,7 @@ public class AssignmentManager extends ZooKeeperListener {
new CreateUnassignedAsyncCallback(this.watcher, destination, counter); new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
for (RegionState state: states) { for (RegionState state: states) {
if (!asyncSetOfflineInZooKeeper(state, cb, state)) { if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
return; return false;
} }
} }
// Wait until all unassigned nodes have been put up and watchers set. // Wait until all unassigned nodes have been put up and watchers set.
@ -1434,7 +1435,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (decodedException instanceof RegionServerStoppedException) { if (decodedException instanceof RegionServerStoppedException) {
LOG.warn("The region server was shut down, ", decodedException); LOG.warn("The region server was shut down, ", decodedException);
// No need to retry, the region server is a goner. // No need to retry, the region server is a goner.
return; return false;
} else if (decodedException instanceof ServerNotRunningYetException) { } else if (decodedException instanceof ServerNotRunningYetException) {
// This is the one exception to retry. For all else we should just fail // This is the one exception to retry. For all else we should just fail
// the startup. // the startup.
@ -1452,10 +1453,53 @@ public class AssignmentManager extends ZooKeeperListener {
// Can be a socket timeout, EOF, NoRouteToHost, etc // Can be a socket timeout, EOF, NoRouteToHost, etc
LOG.info("Unable to communicate with the region server in order" + LOG.info("Unable to communicate with the region server in order" +
" to assign regions", e); " to assign regions", e);
return false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
LOG.debug("Bulk assigning done for " + destination.toString()); LOG.debug("Bulk assigning done for " + destination.toString());
return true;
}
/**
* Bulk assign regions to available servers if any with retry, else assign
* region singly.
*
* @param regions all regions to assign
* @param servers all available servers
*/
public void assign(List<HRegionInfo> regions, List<ServerName> servers) {
LOG.info("Quickly assigning " + regions.size() + " region(s) across "
+ servers.size() + " server(s)");
Map<ServerName, List<HRegionInfo>> bulkPlan = balancer
.roundRobinAssignment(regions, servers);
if (bulkPlan == null || bulkPlan.isEmpty()) {
LOG.info("Failed getting bulk plan, assigning region singly");
for (HRegionInfo region : regions) {
assign(region, true);
}
return;
}
Map<ServerName, List<HRegionInfo>> failedPlans = new HashMap<ServerName, List<HRegionInfo>>();
for (Map.Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
try {
if (!assign(e.getKey(), e.getValue())) {
failedPlans.put(e.getKey(), e.getValue());
}
} catch (Throwable t) {
failedPlans.put(e.getKey(), e.getValue());
}
}
if (!failedPlans.isEmpty()) {
servers.removeAll(failedPlans.keySet());
List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
LOG.info("Failed assigning " + e.getValue().size()
+ " regions to server " + e.getKey() + ", reassigning them");
reassigningRegions.addAll(e.getValue());
}
assign(reassigningRegions, servers);
}
} }
/** /**

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.master.handler; package org.apache.hadoop.hbase.master.handler;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
@ -285,6 +286,7 @@ public class ServerShutdownHandler extends EventHandler {
// Iterate regions that were on this server and assign them // Iterate regions that were on this server and assign them
if (hris != null) { if (hris != null) {
List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) { for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
if (processDeadRegion(e.getKey(), e.getValue(), if (processDeadRegion(e.getKey(), e.getValue(),
this.services.getAssignmentManager(), this.services.getAssignmentManager(),
@ -303,10 +305,15 @@ public class ServerShutdownHandler extends EventHandler {
+ " because it has been opened in " + " because it has been opened in "
+ addressFromAM.getServerName()); + addressFromAM.getServerName());
} else { } else {
this.services.getAssignmentManager().assign(e.getKey(), true); toAssignRegions.add(e.getKey());
} }
} }
} }
// Get all available servers
List<ServerName> availableServers = services.getServerManager()
.getOnlineServersList();
this.services.getAssignmentManager().assign(toAssignRegions,
availableServers);
} }
} finally { } finally {
this.deadServers.finish(serverName); this.deadServers.finish(serverName);

View File

@ -426,6 +426,7 @@ public class TestAssignmentManager {
// I need a services instance that will return the AM // I need a services instance that will return the AM
MasterServices services = Mockito.mock(MasterServices.class); MasterServices services = Mockito.mock(MasterServices.class);
Mockito.when(services.getAssignmentManager()).thenReturn(am); Mockito.when(services.getAssignmentManager()).thenReturn(am);
Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server, ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false); services, deadServers, SERVERNAME_A, false);
handler.process(); handler.process();