HBASE-23379 Clean Up FSUtil getRegionLocalityMappingFromFS

Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
belugabehr 2019-12-10 15:56:53 -05:00 committed by Jan Hentschel
parent a580b1d2e9
commit c39339c004
1 changed files with 25 additions and 41 deletions

View File

@ -39,7 +39,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -1637,8 +1636,7 @@ public abstract class FSUtils extends CommonFSUtils {
final Configuration conf, final String desiredTable, int threadPoolSize)
throws IOException {
Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
regionDegreeLocalityMapping);
getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
return regionDegreeLocalityMapping;
}
@ -1654,24 +1652,19 @@ public abstract class FSUtils extends CommonFSUtils {
* the table you wish to scan locality for
* @param threadPoolSize
* the thread pool size to use
* @param regionToBestLocalityRSMapping
* the map into which to put the best locality mapping or null
* @param regionDegreeLocalityMapping
* the map into which to put the locality degree mapping or null,
* must be a thread-safe implementation
* @throws IOException
* in case of file system errors or interrupts
*/
private static void getRegionLocalityMappingFromFS(
final Configuration conf, final String desiredTable,
int threadPoolSize,
Map<String, String> regionToBestLocalityRSMapping,
Map<String, Map<String, Float>> regionDegreeLocalityMapping)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path rootPath = FSUtils.getRootDir(conf);
long startTime = EnvironmentEdgeManager.currentTime();
Path queryPath;
private static void getRegionLocalityMappingFromFS(final Configuration conf,
final String desiredTable, int threadPoolSize,
final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
final FileSystem fs = FileSystem.get(conf);
final Path rootPath = FSUtils.getRootDir(conf);
final long startTime = EnvironmentEdgeManager.currentTime();
final Path queryPath;
// The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
if (null == desiredTable) {
queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
@ -1708,44 +1701,36 @@ public abstract class FSUtils extends CommonFSUtils {
FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
if (LOG.isDebugEnabled()) {
LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
}
if (null == statusList) {
return;
} else {
LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
statusList.length);
}
// lower the number of threads in case we have very few expected regions
threadPoolSize = Math.min(threadPoolSize, statusList.length);
// run in multiple threads
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(statusList.length),
final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
Threads.newDaemonThreadFactory("FSRegionQuery"));
try {
// ignore all file status items that are not of interest
for (FileStatus regionStatus : statusList) {
if (null == regionStatus) {
if (null == regionStatus || !regionStatus.isDirectory()) {
continue;
}
if (!regionStatus.isDirectory()) {
continue;
final Path regionPath = regionStatus.getPath();
if (null != regionPath) {
tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
}
Path regionPath = regionStatus.getPath();
if (null == regionPath) {
continue;
}
tpe.execute(new FSRegionScanner(fs, regionPath,
regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
}
} finally {
tpe.shutdown();
int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
60 * 1000);
final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
try {
// here we wait until TPE terminates, which is either naturally or by
// exceptions in the execution of the threads
@ -1754,18 +1739,17 @@ public abstract class FSUtils extends CommonFSUtils {
// printing out rough estimate, so as to not introduce
// AtomicInteger
LOG.info("Locality checking is underway: { Scanned Regions : "
+ tpe.getCompletedTaskCount() + "/"
+ tpe.getTaskCount() + " }");
+ ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
+ ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
Thread.currentThread().interrupt();
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
long overhead = EnvironmentEdgeManager.currentTime() - startTime;
String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
LOG.info(overheadMsg);
LOG.info("Scan DFS for locality info takes {}ms", overhead);
}
/**