HBASE-16570 Compute region locality in parallel at startup (binlijin)

This commit is contained in:
chenheng 2016-09-09 10:54:48 +08:00
parent 46c756a4a7
commit e11aafae95
4 changed files with 100 additions and 21 deletions

View File

@ -33,6 +33,7 @@ import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
@ -59,6 +60,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
/**
* The base class for load balancers. It provides the the functions used to by
@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads;
private RegionLocationFinder regionFinder;
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@ -238,6 +241,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToTableIndex = new int[numRegions];
regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions];
regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
numRegions);
if (regionFinder != null) {
for (int i = 0; i < numRegions; i++) {
regionLocationFutures.add(null);
}
}
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
serverIndicesSortedByLocality = new Integer[numServers];
@ -307,6 +317,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndex++;
}
if (regionFinder != null) {
for (int index = 0; index < regionLocationFutures.size(); index++) {
ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
.get(index);
HDFSBlocksDistribution blockDistbn = null;
try {
blockDistbn = future.get();
} catch (InterruptedException ite) {
} catch (ExecutionException ee) {
LOG.debug(
"IOException during HDFSBlocksDistribution computation. for region = "
+ regions[index].getEncodedName(), ee);
} finally {
if (blockDistbn == null) {
blockDistbn = new HDFSBlocksDistribution();
}
}
List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
regionLocations[index] = new int[loc.size()];
for (int i = 0; i < loc.size(); i++) {
regionLocations[index][i] = loc.get(i) == null ? -1
: (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
: serversToIndex.get(loc.get(i).getHostAndPort()));
}
}
}
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
@ -454,15 +491,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
if (regionFinder != null) {
//region location
List<ServerName> loc = regionFinder.getTopBlockLocations(region);
regionLocations[regionIndex] = new int[loc.size()];
for (int i=0; i < loc.size(); i++) {
regionLocations[regionIndex][i] =
loc.get(i) == null ? -1 :
(serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
: serversToIndex.get(loc.get(i).getHostAndPort()));
}
// region location
regionLocationFutures.set(regionIndex,
regionFinder.asyncGetBlockDistribution(region));
}
}

View File

@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
class RegionLocationFinder {
private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
private Configuration conf;
private volatile ClusterStatus status;
private MasterServices services;
@ -164,8 +167,8 @@ class RegionLocationFinder {
return includesUserTables;
}
protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
protected List<ServerName> getTopBlockLocations(
HDFSBlocksDistribution blocksDistribution) {
List<String> topHosts = blocksDistribution.getTopHosts();
return mapHostNameToServerName(topHosts);
}
@ -208,7 +211,7 @@ class RegionLocationFinder {
+ region.getEncodedName(), ioe);
}
return new HDFSBlocksDistribution();
return EMPTY_BLOCK_DISTRIBUTION;
}
/**
@ -295,4 +298,13 @@ class RegionLocationFinder {
return blockDistbn;
}
}
public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
HRegionInfo hri) {
try {
return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
} catch (Exception e) {
return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
}
}
}

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -56,6 +57,8 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@Category({MasterTests.class, MediumTests.class})
public class TestBaseLoadBalancer extends BalancerTestBase {
@ -448,17 +451,49 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// mock block locality for some regions
RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
.immediateFuture(emptyBlockDistribution);
for (HRegionInfo regionInfo : regions) {
when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
defaultFuture);
}
// block locality: region:0 => {server:0}
// region:1 => {server:0, server:1}
// region:42 => {server:4, server:9, server:5}
when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
Lists.newArrayList(servers.get(0)));
when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
Lists.newArrayList(servers.get(0), servers.get(1)));
when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future0 = Futures
.immediateFuture(region0BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn(
future0);
when(locationFinder.getTopBlockLocations(region0BlockDistribution))
.thenReturn(Lists.newArrayList(servers.get(0)));
HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future1 = Futures
.immediateFuture(region1BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
future1);
when(locationFinder.getTopBlockLocations(region1BlockDistribution))
.thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future42 = Futures
.immediateFuture(region42BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
future42);
when(locationFinder.getTopBlockLocations(region42BlockDistribution))
.thenReturn(
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
ListenableFuture<HDFSBlocksDistribution> future43 = Futures
.immediateFuture(region43BlockDistribution);
when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
future43);
// this server does not exists in clusterStatus
when(locationFinder.getTopBlockLocations(region43BlockDistribution))
.thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);

View File

@ -121,7 +121,8 @@ public class TestRegionLocationFinder {
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
for (Region region : server.getOnlineRegions(tableName)) {
List<ServerName> servers = finder.getTopBlockLocations(region.getRegionInfo());
List<ServerName> servers = finder.getTopBlockLocations(finder
.getBlockDistribution(region.getRegionInfo()));
// test table may have empty region
if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
continue;