HBASE-14473 Compute region locality in parallel

This commit is contained in:
Elliott Clark 2015-09-23 14:57:51 -04:00
parent 37877e3f56
commit dbbef06135
1 changed file with 94 additions and 25 deletions
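The change replaces the balancer's synchronous locality lookups with a Guava LoadingCache whose loader overrides reload() and hands recomputation to a ListeningExecutorService, so cache.refresh() returns immediately and the previous value keeps being served until the background work finishes. Below is a minimal standalone sketch of that asynchronous-reload pattern; the Guava calls mirror the ones in the diff, while the class name, the String/Long key and value types, and the expensiveLookup() helper are illustrative stand-ins rather than anything from the commit.

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class AsyncReloadCacheSketch {

  // Background pool for recomputing entries; refresh() callers never wait on it.
  private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
      Executors.newFixedThreadPool(5,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("reload-%d").build()));

  private final LoadingCache<String, Long> cache = CacheBuilder.newBuilder()
      .expireAfterWrite(240, TimeUnit.MINUTES)
      .build(new CacheLoader<String, Long>() {
        @Override
        public Long load(String key) throws Exception {
          // Synchronous path, taken on a cache miss.
          return expensiveLookup(key);
        }

        @Override
        public ListenableFuture<Long> reload(final String key, Long oldValue) {
          // Asynchronous path, taken by LoadingCache.refresh() for keys that are
          // already cached; the old value keeps being served until this completes.
          return executor.submit(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
              return expensiveLookup(key);
            }
          });
        }
      });

  // Stand-in for the expensive work (in the commit: reading HDFS block locations).
  private long expensiveLookup(String key) {
    return key.length();
  }

  /** get() blocks on a miss; refresh() of an already-present key returns immediately. */
  public long lookup(String key) throws Exception {
    long value = cache.get(key);
    cache.refresh(key);
    return value;
  }
}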


@@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,13 +34,24 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* This will find where data for a region is located in HDFS. It ranks
@@ -49,33 +59,58 @@ import com.google.common.collect.Lists;
* given region.
*
*/
@InterfaceAudience.Private
class RegionLocationFinder {
private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
private Configuration conf;
private volatile ClusterStatus status;
private MasterServices services;
private final ListeningExecutorService executor;
private long lastFullRefresh = 0;
private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
@Override
public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
return internalGetTopBlockLocation(key);
}
};
public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
HDFSBlocksDistribution oldValue) throws Exception {
return executor.submit(new Callable<HDFSBlocksDistribution>() {
@Override
public HDFSBlocksDistribution call() throws Exception {
return internalGetTopBlockLocation(hri);
}
});
}
@Override
public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
return internalGetTopBlockLocation(key);
}
};
// The cache for where regions are located.
private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
RegionLocationFinder() {
this.cache = createCache();
executor = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(
5,
new ThreadFactoryBuilder().
setDaemon(true)
.setNameFormat("region-location-%d")
.build()));
}
/**
* Create a cache for region to list of servers
* @param mins Number of mins to cache
* @param time time to cache the locations
* @return A new Cache.
*/
private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache(int mins) {
return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader);
private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
return CacheBuilder.newBuilder()
.expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
.build(loader);
}
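// Design note: the old cache used expireAfterAccess, which never expires an entry that is
// read constantly; expireAfterWrite makes every entry eligible for recomputation once
// CACHE_TIME elapses, no matter how often the balancer reads it.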
public Configuration getConf() {
@@ -84,7 +119,6 @@ class RegionLocationFinder {
public void setConf(Configuration conf) {
this.conf = conf;
cache = createCache(conf.getInt("hbase.master.balancer.regionLocationCacheTime", 30));
}
public void setServices(MasterServices services) {
@@ -92,7 +126,42 @@ class RegionLocationFinder {
}
public void setClusterStatus(ClusterStatus status) {
long currentTime = EnvironmentEdgeManager.currentTime();
this.status = status;
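// Note: CACHE_TIME is 240 * 60 * 1000 ms, i.e. four hours, so a full refresh is attempted
// at most once every two hours, and only a refresh that touched user tables resets the timer.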
if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
// Only count the refresh if it includes user tables ( eg more than meta and namespace ).
lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
}
}
/**
* Refresh all the region locations.
*
* @return true if user created regions got refreshed.
*/
private boolean scheduleFullRefresh() {
// Protect from anything being null while starting up.
if (services == null) {
return false;
}
AssignmentManager am = services.getAssignmentManager();
if (am == null) {
return false;
}
RegionStates regionStates = am.getRegionStates();
if (regionStates == null) {
return false;
}
Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
boolean includesUserTables = false;
for (final HRegionInfo hri : regions) {
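// For regions already in the cache, refresh() is asynchronous: the loader overrides reload(),
// so recomputation runs on the background executor while the old value keeps being served.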
cache.refresh(hri);
includesUserTables = includesUserTables || !hri.isSystemTable();
}
return includesUserTables;
}
protected List<ServerName> getTopBlockLocations(HRegionInfo region) {