HBASE-18166 [AMv2] We are splitting already-split files v2 Address Stephen Jiang review comments
This commit is contained in:
parent
299850ea70
commit
c02a142143
|
@ -25,10 +25,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -408,7 +405,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Action before splitting region in a table.
|
||||
* @param env MasterProcedureEnv
|
||||
* @param state the procedure state
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
|
@ -484,12 +480,10 @@ public class SplitTableRegionProcedure
|
|||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private Pair<Integer, Integer> splitStoreFiles(
|
||||
final MasterProcedureEnv env,
|
||||
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
|
||||
final HRegionFileSystem regionFs) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Configuration conf = env.getMasterConfiguration();
|
||||
|
||||
// The following code sets up a thread pool executor with as many slots as
|
||||
// there's files to split. It then fires up everything, waits for
|
||||
// completion and finally checks for any exception
|
||||
|
@ -498,10 +492,29 @@ public class SplitTableRegionProcedure
|
|||
// Nothing to unroll here if failure -- re-run createSplitsDir will
|
||||
// clean this up.
|
||||
int nbFiles = 0;
|
||||
final Map<String, Collection<StoreFileInfo>> files =
|
||||
new HashMap<String, Collection<StoreFileInfo>>(regionFs.getFamilies().size());
|
||||
for (String family: regionFs.getFamilies()) {
|
||||
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
||||
if (storeFiles != null) {
|
||||
nbFiles += storeFiles.size();
|
||||
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
|
||||
if (sfis == null) continue;
|
||||
Collection<StoreFileInfo> filteredSfis = null;
|
||||
for (StoreFileInfo sfi: sfis) {
|
||||
// Filter. There is a lag cleaning up compacted reference files. They get cleared
|
||||
// after a delay in case outstanding Scanners still have references. Because of this,
|
||||
// the listing of the Store content may have straggler reference files. Skip these.
|
||||
// It should be safe to skip references at this point because we checked above with
|
||||
// the region if it thinks it is splittable and if we are here, it thinks it is
|
||||
// splittable.
|
||||
if (sfi.isReference()) {
|
||||
LOG.info("Skipping split of " + sfi + "; presuming ready for archiving.");
|
||||
continue;
|
||||
}
|
||||
if (filteredSfis == null) {
|
||||
filteredSfis = new ArrayList<StoreFileInfo>(sfis.size());
|
||||
files.put(family, filteredSfis);
|
||||
}
|
||||
filteredSfis.add(sfi);
|
||||
nbFiles++;
|
||||
}
|
||||
}
|
||||
if (nbFiles == 0) {
|
||||
|
@ -513,17 +526,18 @@ public class SplitTableRegionProcedure
|
|||
conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
|
||||
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
|
||||
nbFiles);
|
||||
LOG.info("pid=" + getProcId() + " preparing to split " + nbFiles + " storefiles for region " +
|
||||
getParentRegion().getShortNameToLog() + " using " + maxThreads + " threads");
|
||||
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
|
||||
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
|
||||
final ExecutorService threadPool = Executors.newFixedThreadPool(
|
||||
maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
|
||||
final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);
|
||||
|
||||
// Split each store file.
|
||||
final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
|
||||
for (String family: regionFs.getFamilies()) {
|
||||
final ColumnFamilyDescriptor hcd = htd.getColumnFamily(family.getBytes());
|
||||
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
||||
for (Map.Entry<String, Collection<StoreFileInfo>>e: files.entrySet()) {
|
||||
byte [] familyName = Bytes.toBytes(e.getKey());
|
||||
final HColumnDescriptor hcd = htd.getFamily(familyName);
|
||||
final Collection<StoreFileInfo> storeFiles = e.getValue();
|
||||
if (storeFiles != null && storeFiles.size() > 0) {
|
||||
final CacheConfig cacheConf = new CacheConfig(conf, hcd);
|
||||
for (StoreFileInfo storeFileInfo: storeFiles) {
|
||||
|
@ -570,8 +584,10 @@ public class SplitTableRegionProcedure
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("pid=" + getProcId() + " split storefiles for region " + getParentRegion().getShortNameToLog() +
|
||||
" Daughter A: " + daughterA + " storefiles, Daughter B: " + daughterB + " storefiles.");
|
||||
LOG.debug("pid=" + getProcId() + " split storefiles for region " +
|
||||
getParentRegion().getShortNameToLog() +
|
||||
" Daughter A: " + daughterA + " storefiles, Daughter B: " +
|
||||
daughterB + " storefiles.");
|
||||
}
|
||||
return new Pair<Integer, Integer>(daughterA, daughterB);
|
||||
}
|
||||
|
|
|
@ -1393,7 +1393,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public boolean isSplittable() {
|
||||
boolean result = isAvailable() && !hasReferences();
|
||||
LOG.info("ASKED IF SPLITTABLE " + result, new Throwable("LOGGING"));
|
||||
LOG.info("ASKED IF SPLITTABLE " + result + " " + getRegionInfo().getShortNameToLog(),
|
||||
new Throwable("LOGGING: REMOVE"));
|
||||
// REMOVE BELOW!!!!
|
||||
LOG.info("DEBUG LIST ALL FILES");
|
||||
for (Store store: this.stores.values()) {
|
||||
LOG.info("store " + store.getColumnFamilyName());
|
||||
for (StoreFile sf: store.getStorefiles()) {
|
||||
LOG.info(sf.toStringDetailed());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -1443,7 +1443,7 @@ public class HStore implements Store {
|
|||
StringBuilder message = new StringBuilder(
|
||||
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
|
||||
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
|
||||
+ this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
|
||||
+ this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
|
||||
if (sfs.isEmpty()) {
|
||||
message.append("none, ");
|
||||
} else {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Split size is the number of regions that are on this server that all are
|
||||
|
@ -84,9 +85,10 @@ public class IncreasingToUpperBoundRegionSplitPolicy extends ConstantSizeRegionS
|
|||
// Mark if any store is big enough
|
||||
long size = store.getSize();
|
||||
if (size > sizeToCheck) {
|
||||
LOG.debug("ShouldSplit because " + store.getColumnFamilyName() + " size=" + size
|
||||
+ ", sizeToCheck=" + sizeToCheck + ", regionsWithCommonTable="
|
||||
+ tableRegionsCount);
|
||||
LOG.debug("ShouldSplit because " + store.getColumnFamilyName() +
|
||||
" size=" + StringUtils.humanSize(size) +
|
||||
", sizeToCheck=" + StringUtils.humanSize(sizeToCheck) +
|
||||
", regionsWithCommonTable=" + tableRegionsCount);
|
||||
foundABigStore = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2807,8 +2807,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
HRegionInfo hri = rsh.s.getRegionInfo();
|
||||
// Yes, should be the same instance
|
||||
if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
|
||||
String msg = "Region was re-opened after the scanner" + scannerName + " was created: "
|
||||
+ hri.getRegionNameAsString();
|
||||
String msg = "Region has changed on the scanner " + scannerName + ": regionName="
|
||||
+ hri.getRegionName() + ", scannerRegionName=" + rsh.r;
|
||||
LOG.warn(msg + ", closing...");
|
||||
scanners.remove(scannerName);
|
||||
try {
|
||||
|
|
|
@ -451,8 +451,8 @@ public class StoreFileInfo {
|
|||
// Tabledir is up two directories from where Reference was written.
|
||||
Path tableDir = p.getParent().getParent().getParent();
|
||||
String nameStrippedOfSuffix = m.group(1);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reference '" + p + "' to region=" + otherRegion
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("reference '" + p + "' to region=" + otherRegion
|
||||
+ " hfile=" + nameStrippedOfSuffix);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue