HBASE-16398 optimize HRegion computeHDFSBlocksDistribution

This commit is contained in:
binlijin 2016-12-16 11:02:27 +08:00
parent 35f0718a41
commit 6acbee179f
3 changed files with 116 additions and 22 deletions

View File

@ -81,6 +81,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -130,6 +131,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
@ -189,7 +191,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@ -1161,13 +1162,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
for (HColumnDescriptor family : tableDescriptor.getFamilies()) {
Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
if (storeFiles == null) continue;
for (StoreFileInfo storeFileInfo : storeFiles) {
try {
hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
} catch (IOException ioe) {
LOG.warn("Error getting hdfs block distribution for " + storeFileInfo);
List<LocatedFileStatus> locatedFileStatusList = HRegionFileSystem
.getStoreFilesLocatedStatus(regionFs, family.getNameAsString(), true);
if (locatedFileStatusList == null) {
continue;
}
for (LocatedFileStatus status : locatedFileStatusList) {
Path p = status.getPath();
if (StoreFileInfo.isReference(p) || HFileLink.isHFileLink(p)) {
// Only construct StoreFileInfo object if its not a hfile, save obj
// creation
StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, status);
hdfsBlocksDistribution.add(storeFileInfo
.computeHDFSBlocksDistribution(fs));
} else if (StoreFileInfo.isHFile(p)) {
// If its a HFile, then lets just add to the block distribution
// lets not create more objects here, not even another HDFSBlocksDistribution
FSUtils.addToHDFSBlocksDistribution(hdfsBlocksDistribution,
status.getBlockLocations());
} else {
throw new IOException("path=" + p
+ " doesn't look like a valid StoreFile");
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.Lists;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -30,24 +32,23 @@ import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
@ -220,6 +221,36 @@ public class HRegionFileSystem {
return storeFiles;
}
/**
* Returns the store files' LocatedFileStatus which available for the family.
* This methods performs the filtering based on the valid store files.
* @param familyName Column Family Name
* @return a list of store files' LocatedFileStatus for the specified family.
*/
public static List<LocatedFileStatus> getStoreFilesLocatedStatus(
final HRegionFileSystem regionfs, final String familyName,
final boolean validate) throws IOException {
Path familyDir = regionfs.getStoreDir(familyName);
List<LocatedFileStatus> locatedFileStatuses = FSUtils.listLocatedStatus(
regionfs.getFileSystem(), familyDir);
if (locatedFileStatuses == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("No StoreFiles for: " + familyDir);
}
return null;
}
List<LocatedFileStatus> validStoreFiles = Lists.newArrayList();
for (LocatedFileStatus status : locatedFileStatuses) {
if (validate && !StoreFileInfo.isValid(status)) {
LOG.warn("Invalid StoreFile: " + status.getPath());
} else {
validStoreFiles.add(status);
}
}
return validStoreFiles;
}
/**
* Return Qualified Path of the specified family/file
*

View File

@ -18,6 +18,13 @@
*/
package org.apache.hadoop.hbase.util;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
@ -52,7 +59,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@ -60,8 +66,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
@ -70,18 +78,20 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
@ -89,12 +99,6 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.CheckForNull;
/**
* Utility methods for interacting with the underlying file system.
*/
@ -1064,6 +1068,21 @@ public abstract class FSUtils {
return blocksDistribution;
}
/**
* Update blocksDistribution with blockLocations
* @param blocksDistribution the hdfs blocks distribution
* @param blockLocations an array containing block location
*/
static public void addToHDFSBlocksDistribution(
HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
throws IOException {
for (BlockLocation bl : blockLocations) {
String[] hosts = bl.getHosts();
long len = bl.getLength();
blocksDistribution.addHostsAndBlockWeight(hosts, len);
}
}
// TODO move this method OUT of FSUtils. No dependencies to HMaster
/**
* Returns the total overall fragmentation percentage. Includes hbase:meta and
@ -1856,6 +1875,34 @@ public abstract class FSUtils {
return listStatus(fs, dir, null);
}
/**
* Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
*
* @param fs file system
* @param dir directory
* @return LocatedFileStatus list
*/
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
final Path dir) throws IOException {
List<LocatedFileStatus> status = null;
try {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
.listFiles(dir, false);
while (locatedFileStatusRemoteIterator.hasNext()) {
if (status == null) {
status = Lists.newArrayList();
}
status.add(locatedFileStatusRemoteIterator.next());
}
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
return status;
}
/**
* Calls fs.delete() and returns the value returned by the fs.delete()
*