HBASE-18166 [AMv2] We are splitting already-split files v2 Address Stephen Jiang reivew comments

This commit is contained in:
Michael Stack 2017-06-05 21:46:08 -07:00
parent 8b36da1108
commit f64512bee1
6 changed files with 54 additions and 27 deletions

View File

@ -25,10 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -408,7 +405,6 @@ public class SplitTableRegionProcedure
/**
* Action before splitting region in a table.
* @param env MasterProcedureEnv
* @param state the procedure state
* @throws IOException
* @throws InterruptedException
*/
@ -484,12 +480,10 @@ public class SplitTableRegionProcedure
* @param env MasterProcedureEnv
* @throws IOException
*/
private Pair<Integer, Integer> splitStoreFiles(
final MasterProcedureEnv env,
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Configuration conf = env.getMasterConfiguration();
// The following code sets up a thread pool executor with as many slots as
// there's files to split. It then fires up everything, waits for
// completion and finally checks for any exception
@ -498,10 +492,29 @@ public class SplitTableRegionProcedure
// Nothing to unroll here if failure -- re-run createSplitsDir will
// clean this up.
int nbFiles = 0;
final Map<String, Collection<StoreFileInfo>> files =
new HashMap<String, Collection<StoreFileInfo>>(regionFs.getFamilies().size());
for (String family: regionFs.getFamilies()) {
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
if (storeFiles != null) {
nbFiles += storeFiles.size();
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
if (sfis == null) continue;
Collection<StoreFileInfo> filteredSfis = null;
for (StoreFileInfo sfi: sfis) {
// Filter. There is a lag cleaning up compacted reference files. They get cleared
// after a delay in case outstanding Scanners still have references. Because of this,
// the listing of the Store content may have straggler reference files. Skip these.
// It should be safe to skip references at this point because we checked above with
// the region if it thinks it is splittable and if we are here, it thinks it is
// splitable.
if (sfi.isReference()) {
LOG.info("Skipping split of " + sfi + "; presuming ready for archiving.");
continue;
}
if (filteredSfis == null) {
filteredSfis = new ArrayList<StoreFileInfo>(sfis.size());
files.put(family, filteredSfis);
}
filteredSfis.add(sfi);
nbFiles++;
}
}
if (nbFiles == 0) {
@ -513,17 +526,18 @@ public class SplitTableRegionProcedure
conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
nbFiles);
LOG.info("pid=" + getProcId() + " preparing to split " + nbFiles + " storefiles for region " +
getParentRegion().getShortNameToLog() + " using " + maxThreads + " threads");
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(
maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);
// Split each store file.
final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
for (String family: regionFs.getFamilies()) {
final ColumnFamilyDescriptor hcd = htd.getColumnFamily(family.getBytes());
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
for (Map.Entry<String, Collection<StoreFileInfo>>e: files.entrySet()) {
byte [] familyName = Bytes.toBytes(e.getKey());
final HColumnDescriptor hcd = htd.getFamily(familyName);
final Collection<StoreFileInfo> storeFiles = e.getValue();
if (storeFiles != null && storeFiles.size() > 0) {
final CacheConfig cacheConf = new CacheConfig(conf, hcd);
for (StoreFileInfo storeFileInfo: storeFiles) {
@ -570,8 +584,10 @@ public class SplitTableRegionProcedure
}
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " split storefiles for region " + getParentRegion().getShortNameToLog() +
" Daughter A: " + daughterA + " storefiles, Daughter B: " + daughterB + " storefiles.");
LOG.debug("pid=" + getProcId() + " split storefiles for region " +
getParentRegion().getShortNameToLog() +
" Daughter A: " + daughterA + " storefiles, Daughter B: " +
daughterB + " storefiles.");
}
return new Pair<Integer, Integer>(daughterA, daughterB);
}

View File

@ -1393,7 +1393,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean isSplittable() {
boolean result = isAvailable() && !hasReferences();
LOG.info("ASKED IF SPLITTABLE " + result, new Throwable("LOGGING"));
LOG.info("ASKED IF SPLITTABLE " + result + " " + getRegionInfo().getShortNameToLog(),
new Throwable("LOGGING: REMOVE"));
// REMOVE BELOW!!!!
LOG.info("DEBUG LIST ALL FILES");
for (Store store: this.stores.values()) {
LOG.info("store " + store.getColumnFamilyName());
for (StoreFile sf: store.getStorefiles()) {
LOG.info(sf.toStringDetailed());
}
}
return result;
}

View File

@ -1444,7 +1444,7 @@ public class HStore implements Store {
StringBuilder message = new StringBuilder(
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
+ this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
if (sfs.isEmpty()) {
message.append("none, ");
} else {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
/**
* Split size is the number of regions that are on this server that all are
@ -84,9 +85,10 @@ public class IncreasingToUpperBoundRegionSplitPolicy extends ConstantSizeRegionS
// Mark if any store is big enough
long size = store.getSize();
if (size > sizeToCheck) {
LOG.debug("ShouldSplit because " + store.getColumnFamilyName() + " size=" + size
+ ", sizeToCheck=" + sizeToCheck + ", regionsWithCommonTable="
+ tableRegionsCount);
LOG.debug("ShouldSplit because " + store.getColumnFamilyName() +
" size=" + StringUtils.humanSize(size) +
", sizeToCheck=" + StringUtils.humanSize(sizeToCheck) +
", regionsWithCommonTable=" + tableRegionsCount);
foundABigStore = true;
}
}

View File

@ -2807,8 +2807,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegionInfo hri = rsh.s.getRegionInfo();
// Yes, should be the same instance
if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
String msg = "Region was re-opened after the scanner" + scannerName + " was created: "
+ hri.getRegionNameAsString();
String msg = "Region has changed on the scanner " + scannerName + ": regionName="
+ hri.getRegionName() + ", scannerRegionName=" + rsh.r;
LOG.warn(msg + ", closing...");
scanners.remove(scannerName);
try {

View File

@ -451,8 +451,8 @@ public class StoreFileInfo {
// Tabledir is up two directories from where Reference was written.
Path tableDir = p.getParent().getParent().getParent();
String nameStrippedOfSuffix = m.group(1);
if (LOG.isDebugEnabled()) {
LOG.debug("reference '" + p + "' to region=" + otherRegion
if (LOG.isTraceEnabled()) {
LOG.trace("reference '" + p + "' to region=" + otherRegion
+ " hfile=" + nameStrippedOfSuffix);
}