From 26d737fbc5e11752e264d6d27b25ca74d09077df Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 9 May 2012 17:29:20 +0000 Subject: [PATCH] HBASE-5914 Bulk assign regions in the process of ServerShutdownHandler (Chunhui) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1336308 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/master/AssignmentManager.java | 52 +++++++++++++++++-- .../master/handler/ServerShutdownHandler.java | 9 +++- .../hbase/master/TestAssignmentManager.java | 1 + 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 6609f337220..2d47fce9771 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -1372,11 +1372,12 @@ public class AssignmentManager extends ZooKeeperListener { * Bulk assign regions to destination. * @param destination * @param regions Regions to assign. + * @return true if successful */ - void assign(final ServerName destination, + boolean assign(final ServerName destination, final List regions) { if (regions.size() == 0) { - return; + return true; } LOG.debug("Bulk assigning " + regions.size() + " region(s) to " + destination.toString()); @@ -1403,7 +1404,7 @@ public class AssignmentManager extends ZooKeeperListener { new CreateUnassignedAsyncCallback(this.watcher, destination, counter); for (RegionState state: states) { if (!asyncSetOfflineInZooKeeper(state, cb, state)) { - return; + return false; } } // Wait until all unassigned nodes have been put up and watchers set. @@ -1434,7 +1435,7 @@ public class AssignmentManager extends ZooKeeperListener { if (decodedException instanceof RegionServerStoppedException) { LOG.warn("The region server was shut down, ", decodedException); // No need to retry, the region server is a goner. - return; + return false; } else if (decodedException instanceof ServerNotRunningYetException) { // This is the one exception to retry. For all else we should just fail // the startup. @@ -1452,10 +1453,53 @@ public class AssignmentManager extends ZooKeeperListener { // Can be a socket timeout, EOF, NoRouteToHost, etc LOG.info("Unable to communicate with the region server in order" + " to assign regions", e); + return false; } catch (InterruptedException e) { throw new RuntimeException(e); } LOG.debug("Bulk assigning done for " + destination.toString()); + return true; + } + + /** + * Bulk assign regions to available servers if any with retry, else assign + * region singly. + * + * @param regions all regions to assign + * @param servers all available servers + */ + public void assign(List regions, List servers) { + LOG.info("Quickly assigning " + regions.size() + " region(s) across " + + servers.size() + " server(s)"); + Map> bulkPlan = balancer + .roundRobinAssignment(regions, servers); + if (bulkPlan == null || bulkPlan.isEmpty()) { + LOG.info("Failed getting bulk plan, assigning region singly"); + for (HRegionInfo region : regions) { + assign(region, true); + } + return; + } + Map> failedPlans = new HashMap>(); + for (Map.Entry> e : bulkPlan.entrySet()) { + try { + if (!assign(e.getKey(), e.getValue())) { + failedPlans.put(e.getKey(), e.getValue()); + } + } catch (Throwable t) { + failedPlans.put(e.getKey(), e.getValue()); + } + } + if (!failedPlans.isEmpty()) { + servers.removeAll(failedPlans.keySet()); + List reassigningRegions = new ArrayList(); + for (Map.Entry> e : failedPlans.entrySet()) { + LOG.info("Failed assigning " + e.getValue().size() + + " regions to server " + e.getKey() + ", reassigning them"); + reassigningRegions.addAll(e.getValue()); + } + assign(reassigningRegions, servers); + } } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 2ec6677c2cb..a6616cccf1c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.handler; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -285,6 +286,7 @@ public class ServerShutdownHandler extends EventHandler { // Iterate regions that were on this server and assign them if (hris != null) { + List toAssignRegions = new ArrayList(); for (Map.Entry e: hris.entrySet()) { if (processDeadRegion(e.getKey(), e.getValue(), this.services.getAssignmentManager(), @@ -303,10 +305,15 @@ public class ServerShutdownHandler extends EventHandler { + " because it has been opened in " + addressFromAM.getServerName()); } else { - this.services.getAssignmentManager().assign(e.getKey(), true); + toAssignRegions.add(e.getKey()); } } } + // Get all available servers + List availableServers = services.getServerManager() + .getOnlineServersList(); + this.services.getAssignmentManager().assign(toAssignRegions, + availableServers); } } finally { this.deadServers.finish(serverName); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index b17ed1fee2c..004ac868044 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -426,6 +426,7 @@ public class TestAssignmentManager { // I need a services instance that will return the AM MasterServices services = Mockito.mock(MasterServices.class); Mockito.when(services.getAssignmentManager()).thenReturn(am); + Mockito.when(services.getServerManager()).thenReturn(this.serverManager); ServerShutdownHandler handler = new ServerShutdownHandler(this.server, services, deadServers, SERVERNAME_A, false); handler.process();