From dac73eceb03bf871ce6def7982b39950e68be1e2 Mon Sep 17 00:00:00 2001 From: binlijin Date: Thu, 10 Nov 2016 16:47:31 +0800 Subject: [PATCH] HBASE-16570 Compute region locality in parallel at startup (addendum) Addendum mainly for: 1. Avoid interfering with block location cache in RegionLocationFinder 2. Avoid refreshing block lcoations during HMaster startup (or else the startup could be really slow) Signed-off-by: Yu Li --- .../master/balancer/BaseLoadBalancer.java | 70 +++++------------ .../master/balancer/RegionLocationFinder.java | 78 +++++++++++++------ .../master/balancer/TestBaseLoadBalancer.java | 51 ++---------- .../balancer/TestRegionLocationFinder.java | 25 +++++- 4 files changed, 105 insertions(+), 119 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 93b29b6d7fb..498d03dc139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -33,7 +33,6 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -58,7 +57,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListenableFuture; /** * The base class for load balancers. It provides the the functions used to by @@ -117,7 +115,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { HRegionInfo[] regions; Deque[] regionLoads; private RegionLocationFinder regionFinder; - ArrayList> regionLocationFutures; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -167,8 +164,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Map> loads, RegionLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, - rackManager); + this(null, clusterState, loads, regionFinder, rackManager); } @SuppressWarnings("unchecked") @@ -239,13 +235,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; - regionLocationFutures = new ArrayList>( - numRegions); - if (regionFinder != null) { - for (int i = 0; i < numRegions; i++) { - regionLocationFutures.add(null); - } - } + regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers]; @@ -305,43 +295,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer { for (HRegionInfo region : entry.getValue()) { registerRegion(region, regionIndex, serverIndex, loads, regionFinder); - regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; regionIndex++; } } + for (HRegionInfo region : unassignedRegions) { registerRegion(region, regionIndex, -1, loads, regionFinder); regionIndex++; } - if (regionFinder != null) { - for (int index = 0; index < regionLocationFutures.size(); index++) { - ListenableFuture future = regionLocationFutures - .get(index); - HDFSBlocksDistribution blockDistbn = null; - try { - blockDistbn = future.get(); - } catch (InterruptedException ite) { - } catch (ExecutionException ee) { - LOG.debug( - "IOException during HDFSBlocksDistribution computation. for region = " - + regions[index].getEncodedName(), ee); - } finally { - if (blockDistbn == null) { - blockDistbn = new HDFSBlocksDistribution(); - } - } - List loc = regionFinder.getTopBlockLocations(blockDistbn); - regionLocations[index] = new int[loc.size()]; - for (int i = 0; i < loc.size(); i++) { - regionLocations[index][i] = loc.get(i) == null ? -1 - : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 - : serversToIndex.get(loc.get(i).getHostAndPort())); - } - } - } - for (int i = 0; i < serversPerHostList.size(); i++) { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { @@ -462,8 +425,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } /** Helper for Cluster constructor to handle a region */ - private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, - Map> loads, RegionLocationFinder regionFinder) { + private void registerRegion(HRegionInfo region, int regionIndex, + int serverIndex, Map> loads, + RegionLocationFinder regionFinder) { String tableName = region.getTable().getNameAsString(); if (!tablesToIndex.containsKey(tableName)) { tables.add(tableName); @@ -490,8 +454,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { if (regionFinder != null) { // region location - regionLocationFutures.set(regionIndex, - regionFinder.asyncGetBlockDistribution(region)); + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = loc.get(i) == null ? -1 + : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } } } @@ -1262,7 +1231,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List unassignedRegions = new ArrayList(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1309,7 +1278,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected Cluster createCluster(List servers, - Collection regions) { + Collection regions, boolean forceRefresh) { + if (forceRefresh == true) { + regionFinder.refreshAndWait(regions); + } // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain @@ -1322,7 +1294,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } return new Cluster(regions, clusterState, null, this.regionFinder, - rackManager); + rackManager); } /** @@ -1383,7 +1355,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); return randomAssignment(cluster, regionInfo, servers); } @@ -1458,7 +1430,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), true); for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index fbe57d05b20..d5edfab5d28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -17,15 +17,17 @@ */ package org.apache.hadoop.hbase.master.balancer; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,17 +45,15 @@ import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This will find where data for a region is located in HDFS. It ranks @@ -70,7 +70,8 @@ class RegionLocationFinder { private volatile ClusterStatus status; private MasterServices services; private final ListeningExecutorService executor; - private long lastFullRefresh = 0; + // Do not scheduleFullRefresh at master startup + private long lastFullRefresh = EnvironmentEdgeManager.currentTime(); private CacheLoader loader = new CacheLoader() { @@ -167,9 +168,8 @@ class RegionLocationFinder { return includesUserTables; } - protected List getTopBlockLocations( - HDFSBlocksDistribution blocksDistribution) { - List topHosts = blocksDistribution.getTopHosts(); + protected List getTopBlockLocations(HRegionInfo region) { + List topHosts = getBlockDistribution(region).getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -299,7 +299,7 @@ class RegionLocationFinder { } } - public ListenableFuture asyncGetBlockDistribution( + private ListenableFuture asyncGetBlockDistribution( HRegionInfo hri) { try { return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); @@ -307,4 +307,32 @@ class RegionLocationFinder { return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); } } + + public void refreshAndWait(Collection hris) { + ArrayList> regionLocationFutures = + new ArrayList>(hris.size()); + for (HRegionInfo hregionInfo : hris) { + regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo)); + } + int index = 0; + for (HRegionInfo hregionInfo : hris) { + ListenableFuture future = regionLocationFutures + .get(index); + try { + cache.put(hregionInfo, future.get()); + } catch (InterruptedException ite) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + LOG.debug( + "ExecutionException during HDFSBlocksDistribution computation. for region = " + + hregionInfo.getEncodedName(), ee); + } + index++; + } + } + + // For test + LoadingCache getCache() { + return cache; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index f27b2685d92..e3523b1779e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.ServerName; @@ -56,8 +55,6 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; @Category(MediumTests.class) public class TestBaseLoadBalancer extends BalancerTestBase { @@ -459,49 +456,17 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // mock block locality for some regions RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); - HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture defaultFuture = Futures - .immediateFuture(emptyBlockDistribution); - for (HRegionInfo regionInfo : regions) { - when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn( - defaultFuture); - } // block locality: region:0 => {server:0} // region:1 => {server:0, server:1} // region:42 => {server:4, server:9, server:5} - HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future0 = Futures - .immediateFuture(region0BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn( - future0); - when(locationFinder.getTopBlockLocations(region0BlockDistribution)) - .thenReturn(Lists.newArrayList(servers.get(0))); - - HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future1 = Futures - .immediateFuture(region1BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn( - future1); - when(locationFinder.getTopBlockLocations(region1BlockDistribution)) - .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1))); - - HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future42 = Futures - .immediateFuture(region42BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn( - future42); - when(locationFinder.getTopBlockLocations(region42BlockDistribution)) - .thenReturn( - Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); - - HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future43 = Futures - .immediateFuture(region43BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn( - future43); - // this server does not exists in clusterStatus - when(locationFinder.getTopBlockLocations(region43BlockDistribution)) - .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); + when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn( + Lists.newArrayList(servers.get(0))); + when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn( + Lists.newArrayList(servers.get(0), servers.get(1))); + when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( + Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); + when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( + Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java index 0e1036fa128..d33b1cf5682 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -124,8 +125,8 @@ public class TestRegionLocationFinder { for (int i = 0; i < ServerNum; i++) { HRegionServer server = cluster.getRegionServer(i); for (Region region : server.getOnlineRegions(tableName)) { - List servers = finder.getTopBlockLocations(finder - .getBlockDistribution(region.getRegionInfo())); + List servers = finder.getTopBlockLocations(region + .getRegionInfo()); // test table may have empty region if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) { continue; @@ -142,4 +143,24 @@ public class TestRegionLocationFinder { } } } + + @Test + public void testRefreshAndWait() throws Exception { + finder.getCache().invalidateAll(); + for (int i = 0; i < ServerNum; i++) { + HRegionServer server = cluster.getRegionServer(i); + List regions = server.getOnlineRegions(tableName); + if (regions.size() <= 0) { + continue; + } + List regionInfos = new ArrayList(regions.size()); + for (Region region : regions) { + regionInfos.add(region.getRegionInfo()); + } + finder.refreshAndWait(regionInfos); + for (HRegionInfo regionInfo : regionInfos) { + assertNotNull(finder.getCache().getIfPresent(regionInfo)); + } + } + } }