HBASE-7516 Make compaction policy pluggable

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1440737 13f79535-47bb-0310-9956-ffa450edef68
jxiang 2013-01-30 23:53:42 +00:00
parent 27a27a05d4
commit e1118764de
12 changed files with 688 additions and 462 deletions
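The key user-visible knob added by this commit is hbase.hstore.compaction.policy (declared as CompactionPolicy.COMPACTION_POLICY_KEY in the new abstract class further down); each HStore now loads whatever policy that key names through CompactionPolicy.create(store, conf). The snippet below is a minimal, hypothetical illustration of setting the knob, not code from the commit; the class name shown is simply the shipped default.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PolicyConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Any CompactionPolicy subclass with a no-arg constructor may be named here,
    // since it is instantiated through ReflectionUtils.newInstance(clazz, conf).
    conf.set("hbase.hstore.compaction.policy",
        "org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy");
    System.out.println("compaction policy = "
        + conf.get("hbase.hstore.compaction.policy"));
  }
}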

View File

@ -157,10 +157,12 @@ public class CompactionTool extends Configured implements Tool {
HStore store = getStore(region, familyDir); HStore store = getStore(region, familyDir);
do { do {
CompactionRequest cr = store.requestCompaction(); CompactionRequest cr = store.requestCompaction();
StoreFile storeFile = store.compact(cr); List<StoreFile> storeFiles = store.compact(cr);
if (storeFile != null) { if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) { if (keepCompactedFiles && deleteCompacted) {
fs.delete(storeFile.getPath(), false); for (StoreFile storeFile: storeFiles) {
fs.delete(storeFile.getPath(), false);
}
} }
} }
} while (store.needsCompaction() && !compactOnce); } while (store.needsCompaction() && !compactOnce);

View File

@ -915,7 +915,7 @@ public class HRegion implements HeapSize { // , Writable{
return isAvailable() && !hasReferences(); return isAvailable() && !hasReferences();
} }
boolean areWritesEnabled() { public boolean areWritesEnabled() {
synchronized(this.writestate) { synchronized(this.writestate) {
return this.writestate.writesEnabled; return this.writestate.writesEnabled;
} }

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -130,8 +131,8 @@ public class HStore implements Store, StoreConfiguration {
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads; private final boolean verifyBulkLoads;
// not private for testing private ScanInfo scanInfo;
/* package */ScanInfo scanInfo;
/* /*
* List of store files inside this store. This is an immutable list that * List of store files inside this store. This is an immutable list that
* is atomically replaced when its contents change. * is atomically replaced when its contents change.
@ -154,7 +155,7 @@ public class HStore implements Store, StoreConfiguration {
// Comparing KeyValues // Comparing KeyValues
final KeyValue.KVComparator comparator; final KeyValue.KVComparator comparator;
private final Compactor compactor; private Compactor compactor;
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
private static int flush_retries_number; private static int flush_retries_number;
@ -227,10 +228,7 @@ public class HStore implements Store, StoreConfiguration {
this.checksumType = getChecksumType(conf); this.checksumType = getChecksumType(conf);
// initilize bytes per checksum // initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf); this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
this.compactor = new Compactor(conf);
// Create a compaction manager. // Create a compaction manager.
this.compactionPolicy = new CompactionPolicy(conf, this);
if (HStore.flush_retries_number == 0) { if (HStore.flush_retries_number == 0) {
HStore.flush_retries_number = conf.getInt( HStore.flush_retries_number = conf.getInt(
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
@ -242,6 +240,9 @@ public class HStore implements Store, StoreConfiguration {
+ HStore.flush_retries_number); + HStore.flush_retries_number);
} }
} }
this.compactionPolicy = CompactionPolicy.create(this, conf);
// Get the compaction tool instance for this policy
this.compactor = compactionPolicy.getCompactor();
} }
/** /**
@ -288,7 +289,7 @@ public class HStore implements Store, StoreConfiguration {
return homedir; return homedir;
} }
FileSystem getFileSystem() { public FileSystem getFileSystem() {
return this.fs; return this.fs;
} }
@ -332,6 +333,13 @@ public class HStore implements Store, StoreConfiguration {
} }
} }
/**
* @return how many bytes to write between status checks
*/
public static int getCloseCheckInterval() {
return closeCheckInterval;
}
public HColumnDescriptor getFamily() { public HColumnDescriptor getFamily() {
return this.family; return this.family;
} }
@ -933,7 +941,7 @@ public class HStore implements Store, StoreConfiguration {
* @param isCompaction whether we are creating a new file in a compaction * @param isCompaction whether we are creating a new file in a compaction
* @return Writer for a new StoreFile in the tmp dir. * @return Writer for a new StoreFile in the tmp dir.
*/ */
StoreFile.Writer createWriterInTmp(int maxKeyCount, public StoreFile.Writer createWriterInTmp(int maxKeyCount,
Compression.Algorithm compression, boolean isCompaction) Compression.Algorithm compression, boolean isCompaction)
throws IOException { throws IOException {
final CacheConfig writerCacheConf; final CacheConfig writerCacheConf;
@ -1074,7 +1082,7 @@ public class HStore implements Store, StoreConfiguration {
* @throws IOException * @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early. * @return Storefile we compacted into or null if we failed or opted out early.
*/ */
StoreFile compact(CompactionRequest cr) throws IOException { List<StoreFile> compact(CompactionRequest cr) throws IOException {
if (cr == null || cr.getFiles().isEmpty()) return null; if (cr == null || cr.getFiles().isEmpty()) return null;
Preconditions.checkArgument(cr.getStore().toString().equals(this.toString())); Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
List<StoreFile> filesToCompact = cr.getFiles(); List<StoreFile> filesToCompact = cr.getFiles();
@ -1084,31 +1092,34 @@ public class HStore implements Store, StoreConfiguration {
Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
} }
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of " + this.region.getRegionInfo().getRegionNameAsString() + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + " into tmpdir=" + region.getTmpDir() + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize())); + StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null; List<StoreFile> sfs = new ArrayList<StoreFile>();
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
StoreFile.Writer writer = List<Path> newFiles =
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); this.compactor.compact(filesToCompact, cr.isMajor());
// Move the compaction into place. // Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
sf = completeCompaction(filesToCompact, writer); for (Path newFile: newFiles) {
if (region.getCoprocessorHost() != null) { StoreFile sf = completeCompaction(filesToCompact, newFile);
region.getCoprocessorHost().postCompact(this, sf); if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
}
sfs.add(sf);
} }
} else { } else {
// Create storefile around what we wrote with a reader on it. for (Path newFile: newFiles) {
sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf, // Create storefile around what we wrote with a reader on it.
this.family.getBloomFilterType(), this.dataBlockEncoder); StoreFile sf = new StoreFile(this.fs, newFile, this.conf, this.cacheConf,
sf.createReader(); this.family.getBloomFilterType(), this.dataBlockEncoder);
sf.createReader();
sfs.add(sf);
}
} }
} finally { } finally {
synchronized (filesCompacting) { synchronized (filesCompacting) {
@ -1117,25 +1128,34 @@ public class HStore implements Store, StoreConfiguration {
} }
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " StringBuilder message = new StringBuilder(
+ filesToCompact.size() + " file(s) in " + this + " of " "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ this.region.getRegionInfo().getRegionNameAsString() + filesToCompact.size() + " file(s) in " + this + " of "
+ " into " + + this.region.getRegionInfo().getRegionNameAsString()
(sf == null ? "none" : sf.getPath().getName()) + + " into ");
", size=" + (sf == null ? "none" : if (sfs.isEmpty()) {
StringUtils.humanReadableInt(sf.getReader().length())) message.append("none, ");
+ "; total size for store is " + StringUtils.humanReadableInt(storeSize) } else {
+ ". This selection was in queue for " for (StoreFile sf: sfs) {
+ StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) message.append(sf.getPath().getName());
+ ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime) message.append("(size=");
+ " to execute."); message.append(StringUtils.humanReadableInt(sf.getReader().length()));
return sf; message.append("), ");
}
}
message.append("total size for store is ")
.append(StringUtils.humanReadableInt(storeSize))
.append(". This selection was in queue for ")
.append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
.append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
.append(" to execute.");
LOG.info(message.toString());
return sfs;
} }
@Override @Override
public void compactRecentForTesting(int N) throws IOException { public void compactRecentForTesting(int N) throws IOException {
List<StoreFile> filesToCompact; List<StoreFile> filesToCompact;
long maxId;
boolean isMajor; boolean isMajor;
this.lock.readLock().lock(); this.lock.readLock().lock();
@ -1156,7 +1176,6 @@ public class HStore implements Store, StoreConfiguration {
} }
filesToCompact = filesToCompact.subList(count - N, count); filesToCompact = filesToCompact.subList(count - N, count);
maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
isMajor = (filesToCompact.size() == storefiles.size()); isMajor = (filesToCompact.size() == storefiles.size());
filesCompacting.addAll(filesToCompact); filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
@ -1167,12 +1186,14 @@ public class HStore implements Store, StoreConfiguration {
try { try {
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
StoreFile.Writer writer = List<Path> newFiles =
this.compactor.compact(this, filesToCompact, isMajor, maxId); this.compactor.compact(filesToCompact, isMajor);
// Move the compaction into place. for (Path newFile: newFiles) {
StoreFile sf = completeCompaction(filesToCompact, writer); // Move the compaction into place.
if (region.getCoprocessorHost() != null) { StoreFile sf = completeCompaction(filesToCompact, newFile);
region.getCoprocessorHost().postCompact(this, sf); if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
}
} }
} finally { } finally {
synchronized (filesCompacting) { synchronized (filesCompacting) {
@ -1336,26 +1357,25 @@ public class HStore implements Store, StoreConfiguration {
* </pre> * </pre>
* *
* @param compactedFiles list of files that were compacted * @param compactedFiles list of files that were compacted
* @param compactedFile StoreFile that is the result of the compaction * @param newFile StoreFile that is the result of the compaction
* @return StoreFile created. May be null. * @return StoreFile created. May be null.
* @throws IOException * @throws IOException
*/ */
StoreFile completeCompaction(final Collection<StoreFile> compactedFiles, StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
final StoreFile.Writer compactedFile) final Path newFile)
throws IOException { throws IOException {
// 1. Moving the new files into place -- if there is a new file (may not // 1. Moving the new files into place -- if there is a new file (may not
// be if all cells were expired or deleted). // be if all cells were expired or deleted).
StoreFile result = null; StoreFile result = null;
if (compactedFile != null) { if (newFile != null) {
validateStoreFile(compactedFile.getPath()); validateStoreFile(newFile);
// Move the file into the right spot // Move the file into the right spot
Path origPath = compactedFile.getPath(); Path destPath = new Path(homedir, newFile.getName());
Path destPath = new Path(homedir, origPath.getName()); LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
LOG.info("Renaming compacted file at " + origPath + " to " + destPath); if (!fs.rename(newFile, destPath)) {
if (!fs.rename(origPath, destPath)) { LOG.error("Failed move of compacted file " + newFile + " to " +
LOG.error("Failed move of compacted file " + origPath + " to " +
destPath); destPath);
throw new IOException("Failed move of compacted file " + origPath + throw new IOException("Failed move of compacted file " + newFile +
" to " + destPath); " to " + destPath);
} }
result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf, result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
@ -1938,6 +1958,14 @@ public class HStore implements Store, StoreConfiguration {
return scanInfo; return scanInfo;
} }
/**
* Set scan info, used by test
* @param scanInfo new scan info to use for test
*/
void setScanInfo(ScanInfo scanInfo) {
this.scanInfo = scanInfo;
}
/** /**
* Immutable information for scans over a store. * Immutable information for scans over a store.
*/ */

View File

@ -1744,7 +1744,7 @@ public class StoreFile {
return reader.getTrailer().getMajorVersion(); return reader.getTrailer().getMajorVersion();
} }
HFile.Reader getHFileReader() { public HFile.Reader getHFileReader() {
return reader; return reader;
} }

View File

@ -19,391 +19,133 @@
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.base.Predicate; import java.io.IOException;
import com.google.common.collect.Collections2; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
/** /**
* The default (and only, as of now) algorithm for selecting files for compaction. * A compaction policy determines how to select files for compaction,
* Combines the compaction configuration and the provisional file selection that * how to compact them, and how to generate the compacted files.
* it's given to produce the list of suitable candidates for compaction.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class CompactionPolicy { public abstract class CompactionPolicy extends Configured {
private static final Log LOG = LogFactory.getLog(CompactionPolicy.class); /**
private final static Calendar calendar = new GregorianCalendar(); * The name of the configuration parameter that specifies
* the class of a compaction policy that is used to compact
* HBase store files.
*/
public static final String COMPACTION_POLICY_KEY =
"hbase.hstore.compaction.policy";
private static final Class<? extends CompactionPolicy>
DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
CompactionConfiguration comConf; CompactionConfiguration comConf;
StoreConfiguration storeConfig; Compactor compactor;
HStore store;
public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
updateConfiguration(configuration, storeConfig);
}
/** /**
* @param candidateFiles candidate files, ordered from oldest to newest * @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria * @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException * @throws java.io.IOException
*/ */
public CompactSelection selectCompaction(List<StoreFile> candidateFiles, public abstract CompactSelection selectCompaction(
boolean isUserCompaction, boolean forceMajor) final List<StoreFile> candidateFiles, final boolean isUserCompaction,
throws IOException { final boolean forceMajor) throws IOException;
// Prelimanry compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
long cfTtl = this.storeConfig.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
CompactSelection expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return expiredSelection;
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection =
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
return candidateSelection;
}
/** /**
* Updates the compaction configuration. Used for tests.
* TODO: replace when HBASE-3909 is completed in some form.
*/
public void updateConfiguration(Configuration configuration,
StoreConfiguration storeConfig) {
this.comConf = new CompactionConfiguration(configuration, storeConfig);
this.storeConfig = storeConfig;
}
/**
* Select the expired store files to compact
*
* @param candidates the initial set of storeFiles
* @param maxExpiredTimeStamp
* The store file will be marked as expired if its max time stamp is
* less than this maxExpiredTimeStamp.
* @return A CompactSelection contains the expired store files as
* filesToCompact
*/
private CompactSelection selectExpiredStoreFiles(
CompactSelection candidates, long maxExpiredTimeStamp) {
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
if (filesToCompact == null || filesToCompact.size() == 0)
return null;
ArrayList<StoreFile> expiredStoreFiles = null;
boolean hasExpiredStoreFiles = false;
CompactSelection expiredSFSelection = null;
for (StoreFile storeFile : filesToCompact) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
if (!hasExpiredStoreFiles) {
expiredStoreFiles = new ArrayList<StoreFile>();
hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private CompactSelection skipLargeFiles(CompactSelection candidates) {
int pos = 0;
while (pos < candidates.getFilesToCompact().size() &&
candidates.getFilesToCompact().get(pos).getReader().length() >
comConf.getMaxCompactSize() &&
!candidates.getFilesToCompact().get(pos).isReference()) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.clearSubList(0, pos);
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all bulk load files if configured
*/
private CompactSelection filterBulk(CompactSelection candidates) {
candidates.getFilesToCompact().removeAll(Collections2.filter(
candidates.getFilesToCompact(),
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
private CompactSelection removeExcessFiles(CompactSelection candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
" files because of a user-requested major compaction");
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.clearSubList(comConf.getMaxFilesToCompact(),
candidates.getFilesToCompact().size());
}
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.getFilesToCompact().size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
candidates.getFilesToCompact().size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.emptyFileList();
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
* <p/>
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
if (candidates.getFilesToCompact().isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
}
// get store file sizes for incremental compacting selection.
int countOfFiles = candidates.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
}
candidates = candidates.getSubList(start, countOfFiles);
return candidates;
}
/*
* @param filesToCompact Files to compact. Can be null. * @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction. * @return True if we should run a major compaction.
*/ */
public boolean isMajorCompaction(final List<StoreFile> filesToCompact) public abstract boolean isMajorCompaction(
throws IOException { final List<StoreFile> filesToCompact) throws IOException;
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
long cfTtl = this.storeConfig.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + cfTtl);
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = comConf.getMajorCompactionJitter();
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
if (seed != null) {
double rnd = (new Random(seed)).nextDouble();
ret += jitter - Math.round(2L * jitter * rnd);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
}
/** /**
* @param compactionSize Total size of some compaction * @param compactionSize Total size of some compaction
* @return whether this should be a large or small compaction * @return whether this should be a large or small compaction
*/ */
public boolean throttleCompaction(long compactionSize) { public abstract boolean throttleCompaction(long compactionSize);
return compactionSize > comConf.getThrottlePoint();
}
/** /**
* @param numCandidates Number of candidate store files * @param numCandidates Number of candidate store files
* @return whether a compactionSelection is possible * @return whether a compactionSelection is possible
*/ */
public boolean needsCompaction(int numCandidates) { public abstract boolean needsCompaction(int numCandidates);
return numCandidates > comConf.getMinFilesToCompact();
/**
* Inform the policy that some configuration has been changed,
* so any cached values should be updated.
*/
public void updateConfiguration() {
if (getConf() != null && store != null) {
comConf = new CompactionConfiguration(getConf(), store);
}
} }
/** /**
* @return whether this is off-peak hour * Get the compactor for this policy
* @return the compactor for this policy
*/ */
private boolean isOffPeakHour() { public Compactor getCompactor() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY); return compactor;
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
} }
}
/**
* Set the new configuration
*/
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
updateConfiguration();
}
/**
* Upon construction, this method will be called with the HStore
* to be governed. It will be called once and only once.
*/
protected void configureForStore(HStore store) {
this.store = store;
updateConfiguration();
}
/**
* Create the CompactionPolicy configured for the given HStore.
* @param store
* @param conf
* @return a CompactionPolicy
* @throws IOException
*/
public static CompactionPolicy create(HStore store,
Configuration conf) throws IOException {
Class<? extends CompactionPolicy> clazz =
getCompactionPolicyClass(store.getFamily(), conf);
CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
policy.configureForStore(store);
return policy;
}
static Class<? extends CompactionPolicy> getCompactionPolicyClass(
HColumnDescriptor family, Configuration conf) throws IOException {
String className = conf.get(COMPACTION_POLICY_KEY,
DEFAULT_COMPACTION_POLICY_CLASS.getName());
try {
Class<? extends CompactionPolicy> clazz =
Class.forName(className).asSubclass(CompactionPolicy.class);
return clazz;
} catch (Exception e) {
throw new IOException(
"Unable to load configured region compaction policy '"
+ className + "' for column '" + family.getNameAsString()
+ "'", e);
}
}
}
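Because selection, throttling and the choice of compactor now sit behind this abstract class, a deployment can plug in its own behavior without patching HStore. A hypothetical sketch of the smallest useful customization follows (the class name is made up and not part of this commit): extend DefaultCompactionPolicy, which already wires up DefaultCompactor, and override a single decision. The implicit public no-arg constructor is enough for CompactionPolicy.create() to instantiate it reflectively, after which setConf() and configureForStore() take care of the rest.

import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;

/**
 * Hypothetical policy: keep the default file selection and compactor,
 * but always report compactions as "small" per the throttleCompaction javadoc.
 * It would be registered with:
 *   conf.set(CompactionPolicy.COMPACTION_POLICY_KEY, NeverThrottlePolicy.class.getName());
 */
public class NeverThrottlePolicy extends DefaultCompactionPolicy {
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return false;
  }
}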

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* A compactor is a compaction algorithm associated with a given policy.
*/
@InterfaceAudience.Private
public abstract class Compactor {
CompactionProgress progress;
CompactionPolicy policy;
Compactor(final CompactionPolicy policy) {
this.policy = policy;
}
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and
* nothing made it through the compaction.
* @throws IOException
*/
public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction) throws IOException;
public Configuration getConf() {
return policy.getConf();
}
public CompactionProgress getProgress() {
return this.progress;
}
}
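Each policy supplies the Compactor that performs the actual rewrite; DefaultCompactionPolicy pairs itself with DefaultCompactor in its constructor later in this commit. The skeleton below is hypothetical and is placed in the org.apache.hadoop.hbase.regionserver.compactions package, since the Compactor(CompactionPolicy) constructor is package-private; it only illustrates the shape of the contract, namely returning the paths of the files the compaction produced, or an empty list when nothing survived.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;

public class SkeletonCompactor extends Compactor {

  SkeletonCompactor(final CompactionPolicy policy) {
    super(policy);
  }

  @Override
  public List<Path> compact(final Collection<StoreFile> filesToCompact,
      final boolean majorCompaction) throws IOException {
    List<Path> newFiles = new ArrayList<Path>();
    // A real implementation would scan filesToCompact, write the surviving
    // cells to new files in the store's tmp directory (see DefaultCompactor
    // below) and add each written path to newFiles; an empty list means
    // nothing made it through the compaction.
    return newFiles;
  }
}

A custom policy would hand this out by assigning compactor = new SkeletonCompactor(this) in its constructor, mirroring what DefaultCompactionPolicy does.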

View File

@ -0,0 +1,393 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
/**
* The default algorithm for selecting files for compaction.
* Combines the compaction configuration and the provisional file selection that
* it's given to produce the list of suitable candidates for compaction.
*/
@InterfaceAudience.Private
public class DefaultCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
private final static Calendar calendar = new GregorianCalendar();
public DefaultCompactionPolicy() {
compactor = new DefaultCompactor(this);
}
/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
boolean isUserCompaction, boolean forceMajor)
throws IOException {
// Preliminary compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
long cfTtl = this.store.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
CompactSelection expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return expiredSelection;
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection =
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
return candidateSelection;
}
/**
* Select the expired store files to compact
*
* @param candidates the initial set of storeFiles
* @param maxExpiredTimeStamp
* The store file will be marked as expired if its max time stamp is
* less than this maxExpiredTimeStamp.
* @return A CompactSelection that contains the expired store files as
* filesToCompact
*/
private CompactSelection selectExpiredStoreFiles(
CompactSelection candidates, long maxExpiredTimeStamp) {
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
if (filesToCompact == null || filesToCompact.size() == 0)
return null;
ArrayList<StoreFile> expiredStoreFiles = null;
boolean hasExpiredStoreFiles = false;
CompactSelection expiredSFSelection = null;
for (StoreFile storeFile : filesToCompact) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
if (!hasExpiredStoreFiles) {
expiredStoreFiles = new ArrayList<StoreFile>();
hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private CompactSelection skipLargeFiles(CompactSelection candidates) {
int pos = 0;
while (pos < candidates.getFilesToCompact().size() &&
candidates.getFilesToCompact().get(pos).getReader().length() >
comConf.getMaxCompactSize() &&
!candidates.getFilesToCompact().get(pos).isReference()) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.clearSubList(0, pos);
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all bulk load files if configured
*/
private CompactSelection filterBulk(CompactSelection candidates) {
candidates.getFilesToCompact().removeAll(Collections2.filter(
candidates.getFilesToCompact(),
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* take up to maxFilesToCompact from the start
*/
private CompactSelection removeExcessFiles(CompactSelection candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
" files because of a user-requested major compaction");
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.clearSubList(comConf.getMaxFilesToCompact(),
candidates.getFilesToCompact().size());
}
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.getFilesToCompact().size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
candidates.getFilesToCompact().size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.emptyFileList();
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
* <p/>
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
if (candidates.getFilesToCompact().isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
}
// get store file sizes for incremental compacting selection.
int countOfFiles = candidates.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
}
candidates = candidates.getSubList(start, countOfFiles);
return candidates;
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
long cfTtl = this.store.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + cfTtl);
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = comConf.getMajorCompactionJitter();
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
if (seed != null) {
double rnd = (new Random(seed)).nextDouble();
ret += jitter - Math.round(2L * jitter * rnd);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
}
/**
* @param compactionSize Total size of some compaction
* @return whether this should be a large or small compaction
*/
public boolean throttleCompaction(long compactionSize) {
return compactionSize > comConf.getThrottlePoint();
}
/**
* @param numCandidates Number of candidate store files
* @return whether a compactionSelection is possible
*/
public boolean needsCompaction(int numCandidates) {
return numCandidates > comConf.getMinFilesToCompact();
}
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
}
}
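To make the ratio rule in applyCompactionPolicy() above concrete, the standalone sketch below reruns its selection loop on made-up numbers; the sizes and thresholds are purely illustrative (the real values come from CompactionConfiguration) and the class is not part of this commit.

public class RatioSelectionExample {
  public static void main(String[] args) {
    long[] fileSizes = {100, 25, 12, 12}; // MB, ordered oldest -> newest
    double ratio = 1.2;                   // illustrative compaction ratio
    long minCompactSize = 16;             // MB, illustrative
    int minFilesToCompact = 3;
    int maxFilesToCompact = 10;

    int countOfFiles = fileSizes.length;
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      int tooFar = i + maxFilesToCompact - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    int start = 0;
    while (countOfFiles - start >= minFilesToCompact
        && fileSizes[start] > Math.max(minCompactSize,
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    // 100 > 1.2 * (25 + 12 + 12), so the 100 MB file is skipped;
    // 25 <= 1.2 * (12 + 12), so the 25, 12 and 12 MB files are selected.
    System.out.println("selected candidates [" + start + ", " + countOfFiles + ")");
  }
}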

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -26,47 +26,52 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
* Compact passed set of files. * Compact passed set of files.
* Create an instance and then call {@link #compact(Store, Collection, boolean, long)}. * Create an instance and then call {@link #compact(Collection, boolean, long)}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class Compactor extends Configured { class DefaultCompactor extends Compactor {
private static final Log LOG = LogFactory.getLog(Compactor.class); private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
private CompactionProgress progress;
Compactor(final Configuration c) { DefaultCompactor(final CompactionPolicy policy) {
super(c); super(policy);
} }
/** /**
* Do a minor/major compaction on an explicit set of storefiles from a Store. * Do a minor/major compaction on an explicit set of storefiles from a Store.
* *
* @param store Store the files belong to
* @param filesToCompact which files to compact * @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc) * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id. * @return Product of compaction or an empty list if all cells expired or deleted and
* @return Product of compaction or null if all cells expired or deleted and
* nothing made it through the compaction. * nothing made it through the compaction.
* @throws IOException * @throws IOException
*/ */
StoreFile.Writer compact(final HStore store, @SuppressWarnings("deprecation")
final Collection<StoreFile> filesToCompact, public List<Path> compact(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId) final boolean majorCompaction) throws IOException {
throws IOException { // Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
// Calculate maximum key count after compaction (for blooms) // Calculate maximum key count after compaction (for blooms)
// Also calculate earliest put timestamp if major compaction // Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0; int maxKeyCount = 0;
HStore store = policy.store;
long earliestPutTs = HConstants.LATEST_TIMESTAMP; long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file: filesToCompact) { for (StoreFile file: filesToCompact) {
StoreFile.Reader r = file.getReader(); StoreFile.Reader r = file.getReader();
@ -120,6 +125,7 @@ class Compactor extends Configured {
// Make the instantiation lazy in case compaction produces no product; i.e. // Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted. // where all source cells are expired or deleted.
StoreFile.Writer writer = null; StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
// Find the smallest read point across all the Scanners. // Find the smallest read point across all the Scanners.
long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
@ -138,7 +144,7 @@ class Compactor extends Configured {
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions()); scan.setMaxVersions(store.getFamily().getMaxVersions());
/* Include deletes, unless we are doing a major compaction */ /* Include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(store, store.scanInfo, scan, scanners, scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs); scanType, smallestReadPoint, earliestPutTs);
} }
if (store.getHRegion().getCoprocessorHost() != null) { if (store.getHRegion().getCoprocessorHost() != null) {
@ -146,7 +152,7 @@ class Compactor extends Configured {
store.getHRegion().getCoprocessorHost().preCompact(store, scanner, scanType); store.getHRegion().getCoprocessorHost().preCompact(store, scanner, scanType);
// NULL scanner returned from coprocessor hooks means skip normal processing // NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) { if (cpScanner == null) {
return null; return newFiles; // an empty list
} }
scanner = cpScanner; scanner = cpScanner;
} }
@ -156,6 +162,7 @@ class Compactor extends Configured {
// we have to use a do/while loop. // we have to use a do/while loop.
List<KeyValue> kvs = new ArrayList<KeyValue>(); List<KeyValue> kvs = new ArrayList<KeyValue>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore; boolean hasMore;
do { do {
hasMore = scanner.next(kvs, compactionKVMax); hasMore = scanner.next(kvs, compactionKVMax);
@ -176,9 +183,9 @@ class Compactor extends Configured {
++progress.currentCompactedKVs; ++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested // check periodically to see if a system stop is requested
if (HStore.closeCheckInterval > 0) { if (closeCheckInterval > 0) {
bytesWritten += kv.getLength(); bytesWritten += kv.getLength();
if (bytesWritten > HStore.closeCheckInterval) { if (bytesWritten > closeCheckInterval) {
bytesWritten = 0; bytesWritten = 0;
isInterrupted(store, writer); isInterrupted(store, writer);
} }
@ -196,9 +203,10 @@ class Compactor extends Configured {
if (writer != null) { if (writer != null) {
writer.appendMetadata(maxId, majorCompaction); writer.appendMetadata(maxId, majorCompaction);
writer.close(); writer.close();
newFiles.add(writer.getPath());
} }
} }
return writer; return newFiles;
} }
void isInterrupted(final HStore store, final StoreFile.Writer writer) void isInterrupted(final HStore store, final StoreFile.Writer writer)
@ -210,8 +218,4 @@ class Compactor extends Configured {
throw new InterruptedIOException( "Aborting compaction of store " + store + throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getHRegion() + " because it was interrupted."); " in region " + store.getHRegion() + " because it was interrupted.");
} }
CompactionProgress getProgress() {
return this.progress;
}
} }

View File

@ -407,7 +407,7 @@ public class HFileReadWriteTest {
Scan scan = new Scan(); Scan scan = new Scan();
// Include deletes // Include deletes
scanner = new StoreScanner(store, store.scanInfo, scan, scanners, scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE); ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE);
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();

View File

@ -280,11 +280,11 @@ public class TestCompaction extends HBaseTestCase {
final int ttl = 1000; final int ttl = 1000;
for (Store hstore : this.r.stores.values()) { for (Store hstore : this.r.stores.values()) {
HStore store = ((HStore) hstore); HStore store = ((HStore) hstore);
HStore.ScanInfo old = store.scanInfo; HStore.ScanInfo old = store.getScanInfo();
HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(), HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl, old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator()); old.getKeepDeletedCells(), 0, old.getComparator());
store.scanInfo = si; store.setScanInfo(si);
} }
Thread.sleep(1000); Thread.sleep(1000);
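
Because scanInfo can no longer be poked directly, the test clones the current settings through getScanInfo(), rebuilds a ScanInfo with a one-second TTL, and installs it with setScanInfo(). The same pattern as a small helper (the method name and ttlMillis parameter are illustrative; the trailing 0 argument is carried over unchanged from the test):

    // Sketch: shrink a store's TTL in a test by swapping in a rebuilt ScanInfo.
    static void overrideTtl(HStore store, int ttlMillis) {
      HStore.ScanInfo old = store.getScanInfo();
      HStore.ScanInfo shortTtl = new HStore.ScanInfo(old.getFamily(),
          old.getMinVersions(), old.getMaxVersions(), ttlMillis,
          old.getKeepDeletedCells(), 0, old.getComparator());
      store.setScanInfo(shortTtl);
    }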
@ -301,7 +301,7 @@ public class TestCompaction extends HBaseTestCase {
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct); conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY)); HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
s.compactionPolicy.updateConfiguration(conf, s); s.compactionPolicy.setConf(conf);
try { try {
createStoreFile(r); createStoreFile(r);
createStoreFile(r); createStoreFile(r);
@ -313,7 +313,7 @@ public class TestCompaction extends HBaseTestCase {
assertEquals(2, s.getStorefilesCount()); assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic // ensure that major compaction time is deterministic
CompactionPolicy c = s.compactionPolicy; DefaultCompactionPolicy c = (DefaultCompactionPolicy)s.compactionPolicy;
List<StoreFile> storeFiles = s.getStorefiles(); List<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles); long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
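
Two API shifts appear in this hunk: the policy now takes configuration through setConf(conf) rather than updateConfiguration(conf, s), and getNextMajorCompactTime(storeFiles) is exposed by the concrete DefaultCompactionPolicy, so the generic handle has to be cast before calling it. A hedged sketch of the deterministic-timing check, with the method name and return value illustrative:

    // Sketch: pin major-compaction timing and read the computed next time back.
    static long deterministicMajorCompactTime(Configuration conf, HStore s) {
      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.0F);
      s.compactionPolicy.setConf(conf);  // plain Configurable-style hand-off now
      DefaultCompactionPolicy policy = (DefaultCompactionPolicy) s.compactionPolicy;
      return policy.getNextMajorCompactTime(s.getStorefiles());
    }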
@ -539,11 +539,11 @@ public class TestCompaction extends HBaseTestCase {
final int ttl = 1000; final int ttl = 1000;
for (Store hstore: this.r.stores.values()) { for (Store hstore: this.r.stores.values()) {
HStore store = (HStore)hstore; HStore store = (HStore)hstore;
HStore.ScanInfo old = store.scanInfo; HStore.ScanInfo old = store.getScanInfo();
HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(), HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl, old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator()); old.getKeepDeletedCells(), 0, old.getComparator());
store.scanInfo = si; store.setScanInfo(si);
} }
Thread.sleep(ttl); Thread.sleep(ttl);
@ -588,15 +588,15 @@ public class TestCompaction extends HBaseTestCase {
HStore store = (HStore) r.getStore(COLUMN_FAMILY); HStore store = (HStore) r.getStore(COLUMN_FAMILY);
List<StoreFile> storeFiles = store.getStorefiles(); List<StoreFile> storeFiles = store.getStorefiles();
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true); Compactor tool = store.compactionPolicy.getCompactor();
Compactor tool = new Compactor(this.conf);
StoreFile.Writer compactedFile = List<Path> newFiles =
tool.compact(store, storeFiles, false, maxId); tool.compact(storeFiles, false);
// Now lets corrupt the compacted file. // Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
Path origPath = compactedFile.getPath(); // default compaction policy created one and only one new compacted file
Path origPath = newFiles.get(0);
Path homedir = store.getHomedir(); Path homedir = store.getHomedir();
Path dstPath = new Path(homedir, origPath.getName()); Path dstPath = new Path(homedir, origPath.getName());
FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3, FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
@ -606,7 +606,7 @@ public class TestCompaction extends HBaseTestCase {
stream.close(); stream.close();
try { try {
store.completeCompaction(storeFiles, compactedFile); store.completeCompaction(storeFiles, origPath);
} catch (Exception e) { } catch (Exception e) {
// The complete compaction should fail and the corrupt file should remain // The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory; // in the 'tmp' directory;
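
This hunk shows the other side of the list-returning compactor: completeCompaction() now takes the Path of the compacted file, and the test relies on the default policy producing exactly one new file. A condensed sketch of that hand-off, with the corruption step elided and the parameter names hypothetical:

    // Sketch: finish a compaction from the path list the compactor returned.
    static void completeFromPaths(HStore store, List<StoreFile> compactedInputs,
        List<Path> newFiles) throws Exception {
      // The default compaction policy wrote one and only one new file.
      Path compactedPath = newFiles.get(0);
      store.completeCompaction(compactedInputs, compactedPath);
    }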


@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -58,7 +57,6 @@ public class TestDefaultCompactSelection extends TestCase {
private static final String DIR= private static final String DIR=
TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString(); TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
private static Path TEST_FILE; private static Path TEST_FILE;
private CompactionPolicy manager;
protected static final int minFiles = 3; protected static final int minFiles = 3;
protected static final int maxFiles = 5; protected static final int maxFiles = 5;
@ -84,7 +82,6 @@ public class TestDefaultCompactSelection extends TestCase {
Path basedir = new Path(DIR); Path basedir = new Path(DIR);
String logName = "logs"; String logName = "logs";
Path logdir = new Path(DIR, logName); Path logdir = new Path(DIR, logName);
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family")); HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
@ -102,7 +99,6 @@ public class TestDefaultCompactSelection extends TestCase {
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf); store = new HStore(basedir, region, hcd, fs, conf);
manager = store.compactionPolicy;
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir()); TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
fs.create(TEST_FILE); fs.create(TEST_FILE);
@ -282,7 +278,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12); compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1); conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0); conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
store.compactionPolicy.updateConfiguration(conf, store); store.compactionPolicy.updateConfiguration();
try { try {
// trigger an aged major compaction // trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12); compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
@ -313,7 +309,7 @@ public class TestDefaultCompactSelection extends TestCase {
* current compaction algorithm. Developed to ensure that refactoring * current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this. * doesn't implicitly alter this.
*/ */
long tooBig = maxSize + 1; //long tooBig = maxSize + 1;
Calendar calendar = new GregorianCalendar(); Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY); int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
@ -334,13 +330,13 @@ public class TestDefaultCompactSelection extends TestCase {
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne); this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" + LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")"); hourMinusOne + ", " + hourPlusOne + ")");
store.compactionPolicy.updateConfiguration(this.conf, store); store.compactionPolicy.updateConfiguration();
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1); compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection // set peak hour outside current selection and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne); this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
store.compactionPolicy.updateConfiguration(this.conf, store); store.compactionPolicy.updateConfiguration();
LOG.debug("Testing compact selection with off-peak settings (" + LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")"); hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1); compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
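
updateConfiguration() has become a no-argument call: the test no longer passes conf and the store back in, and it also drops its own CompactionPolicy field, so the policy presumably re-reads the Configuration it already holds. The off-peak hunks move the window around the current hour before triggering that re-read; the same toggle as a helper (property names copied from the test, method name illustrative):

    // Sketch: shift the off-peak compaction window and let the policy pick it up.
    static void setOffPeakWindow(Configuration conf, HStore store,
        int startHour, int endHour) {
      conf.setLong("hbase.offpeak.start.hour", startHour);
      conf.setLong("hbase.offpeak.end.hour", endHour);
      store.compactionPolicy.updateConfiguration();  // re-read, no arguments
    }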


@ -45,7 +45,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
@ -203,7 +202,7 @@ public class TestStore extends TestCase {
hcd.setTimeToLive(ttl); hcd.setTimeToLive(ttl);
init(getName(), conf, hcd); init(getName(), conf, hcd);
long sleepTime = this.store.scanInfo.getTtl() / storeFileNum; long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
long timeStamp; long timeStamp;
// There are 4 store files and the max time stamp difference among these // There are 4 store files and the max time stamp difference among these
// store files will be (this.store.ttl / storeFileNum) // store files will be (this.store.ttl / storeFileNum)
@ -229,11 +228,12 @@ public class TestStore extends TestCase {
// If not the first compaction, there is another empty store file, // If not the first compaction, there is another empty store file,
assertEquals(Math.min(i, 2), cr.getFiles().size()); assertEquals(Math.min(i, 2), cr.getFiles().size());
for (int j = 0; i < cr.getFiles().size(); j++) { for (int j = 0; i < cr.getFiles().size(); j++) {
assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
(EnvironmentEdgeManager.currentTimeMillis() - this.store.scanInfo.getTtl())); .currentTimeMillis() - this.store.getScanInfo().getTtl()));
} }
// Verify that the expired store file is compacted to an empty store file. // Verify that the expired store file is compacted to an empty store file.
StoreFile compactedFile = this.store.compact(cr); // Default compaction policy creates just one and only one compacted file.
StoreFile compactedFile = this.store.compact(cr).get(0);
// It is an empty store file. // It is an empty store file.
assertEquals(0, compactedFile.getReader().getEntries()); assertEquals(0, compactedFile.getReader().getEntries());
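
HStore.compact(CompactionRequest) now returns a list of store files rather than a single one; under the default policy that list has exactly one element, which the TTL test expects to be empty. The same assertion as a small helper (the method name and the pre-built request parameter are illustrative):

    // Sketch: compact an expired selection and verify the lone result is empty.
    static void assertCompactsToEmpty(HStore store, CompactionRequest cr)
        throws Exception {
      // Default compaction policy yields one and only one compacted file here.
      StoreFile compactedFile = store.compact(cr).get(0);
      assertEquals(0, compactedFile.getReader().getEntries());
    }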