diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index dc5bace2224..2b13b21e94e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -33,6 +33,7 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -59,6 +60,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; /** * The base class for load balancers. It provides the the functions used to by @@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { HRegionInfo[] regions; Deque[] regionLoads; private RegionLocationFinder regionFinder; + ArrayList> regionLocationFutures; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -238,6 +241,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; + regionLocationFutures = new ArrayList>( + numRegions); + if (regionFinder != null) { + for (int i = 0; i < numRegions; i++) { + regionLocationFutures.add(null); + } + } regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers]; @@ -307,6 +317,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndex++; } + if (regionFinder != null) { + for (int index = 0; index < regionLocationFutures.size(); index++) { + ListenableFuture future = regionLocationFutures + .get(index); + HDFSBlocksDistribution blockDistbn = null; + try { + blockDistbn = future.get(); + } catch (InterruptedException ite) { + } catch (ExecutionException ee) { + LOG.debug( + "IOException during HDFSBlocksDistribution computation. for region = " + + regions[index].getEncodedName(), ee); + } finally { + if (blockDistbn == null) { + blockDistbn = new HDFSBlocksDistribution(); + } + } + List loc = regionFinder.getTopBlockLocations(blockDistbn); + regionLocations[index] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[index][i] = loc.get(i) == null ? -1 + : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } + } + } + for (int i = 0; i < serversPerHostList.size(); i++) { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { @@ -454,15 +491,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } if (regionFinder != null) { - //region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i=0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = - loc.get(i) == null ? -1 : - (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 - : serversToIndex.get(loc.get(i).getHostAndPort())); - } + // region location + regionLocationFutures.set(regionIndex, + regionFinder.asyncGetBlockDistribution(region)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index a6724ee5284..fbe57d05b20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit; class RegionLocationFinder { private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class); private static final long CACHE_TIME = 240 * 60 * 1000; + private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private Configuration conf; private volatile ClusterStatus status; private MasterServices services; @@ -164,8 +167,8 @@ class RegionLocationFinder { return includesUserTables; } - protected List getTopBlockLocations(HRegionInfo region) { - HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); + protected List getTopBlockLocations( + HDFSBlocksDistribution blocksDistribution) { List topHosts = blocksDistribution.getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -208,7 +211,7 @@ class RegionLocationFinder { + region.getEncodedName(), ioe); } - return new HDFSBlocksDistribution(); + return EMPTY_BLOCK_DISTRIBUTION; } /** @@ -295,4 +298,13 @@ class RegionLocationFinder { return blockDistbn; } } + + public ListenableFuture asyncGetBlockDistribution( + HRegionInfo hri) { + try { + return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); + } catch (Exception e) { + return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index d8c0a3dc92f..37165d31c29 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -56,6 +57,8 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; @Category({MasterTests.class, MediumTests.class}) public class TestBaseLoadBalancer extends BalancerTestBase { @@ -448,17 +451,49 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // mock block locality for some regions RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); + HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture defaultFuture = Futures + .immediateFuture(emptyBlockDistribution); + for (HRegionInfo regionInfo : regions) { + when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn( + defaultFuture); + } // block locality: region:0 => {server:0} // region:1 => {server:0, server:1} // region:42 => {server:4, server:9, server:5} - when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn( - Lists.newArrayList(servers.get(0))); - when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn( - Lists.newArrayList(servers.get(0), servers.get(1))); - when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( - Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); - when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( - Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus + HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future0 = Futures + .immediateFuture(region0BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn( + future0); + when(locationFinder.getTopBlockLocations(region0BlockDistribution)) + .thenReturn(Lists.newArrayList(servers.get(0))); + + HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future1 = Futures + .immediateFuture(region1BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn( + future1); + when(locationFinder.getTopBlockLocations(region1BlockDistribution)) + .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1))); + + HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future42 = Futures + .immediateFuture(region42BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn( + future42); + when(locationFinder.getTopBlockLocations(region42BlockDistribution)) + .thenReturn( + Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); + + HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future43 = Futures + .immediateFuture(region43BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn( + future43); + // this server does not exists in clusterStatus + when(locationFinder.getTopBlockLocations(region43BlockDistribution)) + .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java index daa89427858..039cac1d7a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java @@ -121,7 +121,8 @@ public class TestRegionLocationFinder { for (int i = 0; i < ServerNum; i++) { HRegionServer server = cluster.getRegionServer(i); for (Region region : server.getOnlineRegions(tableName)) { - List servers = finder.getTopBlockLocations(region.getRegionInfo()); + List servers = finder.getTopBlockLocations(finder + .getBlockDistribution(region.getRegionInfo())); // test table may have empty region if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) { continue;