HBASE-16570 Compute region locality in parallel at startup (addendum)

Addendum mainly for:
1. Avoid interfering with block location cache in RegionLocationFinder
2. Avoid refreshing block lcoations during HMaster startup (or else the startup could be really slow)

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
binlijin 2016-11-10 16:47:31 +08:00 committed by Yu Li
parent e5a288e5c0
commit 7f08cd0e10
4 changed files with 106 additions and 119 deletions

View File

@ -33,7 +33,6 @@ import java.util.NavigableMap;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -60,7 +59,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
/** /**
* The base class for load balancers. It provides the the functions used to by * The base class for load balancers. It provides the the functions used to by
@ -119,7 +117,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
HRegionInfo[] regions; HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads; Deque<RegionLoad>[] regionLoads;
private RegionLocationFinder regionFinder; private RegionLocationFinder regionFinder;
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@ -169,8 +166,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
Map<String, Deque<RegionLoad>> loads, Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder, RegionLocationFinder regionFinder,
RackManager rackManager) { RackManager rackManager) {
this(null, clusterState, loads, regionFinder, this(null, clusterState, loads, regionFinder, rackManager);
rackManager);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -241,13 +237,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToTableIndex = new int[numRegions]; regionIndexToTableIndex = new int[numRegions];
regionIndexToPrimaryIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions]; regionLoads = new Deque[numRegions];
regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
numRegions);
if (regionFinder != null) {
for (int i = 0; i < numRegions; i++) {
regionLocationFutures.add(null);
}
}
regionLocations = new int[numRegions][]; regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByRegionCount = new Integer[numServers];
serverIndicesSortedByLocality = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers];
@ -307,43 +297,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (HRegionInfo region : entry.getValue()) { for (HRegionInfo region : entry.getValue()) {
registerRegion(region, regionIndex, serverIndex, loads, regionFinder); registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
regionIndex++; regionIndex++;
} }
} }
for (HRegionInfo region : unassignedRegions) { for (HRegionInfo region : unassignedRegions) {
registerRegion(region, regionIndex, -1, loads, regionFinder); registerRegion(region, regionIndex, -1, loads, regionFinder);
regionIndex++; regionIndex++;
} }
if (regionFinder != null) {
for (int index = 0; index < regionLocationFutures.size(); index++) {
ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
.get(index);
HDFSBlocksDistribution blockDistbn = null;
try {
blockDistbn = future.get();
} catch (InterruptedException ite) {
} catch (ExecutionException ee) {
LOG.debug(
"IOException during HDFSBlocksDistribution computation. for region = "
+ regions[index].getEncodedName(), ee);
} finally {
if (blockDistbn == null) {
blockDistbn = new HDFSBlocksDistribution();
}
}
List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
regionLocations[index] = new int[loc.size()];
for (int i = 0; i < loc.size(); i++) {
regionLocations[index][i] = loc.get(i) == null ? -1
: (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
: serversToIndex.get(loc.get(i).getHostAndPort()));
}
}
}
for (int i = 0; i < serversPerHostList.size(); i++) { for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()]; serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) { for (int j = 0; j < serversPerHost[i].length; j++) {
@ -464,8 +427,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} }
/** Helper for Cluster constructor to handle a region */ /** Helper for Cluster constructor to handle a region */
private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, private void registerRegion(HRegionInfo region, int regionIndex,
Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder) { int serverIndex, Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder) {
String tableName = region.getTable().getNameAsString(); String tableName = region.getTable().getNameAsString();
if (!tablesToIndex.containsKey(tableName)) { if (!tablesToIndex.containsKey(tableName)) {
tables.add(tableName); tables.add(tableName);
@ -492,8 +456,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
if (regionFinder != null) { if (regionFinder != null) {
// region location // region location
regionLocationFutures.set(regionIndex, List<ServerName> loc = regionFinder.getTopBlockLocations(region);
regionFinder.asyncGetBlockDistribution(region)); regionLocations[regionIndex] = new int[loc.size()];
for (int i = 0; i < loc.size(); i++) {
regionLocations[regionIndex][i] = loc.get(i) == null ? -1
: (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
: serversToIndex.get(loc.get(i).getHostAndPort()));
}
} }
} }
@ -1277,7 +1246,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return assignments; return assignments;
} }
Cluster cluster = createCluster(servers, regions); Cluster cluster = createCluster(servers, regions, false);
List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>(); List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
roundRobinAssignment(cluster, regions, unassignedRegions, roundRobinAssignment(cluster, regions, unassignedRegions,
@ -1324,7 +1293,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} }
protected Cluster createCluster(List<ServerName> servers, protected Cluster createCluster(List<ServerName> servers,
Collection<HRegionInfo> regions) { Collection<HRegionInfo> regions, boolean forceRefresh) {
if (forceRefresh == true) {
regionFinder.refreshAndWait(regions);
}
// Get the snapshot of the current assignments for the regions in question, and then create // Get the snapshot of the current assignments for the regions in question, and then create
// a cluster out of it. Note that we might have replicas already assigned to some servers // a cluster out of it. Note that we might have replicas already assigned to some servers
// earlier. So we want to get the snapshot to see those assignments, but this will only contain // earlier. So we want to get the snapshot to see those assignments, but this will only contain
@ -1337,7 +1309,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} }
} }
return new Cluster(regions, clusterState, null, this.regionFinder, return new Cluster(regions, clusterState, null, this.regionFinder,
rackManager); rackManager);
} }
/** /**
@ -1365,7 +1337,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} }
List<HRegionInfo> regions = Lists.newArrayList(regionInfo); List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
Cluster cluster = createCluster(servers, regions); Cluster cluster = createCluster(servers, regions, false);
return randomAssignment(cluster, regionInfo, servers); return randomAssignment(cluster, regionInfo, servers);
} }
@ -1440,7 +1412,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int numRandomAssignments = 0; int numRandomAssignments = 0;
int numRetainedAssigments = 0; int numRetainedAssigments = 0;
Cluster cluster = createCluster(servers, regions.keySet()); Cluster cluster = createCluster(servers, regions.keySet(), true);
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) { for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey(); HRegionInfo region = entry.getKey();

View File

@ -17,15 +17,17 @@
*/ */
package org.apache.hadoop.hbase.master.balancer; package org.apache.hadoop.hbase.master.balancer;
import com.google.common.cache.CacheBuilder; import java.io.FileNotFoundException;
import com.google.common.cache.CacheLoader; import java.io.IOException;
import com.google.common.cache.LoadingCache; import java.util.ArrayList;
import com.google.common.collect.Lists; import java.util.Collection;
import com.google.common.util.concurrent.Futures; import java.util.HashMap;
import com.google.common.util.concurrent.ListenableFuture; import java.util.List;
import com.google.common.util.concurrent.ListeningExecutorService; import java.util.Set;
import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Callable;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -43,17 +45,15 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import java.io.FileNotFoundException; import com.google.common.cache.CacheBuilder;
import java.io.IOException; import com.google.common.cache.CacheLoader;
import java.util.ArrayList; import com.google.common.cache.LoadingCache;
import java.util.Collection; import com.google.common.collect.Lists;
import java.util.HashMap; import com.google.common.util.concurrent.Futures;
import java.util.List; import com.google.common.util.concurrent.ListenableFuture;
import java.util.Set; import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.Callable; import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** /**
* This will find where data for a region is located in HDFS. It ranks * This will find where data for a region is located in HDFS. It ranks
@ -70,7 +70,8 @@ class RegionLocationFinder {
private volatile ClusterStatus status; private volatile ClusterStatus status;
private MasterServices services; private MasterServices services;
private final ListeningExecutorService executor; private final ListeningExecutorService executor;
private long lastFullRefresh = 0; // Do not scheduleFullRefresh at master startup
private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader = private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() { new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
@ -167,9 +168,8 @@ class RegionLocationFinder {
return includesUserTables; return includesUserTables;
} }
protected List<ServerName> getTopBlockLocations( protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
HDFSBlocksDistribution blocksDistribution) { List<String> topHosts = getBlockDistribution(region).getTopHosts();
List<String> topHosts = blocksDistribution.getTopHosts();
return mapHostNameToServerName(topHosts); return mapHostNameToServerName(topHosts);
} }
@ -299,7 +299,7 @@ class RegionLocationFinder {
} }
} }
public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution( private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
HRegionInfo hri) { HRegionInfo hri) {
try { try {
return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
@ -307,4 +307,32 @@ class RegionLocationFinder {
return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
} }
} }
public void refreshAndWait(Collection<HRegionInfo> hris) {
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(hris.size());
for (HRegionInfo hregionInfo : hris) {
regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
}
int index = 0;
for (HRegionInfo hregionInfo : hris) {
ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
.get(index);
try {
cache.put(hregionInfo, future.get());
} catch (InterruptedException ite) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
LOG.debug(
"ExecutionException during HDFSBlocksDistribution computation. for region = "
+ hregionInfo.getEncodedName(), ee);
}
index++;
}
}
// For test
LoadingCache<HRegionInfo, HDFSBlocksDistribution> getCache() {
return cache;
}
} }

View File

@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -57,8 +56,6 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, MediumTests.class})
public class TestBaseLoadBalancer extends BalancerTestBase { public class TestBaseLoadBalancer extends BalancerTestBase {
@ -451,49 +448,17 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// mock block locality for some regions // mock block locality for some regions
RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
.immediateFuture(emptyBlockDistribution);
for (HRegionInfo regionInfo : regions) {
when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
defaultFuture);
}
// block locality: region:0 => {server:0} // block locality: region:0 => {server:0}
// region:1 => {server:0, server:1} // region:1 => {server:0, server:1}
// region:42 => {server:4, server:9, server:5} // region:42 => {server:4, server:9, server:5}
HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution(); when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
ListenableFuture<HDFSBlocksDistribution> future0 = Futures Lists.newArrayList(servers.get(0)));
.immediateFuture(region0BlockDistribution); when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn( Lists.newArrayList(servers.get(0), servers.get(1)));
future0); when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
when(locationFinder.getTopBlockLocations(region0BlockDistribution)) Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
.thenReturn(Lists.newArrayList(servers.get(0))); when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future1 = Futures
.immediateFuture(region1BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
future1);
when(locationFinder.getTopBlockLocations(region1BlockDistribution))
.thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future42 = Futures
.immediateFuture(region42BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
future42);
when(locationFinder.getTopBlockLocations(region42BlockDistribution))
.thenReturn(
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future43 = Futures
.immediateFuture(region43BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
future43);
// this server does not exists in clusterStatus
when(locationFinder.getTopBlockLocations(region43BlockDistribution))
.thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer; package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -121,8 +123,8 @@ public class TestRegionLocationFinder {
for (int i = 0; i < ServerNum; i++) { for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i); HRegionServer server = cluster.getRegionServer(i);
for (Region region : server.getOnlineRegions(tableName)) { for (Region region : server.getOnlineRegions(tableName)) {
List<ServerName> servers = finder.getTopBlockLocations(finder List<ServerName> servers = finder.getTopBlockLocations(region
.getBlockDistribution(region.getRegionInfo())); .getRegionInfo());
// test table may have empty region // test table may have empty region
if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) { if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
continue; continue;
@ -139,4 +141,24 @@ public class TestRegionLocationFinder {
} }
} }
} }
@Test
public void testRefreshAndWait() throws Exception {
finder.getCache().invalidateAll();
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
List<Region> regions = server.getOnlineRegions(tableName);
if (regions.size() <= 0) {
continue;
}
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(regions.size());
for (Region region : regions) {
regionInfos.add(region.getRegionInfo());
}
finder.refreshAndWait(regionInfos);
for (HRegionInfo regionInfo : regionInfos) {
assertNotNull(finder.getCache().getIfPresent(regionInfo));
}
}
}
} }