HBASE-16570 Compute region locality in parallel at startup (binlijin)
This commit is contained in:
parent
52963b3428
commit
b4086795f2
@ -33,6 +33,7 @@ import java.util.NavigableMap;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -57,6 +58,7 @@ import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* The base class for load balancers. It provides the functions used by
|
||||
@ -115,6 +117,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||
HRegionInfo[] regions;
|
||||
Deque<RegionLoad>[] regionLoads;
|
||||
private RegionLocationFinder regionFinder;
|
||||
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
|
||||
|
||||
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
|
||||
|
||||
@ -236,6 +239,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||
regionIndexToTableIndex = new int[numRegions];
|
||||
regionIndexToPrimaryIndex = new int[numRegions];
|
||||
regionLoads = new Deque[numRegions];
|
||||
regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
|
||||
numRegions);
|
||||
if (regionFinder != null) {
|
||||
for (int i = 0; i < numRegions; i++) {
|
||||
regionLocationFutures.add(null);
|
||||
}
|
||||
}
|
||||
regionLocations = new int[numRegions][];
|
||||
serverIndicesSortedByRegionCount = new Integer[numServers];
|
||||
serverIndicesSortedByLocality = new Integer[numServers];
|
||||
@ -305,6 +315,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||
regionIndex++;
|
||||
}
|
||||
|
||||
if (regionFinder != null) {
|
||||
for (int index = 0; index < regionLocationFutures.size(); index++) {
|
||||
ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
|
||||
.get(index);
|
||||
HDFSBlocksDistribution blockDistbn = null;
|
||||
try {
|
||||
blockDistbn = future.get();
|
||||
} catch (InterruptedException ite) {
|
||||
} catch (ExecutionException ee) {
|
||||
LOG.debug(
|
||||
"IOException during HDFSBlocksDistribution computation. for region = "
|
||||
+ regions[index].getEncodedName(), ee);
|
||||
} finally {
|
||||
if (blockDistbn == null) {
|
||||
blockDistbn = new HDFSBlocksDistribution();
|
||||
}
|
||||
}
|
||||
List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
|
||||
regionLocations[index] = new int[loc.size()];
|
||||
for (int i = 0; i < loc.size(); i++) {
|
||||
regionLocations[index][i] = loc.get(i) == null ? -1
|
||||
: (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
|
||||
: serversToIndex.get(loc.get(i).getHostAndPort()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < serversPerHostList.size(); i++) {
|
||||
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
|
||||
for (int j = 0; j < serversPerHost[i].length; j++) {
|
||||
@ -452,15 +489,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||
}
|
||||
|
||||
if (regionFinder != null) {
|
||||
//region location
|
||||
List<ServerName> loc = regionFinder.getTopBlockLocations(region);
|
||||
regionLocations[regionIndex] = new int[loc.size()];
|
||||
for (int i=0; i < loc.size(); i++) {
|
||||
regionLocations[regionIndex][i] =
|
||||
loc.get(i) == null ? -1 :
|
||||
(serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
|
||||
: serversToIndex.get(loc.get(i).getHostAndPort()));
|
||||
}
|
||||
// region location
|
||||
regionLocationFutures.set(regionIndex,
|
||||
regionFinder.asyncGetBlockDistribution(region));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
|
||||
class RegionLocationFinder {
|
||||
private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
|
||||
private static final long CACHE_TIME = 240 * 60 * 1000;
|
||||
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
|
||||
private Configuration conf;
|
||||
private volatile ClusterStatus status;
|
||||
private MasterServices services;
|
||||
@ -164,8 +167,8 @@ class RegionLocationFinder {
|
||||
return includesUserTables;
|
||||
}
|
||||
|
||||
protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
|
||||
HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
|
||||
protected List<ServerName> getTopBlockLocations(
|
||||
HDFSBlocksDistribution blocksDistribution) {
|
||||
List<String> topHosts = blocksDistribution.getTopHosts();
|
||||
return mapHostNameToServerName(topHosts);
|
||||
}
|
||||
@ -208,7 +211,7 @@ class RegionLocationFinder {
|
||||
+ region.getEncodedName(), ioe);
|
||||
}
|
||||
|
||||
return new HDFSBlocksDistribution();
|
||||
return EMPTY_BLOCK_DISTRIBUTION;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -295,4 +298,13 @@ class RegionLocationFinder {
|
||||
return blockDistbn;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Requests the HDFS block distribution of a region without blocking the caller.
 * <p>
 * Delegates to {@code loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION)} and hands back the
 * future it produces. If that call itself throws, an already-completed future holding
 * {@code EMPTY_BLOCK_DISTRIBUTION} is returned instead, so this method never propagates
 * an exception to the caller.
 *
 * @param hri the region whose block distribution is requested
 * @return a future for the region's block distribution, or a completed future holding
 *         the shared empty distribution when the reload could not be started
 */
public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
|
||||
HRegionInfo hri) {
|
||||
try {
|
||||
return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
|
||||
} catch (Exception e) {
|
||||
// Best effort: any failure here degrades to the shared empty distribution.
// NOTE(review): the exception is neither logged nor rethrown -- confirm this is intended.
return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
@ -55,6 +56,8 @@ import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestBaseLoadBalancer extends BalancerTestBase {
|
||||
@ -456,17 +459,49 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
|
||||
|
||||
// mock block locality for some regions
|
||||
RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
|
||||
HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
|
||||
ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
|
||||
.immediateFuture(emptyBlockDistribution);
|
||||
for (HRegionInfo regionInfo : regions) {
|
||||
when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
|
||||
defaultFuture);
|
||||
}
|
||||
// block locality: region:0 => {server:0}
|
||||
// region:1 => {server:0, server:1}
|
||||
// region:42 => {server:4, server:9, server:5}
|
||||
when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
|
||||
Lists.newArrayList(servers.get(0)));
|
||||
when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
|
||||
Lists.newArrayList(servers.get(0), servers.get(1)));
|
||||
when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
|
||||
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
|
||||
when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
|
||||
Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
|
||||
HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution();
|
||||
ListenableFuture<HDFSBlocksDistribution> future0 = Futures
|
||||
.immediateFuture(region0BlockDistribution);
|
||||
when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn(
|
||||
future0);
|
||||
when(locationFinder.getTopBlockLocations(region0BlockDistribution))
|
||||
.thenReturn(Lists.newArrayList(servers.get(0)));
|
||||
|
||||
HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
|
||||
ListenableFuture<HDFSBlocksDistribution> future1 = Futures
|
||||
.immediateFuture(region1BlockDistribution);
|
||||
when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
|
||||
future1);
|
||||
when(locationFinder.getTopBlockLocations(region1BlockDistribution))
|
||||
.thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
|
||||
|
||||
HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
|
||||
ListenableFuture<HDFSBlocksDistribution> future42 = Futures
|
||||
.immediateFuture(region42BlockDistribution);
|
||||
when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
|
||||
future42);
|
||||
when(locationFinder.getTopBlockLocations(region42BlockDistribution))
|
||||
.thenReturn(
|
||||
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
|
||||
|
||||
HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
|
||||
ListenableFuture<HDFSBlocksDistribution> future43 = Futures
|
||||
.immediateFuture(region43BlockDistribution);
|
||||
when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
|
||||
future43);
|
||||
// this server does not exist in clusterStatus
|
||||
when(locationFinder.getTopBlockLocations(region43BlockDistribution))
|
||||
.thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
|
||||
|
||||
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
|
||||
|
||||
|
@ -124,7 +124,8 @@ public class TestRegionLocationFinder {
|
||||
for (int i = 0; i < ServerNum; i++) {
|
||||
HRegionServer server = cluster.getRegionServer(i);
|
||||
for (Region region : server.getOnlineRegions(tableName)) {
|
||||
List<ServerName> servers = finder.getTopBlockLocations(region.getRegionInfo());
|
||||
List<ServerName> servers = finder.getTopBlockLocations(finder
|
||||
.getBlockDistribution(region.getRegionInfo()));
|
||||
// test table may have empty region
|
||||
if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
|
||||
continue;
|
||||
|
Loading…
x
Reference in New Issue
Block a user