HBASE-20214 Review of RegionLocationFinder Class - revert due to the pending removal of commons-collections4 dependency

This commit is contained in:
tedyu 2018-03-19 16:52:27 -07:00
parent 45586ab300
commit 2a3f4a0a4e
1 changed files with 55 additions and 32 deletions

View File

@ -21,18 +21,12 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@ -47,10 +41,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
@ -138,6 +132,7 @@ class RegionLocationFinder {
// Only count the refresh if it includes user tables ( eg more than meta and namespace ). // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh; lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
} }
} }
/** /**
@ -176,10 +171,14 @@ class RegionLocationFinder {
*/ */
protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) { protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) {
HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
List<String> topHosts = blocksDistribution.getTopHosts(); List<String> topHosts = new ArrayList<>();
int toIndex = topHosts.indexOf(currentHost); for (String host : blocksDistribution.getTopHosts()) {
List<String> subTopHosts = (toIndex < 0) ? topHosts : topHosts.subList(0, toIndex); if (host.equals(currentHost)) {
return mapHostNameToServerName(subTopHosts); break;
}
topHosts.add(host);
}
return mapHostNameToServerName(topHosts);
} }
/** /**
@ -212,7 +211,7 @@ class RegionLocationFinder {
* *
* @param tableName the table name * @param tableName the table name
* @return TableDescriptor * @return TableDescriptor
* @throws IOException if table descriptor cannot be loaded * @throws IOException
*/ */
protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException { protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
TableDescriptor tableDescriptor = null; TableDescriptor tableDescriptor = null;
@ -221,8 +220,8 @@ class RegionLocationFinder {
tableDescriptor = this.services.getTableDescriptors().get(tableName); tableDescriptor = this.services.getTableDescriptors().get(tableName);
} }
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
LOG.debug("FileNotFoundException during getTableDescriptors. Current table name = {}", LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
tableName, fnfe); + tableName, fnfe);
} }
return tableDescriptor; return tableDescriptor;
@ -236,36 +235,60 @@ class RegionLocationFinder {
* @return ServerName list * @return ServerName list
*/ */
protected List<ServerName> mapHostNameToServerName(List<String> hosts) { protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
if (hosts == null || status == null) {
if (hosts == null) { if (hosts == null) {
LOG.warn("RegionLocationFinder top hosts is null"); LOG.warn("RegionLocationFinder top hosts is null");
return Collections.emptyList();
} }
if (status == null) { return Lists.newArrayList();
return Collections.emptyList();
} }
List<ServerName> topServerNames = new ArrayList<>(); List<ServerName> topServerNames = new ArrayList<>();
Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet(); Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
// create a mapping from hostname to ServerName for fast lookup // create a mapping from hostname to ServerName for fast lookup
MultiValuedMap<String, ServerName> hostToServerName = new ArrayListValuedHashMap<>(); HashMap<String, List<ServerName>> hostToServerName = new HashMap<>();
for (ServerName sn : regionServers) { for (ServerName sn : regionServers) {
String hostName = sn.getHostname(); String host = sn.getHostname();
hostToServerName.put(hostName, sn); if (!hostToServerName.containsKey(host)) {
hostToServerName.put(host, new ArrayList<>());
}
hostToServerName.get(host).add(sn);
} }
for (String host : hosts) { for (String host : hosts) {
if (!hostToServerName.containsKey(host)) {
continue;
}
for (ServerName sn : hostToServerName.get(host)) { for (ServerName sn : hostToServerName.get(host)) {
// it is possible that HDFS is up ( thus host is valid ), // it is possible that HDFS is up ( thus host is valid ),
// but RS is down ( thus sn is null ) // but RS is down ( thus sn is null )
CollectionUtils.addIgnoreNull(topServerNames, sn); if (sn != null) {
topServerNames.add(sn);
}
} }
} }
return topServerNames; return topServerNames;
} }
public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) { public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
return cache.getUnchecked(hri); HDFSBlocksDistribution blockDistbn = null;
try {
if (cache.asMap().containsKey(hri)) {
blockDistbn = cache.get(hri);
return blockDistbn;
} else {
LOG.debug("HDFSBlocksDistribution not found in cache for region "
+ hri.getRegionNameAsString());
blockDistbn = internalGetTopBlockLocation(hri);
cache.put(hri, blockDistbn);
return blockDistbn;
}
} catch (ExecutionException e) {
LOG.warn("Error while fetching cache entry ", e);
blockDistbn = internalGetTopBlockLocation(hri);
cache.put(hri, blockDistbn);
return blockDistbn;
}
} }
private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution( private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
@ -278,24 +301,24 @@ class RegionLocationFinder {
} }
public void refreshAndWait(Collection<RegionInfo> hris) { public void refreshAndWait(Collection<RegionInfo> hris) {
Map<RegionInfo, ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = new ArrayList<>(hris.size());
new HashMap<>(hris.size() * 2);
for (RegionInfo hregionInfo : hris) { for (RegionInfo hregionInfo : hris) {
regionLocationFutures.put(hregionInfo, asyncGetBlockDistribution(hregionInfo)); regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
} }
int index = 0;
for (RegionInfo hregionInfo : hris) { for (RegionInfo hregionInfo : hris) {
ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
.get(hregionInfo); .get(index);
try { try {
cache.put(hregionInfo, future.get()); cache.put(hregionInfo, future.get());
} catch (InterruptedException ite) { } catch (InterruptedException ite) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
LOG.debug( LOG.debug(
"ExecutionException during HDFSBlocksDistribution computation. for region = {}", "ExecutionException during HDFSBlocksDistribution computation. for region = "
hregionInfo.getEncodedName(), ee); + hregionInfo.getEncodedName(), ee);
} }
index++;
} }
} }