HBASE-16398 optimize HRegion computeHDFSBlocksDistribution
This commit is contained in:
parent
f3a3069796
commit
ed39396497
|
@ -71,6 +71,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
|
@ -120,6 +121,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
|||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.FilterWrapper;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
|
@ -1149,14 +1151,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
FileSystem fs = tablePath.getFileSystem(conf);
|
||||
|
||||
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
|
||||
for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
|
||||
Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
|
||||
if (storeFiles == null) continue;
|
||||
for (StoreFileInfo storeFileInfo : storeFiles) {
|
||||
try {
|
||||
hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Error getting hdfs block distribution for " + storeFileInfo);
|
||||
for (HColumnDescriptor family : tableDescriptor.getFamilies()) {
|
||||
List<LocatedFileStatus> locatedFileStatusList = HRegionFileSystem
|
||||
.getStoreFilesLocatedStatus(regionFs, family.getNameAsString(), true);
|
||||
if (locatedFileStatusList == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (LocatedFileStatus status : locatedFileStatusList) {
|
||||
Path p = status.getPath();
|
||||
if (StoreFileInfo.isReference(p) || HFileLink.isHFileLink(p)) {
|
||||
// Only construct a StoreFileInfo object if it's not a plain hfile, to save obj
|
||||
// creation
|
||||
StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, status);
|
||||
hdfsBlocksDistribution.add(storeFileInfo
|
||||
.computeHDFSBlocksDistribution(fs));
|
||||
} else if (StoreFileInfo.isHFile(p)) {
|
||||
// If it's an HFile, then let's just add to the block distribution
|
||||
// let's not create more objects here, not even another HDFSBlocksDistribution
|
||||
FSUtils.addToHDFSBlocksDistribution(hdfsBlocksDistribution,
|
||||
status.getBlockLocations());
|
||||
} else {
|
||||
throw new IOException("path=" + p
|
||||
+ " doesn't look like a valid StoreFile");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
@ -37,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -218,6 +221,36 @@ public class HRegionFileSystem {
|
|||
return storeFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the store files' LocatedFileStatus which available for the family.
|
||||
* This methods performs the filtering based on the valid store files.
|
||||
* @param familyName Column Family Name
|
||||
* @return a list of store files' LocatedFileStatus for the specified family.
|
||||
*/
|
||||
public static List<LocatedFileStatus> getStoreFilesLocatedStatus(
|
||||
final HRegionFileSystem regionfs, final String familyName,
|
||||
final boolean validate) throws IOException {
|
||||
Path familyDir = regionfs.getStoreDir(familyName);
|
||||
List<LocatedFileStatus> locatedFileStatuses = FSUtils.listLocatedStatus(
|
||||
regionfs.getFileSystem(), familyDir);
|
||||
if (locatedFileStatuses == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("No StoreFiles for: " + familyDir);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
List<LocatedFileStatus> validStoreFiles = Lists.newArrayList();
|
||||
for (LocatedFileStatus status : locatedFileStatuses) {
|
||||
if (validate && !StoreFileInfo.isValid(status)) {
|
||||
LOG.warn("Invalid StoreFile: " + status.getPath());
|
||||
} else {
|
||||
validStoreFiles.add(status);
|
||||
}
|
||||
}
|
||||
return validStoreFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return Qualified Path of the specified family/file
|
||||
*
|
||||
|
|
|
@ -18,6 +18,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.CheckForNull;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.EOFException;
|
||||
|
@ -52,7 +59,6 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
|
@ -60,8 +66,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.ClusterId;
|
||||
|
@ -71,6 +79,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
|
@ -89,12 +98,6 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.CheckForNull;
|
||||
|
||||
/**
|
||||
* Utility methods for interacting with the underlying file system.
|
||||
*/
|
||||
|
@ -1063,6 +1066,21 @@ public abstract class FSUtils {
|
|||
return blocksDistribution;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update blocksDistribution with blockLocations
|
||||
* @param blocksDistribution the hdfs blocks distribution
|
||||
* @param blockLocations an array containing block location
|
||||
*/
|
||||
static public void addToHDFSBlocksDistribution(
|
||||
HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
|
||||
throws IOException {
|
||||
for (BlockLocation bl : blockLocations) {
|
||||
String[] hosts = bl.getHosts();
|
||||
long len = bl.getLength();
|
||||
blocksDistribution.addHostsAndBlockWeight(hosts, len);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO move this method OUT of FSUtils. No dependencies to HMaster
|
||||
/**
|
||||
* Returns the total overall fragmentation percentage. Includes hbase:meta and
|
||||
|
@ -1887,6 +1905,34 @@ public abstract class FSUtils {
|
|||
return listStatus(fs, dir, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @return LocatedFileStatus list
|
||||
*/
|
||||
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
|
||||
final Path dir) throws IOException {
|
||||
List<LocatedFileStatus> status = null;
|
||||
try {
|
||||
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
|
||||
.listFiles(dir, false);
|
||||
while (locatedFileStatusRemoteIterator.hasNext()) {
|
||||
if (status == null) {
|
||||
status = Lists.newArrayList();
|
||||
}
|
||||
status.add(locatedFileStatusRemoteIterator.next());
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// if directory doesn't exist, return null
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(dir + " doesn't exist");
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.delete() and returns the value returned by the fs.delete()
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue