HBASE-5616 Make compaction code standalone

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1304616 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-03-23 21:37:01 +00:00
parent 358dc6e6e6
commit 5a9f595e7d
9 changed files with 461 additions and 232 deletions

View File

@ -628,7 +628,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
// Log size // Log size
long totalSize = heapSize(); long totalSize = heapSize();
long freeSize = maxSize - totalSize; long freeSize = maxSize - totalSize;
LruBlockCache.LOG.debug("LRU Stats: " + LruBlockCache.LOG.debug("Stats: " +
"total=" + StringUtils.byteDesc(totalSize) + ", " + "total=" + StringUtils.byteDesc(totalSize) + ", " +
"free=" + StringUtils.byteDesc(freeSize) + ", " + "free=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@ -636,11 +636,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
"accesses=" + stats.getRequestCount() + ", " + "accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " + "hits=" + stats.getHitCount() + ", " +
"hitRatio=" + "hitRatio=" +
(stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " + "cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" + "cachingHitsRatio=" +
(stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " +
"evictions=" + stats.getEvictionCount() + ", " + "evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " +
"evictedPerRun=" + stats.evictedPerEviction()); "evictedPerRun=" + stats.evictedPerEviction());

View File

@ -0,0 +1,189 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
/**
* Compact passed set of files.
* Create an instance and then call {@ink #compact(Store, Collection, boolean, long)}.
*/
@InterfaceAudience.Private
class Compactor extends Configured {
private static final Log LOG = LogFactory.getLog(Compactor.class);
private CompactionProgress progress;
Compactor(final Configuration c) {
super(c);
}
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*
* @param store Store the files belong to
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id.
* @return Product of compaction or null if all cells expired or deleted and
* nothing made it through the compaction.
* @throws IOException
*/
StoreFile.Writer compact(final Store store,
final Collection<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
// Calculate maximum key count after compaction (for blooms)
// Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file: filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
continue;
}
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
r.getFilterEntries() : r.getEntries();
maxKeyCount += keyCount;
// For major compactions calculate the earliest put timestamp of all
// involved storefiles. This is used to remove family delete marker during
// compaction.
if (majorCompaction) {
byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no information, must be an old one
// assume we have very old puts
earliestPutTs = HConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getEncodingOnDisk() +
(majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
}
}
// keep track of compaction progress
this.progress = new CompactionProgress(maxKeyCount);
// For each file, obtain a scanner:
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Get some configs
int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
Compression.Algorithm compression = store.getFamily().getCompression();
// Avoid overriding compression setting for major compactions if the user
// has not specified it separately
Compression.Algorithm compactionCompression =
(store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
store.getFamily().getCompactionCompression(): compression;
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
try {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
/* Include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(store, scan, scanners,
majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
smallestReadPoint, earliestPutTs);
if (store.getHRegion().getCoprocessorHost() != null) {
InternalScanner cpScanner =
store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return null;
}
scanner = cpScanner;
}
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<KeyValue> kvs = new ArrayList<KeyValue>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
boolean hasMore;
do {
hasMore = scanner.next(kvs, compactionKVMax);
if (writer == null && !kvs.isEmpty()) {
writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
}
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (Store.closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
bytesWritten = 0;
isInterrupted(store, writer);
}
}
}
}
kvs.clear();
} while (hasMore);
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (writer != null) {
writer.appendMetadata(maxId, majorCompaction);
writer.close();
}
}
return writer;
}
void isInterrupted(final Store store, final StoreFile.Writer writer)
throws IOException {
if (store.getHRegion().areWritesEnabled()) return;
// Else cleanup.
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getHRegion() + " because user requested stop.");
}
CompactionProgress getProgress() {
return this.progress;
}
}

View File

@ -1029,19 +1029,16 @@ public class HRegion implements HeapSize { // , Writable{
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
} }
private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) { final String threadNamePrefix) {
ThreadPoolExecutor openAndCloseThreadPool = Threads return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
new ThreadFactory() { private int count = 1;
private int count = 1;
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++); return new Thread(r, threadNamePrefix + "-" + count++);
return t; }
} });
});
return openAndCloseThreadPool;
} }
/** /**
@ -5235,11 +5232,11 @@ public class HRegion implements HeapSize { // , Writable{
final HLog log = new HLog(fs, logdir, oldLogDir, c); final HLog log = new HLog(fs, logdir, oldLogDir, c);
try { try {
processTable(fs, tableDir, log, c, majorCompact); processTable(fs, tableDir, log, c, majorCompact);
} finally { } finally {
log.close(); log.close();
// TODO: is this still right? // TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache(); BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown(); if (bc != null) bc.shutdown();
} }
} }
} }

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -132,9 +131,6 @@ public class Store extends SchemaConfigured implements HeapSize {
private volatile long totalUncompressedBytes = 0L; private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object(); private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final String storeNameStr;
private CompactionProgress progress;
private final int compactionKVMax;
private final boolean verifyBulkLoads; private final boolean verifyBulkLoads;
// not private for testing // not private for testing
@ -152,10 +148,6 @@ public class Store extends SchemaConfigured implements HeapSize {
new CopyOnWriteArraySet<ChangedReadersObserver>(); new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize; private final int blocksize;
/** Compression algorithm for flush files and minor compaction */
private final Compression.Algorithm compression;
/** Compression algorithm for major compaction */
private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder; private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */ /** Checksum configuration */
@ -165,6 +157,8 @@ public class Store extends SchemaConfigured implements HeapSize {
// Comparing KeyValues // Comparing KeyValues
final KeyValue.KVComparator comparator; final KeyValue.KVComparator comparator;
private final Compactor compactor;
/** /**
* Constructor * Constructor
* @param basedir qualified path under which the region directory lives; * @param basedir qualified path under which the region directory lives;
@ -179,52 +173,37 @@ public class Store extends SchemaConfigured implements HeapSize {
protected Store(Path basedir, HRegion region, HColumnDescriptor family, protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf) FileSystem fs, Configuration conf)
throws IOException { throws IOException {
super(conf, region.getTableDesc().getNameAsString(), super(conf, region.getRegionInfo().getTableNameAsString(),
Bytes.toString(family.getName())); Bytes.toString(family.getName()));
HRegionInfo info = region.regionInfo; HRegionInfo info = region.getRegionInfo();
this.fs = fs; this.fs = fs;
this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); // Assemble the store's home directory.
if (!this.fs.exists(this.homedir)) { Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
if (!this.fs.mkdirs(this.homedir)) // Ensure it exists.
throw new IOException("Failed create of: " + this.homedir.toString()); this.homedir = createStoreHomeDir(this.fs, p);
}
this.region = region; this.region = region;
this.family = family; this.family = family;
this.conf = conf; this.conf = conf;
this.blocksize = family.getBlocksize(); this.blocksize = family.getBlocksize();
this.compression = family.getCompression();
// avoid overriding compression setting for major compactions if the user
// has not specified it separately
this.compactionCompression =
(family.getCompactionCompression() != Compression.Algorithm.NONE) ?
family.getCompactionCompression() : this.compression;
this.dataBlockEncoder = this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(), new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
family.getDataBlockEncoding()); family.getDataBlockEncoding());
this.comparator = info.getComparator(); this.comparator = info.getComparator();
// getTimeToLive returns ttl in seconds. Convert to milliseconds. // Get TTL
this.ttl = family.getTimeToLive(); this.ttl = getTTL(family);
if (ttl == HConstants.FOREVER) {
// default is unlimited ttl.
ttl = Long.MAX_VALUE;
} else if (ttl == -1) {
ttl = Long.MAX_VALUE;
} else {
// second -> ms adjust for user data
this.ttl *= 1000;
}
// used by ScanQueryMatcher // used by ScanQueryMatcher
long timeToPurgeDeletes = long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
LOG.info("time to purge deletes set to " + timeToPurgeDeletes + LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
"ms in store " + this); "ms in store " + this);
// Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it?
scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
family.getMaxVersions(), ttl, family.getKeepDeletedCells(), family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
timeToPurgeDeletes, this.comparator); timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator); this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact // By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2, this.minFilesToCompact = Math.max(2,
@ -241,10 +220,8 @@ public class Store extends SchemaConfigured implements HeapSize {
this.region.memstoreFlushSize); this.region.memstoreFlushSize);
this.maxCompactSize this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
false);
if (Store.closeCheckInterval == 0) { if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt( Store.closeCheckInterval = conf.getInt(
@ -256,6 +233,47 @@ public class Store extends SchemaConfigured implements HeapSize {
this.checksumType = getChecksumType(conf); this.checksumType = getChecksumType(conf);
// initilize bytes per checksum // initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf); this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
this.compactor = new Compactor(this.conf);
}
/**
* @param family
* @return
*/
long getTTL(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
// Default is unlimited ttl.
ttl = Long.MAX_VALUE;
} else if (ttl == -1) {
ttl = Long.MAX_VALUE;
} else {
// Second -> ms adjust for user data
ttl *= 1000;
}
return ttl;
}
/**
* Create this store's homedir
* @param fs
* @param homedir
* @return Return <code>homedir</code>
* @throws IOException
*/
Path createStoreHomeDir(final FileSystem fs,
final Path homedir) throws IOException {
if (!fs.exists(homedir)) {
if (!fs.mkdirs(homedir))
throw new IOException("Failed create of: " + homedir.toString());
}
return homedir;
}
FileSystem getFileSystem() {
return this.fs;
} }
/** /**
@ -316,7 +334,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* Return the directory in which this store stores its * Return the directory in which this store stores its
* StoreFiles * StoreFiles
*/ */
public Path getHomedir() { Path getHomedir() {
return homedir; return homedir;
} }
@ -335,6 +353,10 @@ public class Store extends SchemaConfigured implements HeapSize {
this.dataBlockEncoder = blockEncoder; this.dataBlockEncoder = blockEncoder;
} }
FileStatus [] getStoreFiles() throws IOException {
return FSUtils.listStatus(this.fs, this.homedir, null);
}
/** /**
* Creates an unsorted list of StoreFile loaded in parallel * Creates an unsorted list of StoreFile loaded in parallel
* from the given directory. * from the given directory.
@ -342,7 +364,7 @@ public class Store extends SchemaConfigured implements HeapSize {
*/ */
private List<StoreFile> loadStoreFiles() throws IOException { private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>(); ArrayList<StoreFile> results = new ArrayList<StoreFile>();
FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null); FileStatus files[] = getStoreFiles();
if (files == null || files.length == 0) { if (files == null || files.length == 0) {
return results; return results;
@ -630,7 +652,7 @@ public class Store extends SchemaConfigured implements HeapSize {
storeFileCloserThreadPool.shutdownNow(); storeFileCloserThreadPool.shutdownNow();
} }
} }
LOG.debug("closed " + this.storeNameStr); LOG.info("Closed " + this);
return result; return result;
} finally { } finally {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
@ -806,7 +828,7 @@ public class Store extends SchemaConfigured implements HeapSize {
*/ */
private StoreFile.Writer createWriterInTmp(int maxKeyCount) private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException { throws IOException {
return createWriterInTmp(maxKeyCount, this.compression, false); return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
} }
/* /*
@ -815,7 +837,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* @param isCompaction whether we are creating a new file in a compaction * @param isCompaction whether we are creating a new file in a compaction
* @return Writer for a new StoreFile in the tmp dir. * @return Writer for a new StoreFile in the tmp dir.
*/ */
private StoreFile.Writer createWriterInTmp(int maxKeyCount, StoreFile.Writer createWriterInTmp(int maxKeyCount,
Compression.Algorithm compression, boolean isCompaction) Compression.Algorithm compression, boolean isCompaction)
throws IOException { throws IOException {
final CacheConfig writerCacheConf; final CacheConfig writerCacheConf;
@ -959,16 +981,12 @@ public class Store extends SchemaConfigured implements HeapSize {
* @param CompactionRequest * @param CompactionRequest
* compaction details obtained from requestCompaction() * compaction details obtained from requestCompaction()
* @throws IOException * @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early.
*/ */
void compact(CompactionRequest cr) throws IOException { StoreFile compact(CompactionRequest cr) throws IOException {
if (cr == null || cr.getFiles().isEmpty()) { if (cr == null || cr.getFiles().isEmpty()) return null;
return; Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
}
Preconditions.checkArgument(cr.getStore().toString()
.equals(this.toString()));
List<StoreFile> filesToCompact = cr.getFiles(); List<StoreFile> filesToCompact = cr.getFiles();
synchronized (filesCompacting) { synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about // sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging // TODO: change this to LOG.error() after more debugging
@ -980,19 +998,26 @@ public class Store extends SchemaConfigured implements HeapSize {
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this.storeNameStr + " of " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString() + this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize())); + StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null; StoreFile sf = null;
try { try {
StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(), StoreFile.Writer writer =
maxId); this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place. // Move the compaction into place.
sf = completeCompaction(filesToCompact, writer); if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
if (region.getCoprocessorHost() != null) { sf = completeCompaction(filesToCompact, writer);
region.getCoprocessorHost().postCompact(this, sf); if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
}
} else {
// Create storefile around what we wrote with a reader on it.
sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
sf.createReader();
} }
} finally { } finally {
synchronized (filesCompacting) { synchronized (filesCompacting) {
@ -1001,7 +1026,7 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this.storeNameStr + " of " + filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString() + this.region.getRegionInfo().getRegionNameAsString()
+ " into " + + " into " +
(sf == null ? "none" : sf.getPath().getName()) + (sf == null ? "none" : sf.getPath().getName()) +
@ -1009,6 +1034,7 @@ public class Store extends SchemaConfigured implements HeapSize {
StringUtils.humanReadableInt(sf.getReader().length())) StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is " + "; total size for store is "
+ StringUtils.humanReadableInt(storeSize)); + StringUtils.humanReadableInt(storeSize));
return sf;
} }
/** /**
@ -1048,7 +1074,8 @@ public class Store extends SchemaConfigured implements HeapSize {
try { try {
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId); StoreFile.Writer writer =
this.compactor.compact(this, filesToCompact, isMajor, maxId);
// Move the compaction into place. // Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer); StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
@ -1097,10 +1124,10 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
/** getter for CompactionProgress object /** getter for CompactionProgress object
* @return CompactionProgress object * @return CompactionProgress object; can be null
*/ */
public CompactionProgress getCompactionProgress() { public CompactionProgress getCompactionProgress() {
return this.progress; return this.compactor.getProgress();
} }
/* /*
@ -1152,19 +1179,19 @@ public class Store extends SchemaConfigured implements HeapSize {
if (sf.isMajorCompaction() && if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) { (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this.storeNameStr + LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " + " because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl); oldest + "ms is < ttl=" + this.ttl);
} }
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
LOG.debug("Major compaction triggered on store " + this.storeNameStr + LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " + ", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms"); (now - lowTimestamp) + "ms");
result = true; result = true;
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this.storeNameStr + LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms"); "; time since last major compaction " + (now - lowTimestamp) + "ms");
} }
result = true; result = true;
@ -1339,12 +1366,12 @@ public class Store extends SchemaConfigured implements HeapSize {
compactSelection.getFilesToCompact().get(pos).getReader().length() compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize && > maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos; !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
compactSelection.clearSubList(0, pos); if (pos != 0) compactSelection.clearSubList(0, pos);
} }
if (compactSelection.getFilesToCompact().isEmpty()) { if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
this.storeNameStr + ": no store files to compact"); this + ": no store files to compact");
compactSelection.emptyFileList(); compactSelection.emptyFileList();
return compactSelection; return compactSelection;
} }
@ -1420,7 +1447,7 @@ public class Store extends SchemaConfigured implements HeapSize {
// if we don't have enough files to compact, just wait // if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Skipped compaction of " + this.storeNameStr LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size " + ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize) + StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria."); + " have met compaction criteria.");
@ -1439,142 +1466,6 @@ public class Store extends SchemaConfigured implements HeapSize {
return compactSelection; return compactSelection;
} }
/**
* Do a minor/major compaction on an explicit set of storefiles in a Store.
* Uses the scan infrastructure to make it easy.
*
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id.
* @return Product of compaction or null if all cells expired or deleted and
* nothing made it through the compaction.
* @throws IOException
*/
StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
// calculate maximum key count after compaction (for blooms)
int maxKeyCount = 0;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r != null) {
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
? r.getFilterEntries() : r.getEntries();
maxKeyCount += keyCount;
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getEncodingOnDisk());
}
}
// For major compactions calculate the earliest put timestamp
// of all involved storefiles. This is used to remove
// family delete marker during the compaction.
if (majorCompaction) {
byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// there's a file with no information, must be an old one
// assume we have very old puts
earliestPutTs = HConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
}
}
}
// keep track of compaction progress
progress = new CompactionProgress(maxKeyCount);
// For each file, obtain a scanner:
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
try {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(this, scan, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
smallestReadPoint, earliestPutTs);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return null;
}
scanner = cpScanner;
}
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
boolean hasMore;
do {
hasMore = scanner.next(kvs, this.compactionKVMax);
if (writer == null && !kvs.isEmpty()) {
writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
true);
}
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (Store.closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
bytesWritten = 0;
if (!this.region.areWritesEnabled()) {
writer.close();
fs.delete(writer.getPath(), false);
throw new InterruptedIOException(
"Aborting compaction of store " + this +
" in region " + this.region +
" because user requested stop.");
}
}
}
}
}
kvs.clear();
} while (hasMore);
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (writer != null) {
writer.appendMetadata(maxId, majorCompaction);
writer.close();
}
}
return writer;
}
/** /**
* Validates a store file by opening and closing it. In HFileV2 this should * Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation. * not be an expensive operation.
@ -1677,7 +1568,7 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
} catch (IOException e) { } catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e); e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files in " + this.storeNameStr + LOG.error("Failed replacing compacted files in " + this +
". Compacted file is " + (result == null? "none": result.toString()) + ". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() + ". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e); " some of which may have been already removed", e);
@ -1962,7 +1853,7 @@ public class Store extends SchemaConfigured implements HeapSize {
return mk.getRow(); return mk.getRow();
} }
} catch(IOException e) { } catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeNameStr, e); LOG.warn("Failed getting store size for " + this, e);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
@ -2008,7 +1899,7 @@ public class Store extends SchemaConfigured implements HeapSize {
@Override @Override
public String toString() { public String toString() {
return this.storeNameStr; return getColumnFamilyName();
} }
/** /**
@ -2229,8 +2120,8 @@ public class Store extends SchemaConfigured implements HeapSize {
public static final long FIXED_OVERHEAD = public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+ (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

View File

@ -52,5 +52,4 @@ public class CompactionProgress {
public float getProgressPct() { public float getProgressPct() {
return currentCompactedKVs / totalCompactingKVs; return currentCompactedKVs / totalCompactingKVs;
} }
} }

View File

@ -25,9 +25,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ChecksumFactory;
/** /**
* Checksum types. The Checksum type is a one byte number * Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm * that stores a representation of the checksum algorithm
@ -70,7 +67,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32); ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32); LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) { } catch (Exception e) {
LOG.info(PURECRC32 + " not available."); LOG.trace(PURECRC32 + " not available.");
} }
try { try {
// The default checksum class name is java.util.zip.CRC32. // The default checksum class name is java.util.zip.CRC32.
@ -80,7 +77,7 @@ public enum ChecksumType {
LOG.info("Checksum can use " + JDKCRC); LOG.info("Checksum can use " + JDKCRC);
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn(JDKCRC + " not available. ", e); LOG.trace(JDKCRC + " not available.");
} }
} }
@ -113,7 +110,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32C); ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C); LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) { } catch (Exception e) {
LOG.info(PURECRC32C + " not available. "); LOG.trace(PURECRC32C + " not available.");
} }
} }

View File

@ -150,7 +150,7 @@ public abstract class FSUtils {
*/ */
public static FSDataOutputStream create(FileSystem fs, Path path, public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException { FsPermission perm, boolean overwrite) throws IOException {
LOG.debug("Creating file:" + path + "with permission:" + perm); LOG.debug("Creating file=" + path + " with permission=" + perm);
return fs.create(path, perm, overwrite, return fs.create(path, perm, overwrite,
fs.getConf().getInt("io.file.buffer.size", 4096), fs.getConf().getInt("io.file.buffer.size", 4096),

View File

@ -0,0 +1,154 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.mockito.Mockito;
/**
* Compact passed set of files.
* Create an instance and then call {@ink #compact(Store, Collection, boolean, long)}.
* Call this classes {@link #main(String[])} to see how to run compaction code
* 'standalone'.
*/
public class CompactionTool implements Tool {
// Instantiates a Store instance and a mocked up HRegion. The compaction code
// requires a StoreScanner and a StoreScanner has to have a Store; its too
// tangled to do without (Store needs an HRegion which is another tangle).
// TODO: Undo the tangles some day.
private Configuration conf;
CompactionTool() {
super();
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public void setConf(Configuration c) {
this.conf = c;
}
private int usage(final int errCode) {
return usage(errCode, null);
}
private int usage(final int errCode, final String errMsg) {
if (errMsg != null) System.err.println("ERROR: " + errMsg);
System.err.println("Usage: CompactionTool [options] <inputdir>");
System.err.println(" To preserve input files, pass -Dhbase.hstore.compaction.complete=false");
System.err.println(" To set tmp dir, pass -Dhbase.tmp.dir=ALTERNATE_DIR");
System.err.println(" To stop delete of compacted file, pass -Dhbase.compactiontool.delete=false");
return errCode;
}
int checkdir(final FileSystem fs, final Path p) throws IOException {
if (!fs.exists(p)) {
return usage(-2, p.toString() + " does not exist.");
}
if (!fs.getFileStatus(p).isDir()) {
return usage(-3, p.toString() + " must be a directory");
}
return 0;
}
/**
* Mock up an HRegion instance. Need to return an HRegionInfo when asked.
* Also need an executor to run storefile open/closes. Need to repeat
* the thenReturn on getOpenAndCloseThreadPool because otherwise it returns
* cache of first which is closed during the opening. Also, need to return
* tmpdir, etc.
* @param hri
* @param tmpdir
* @return
*/
private HRegion createHRegion(final HRegionInfo hri, final Path tmpdir) {
HRegion mockedHRegion = Mockito.mock(HRegion.class);
Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(hri);
Mockito.when(mockedHRegion.getStoreFileOpenAndCloseThreadPool(Mockito.anyString())).
thenReturn(HRegion.getOpenAndCloseThreadPool(1, "mockedRegion.opener")).
thenReturn(HRegion.getOpenAndCloseThreadPool(1, "mockedRegion.closer"));
Mockito.when(mockedHRegion.areWritesEnabled()).thenReturn(true);
Mockito.when(mockedHRegion.getTmpDir()).thenReturn(tmpdir);
return mockedHRegion;
}
/**
* Fake up a Store around the passed <code>storedir</code>.
* @param fs
* @param storedir
* @param tmpdir
* @return
* @throws IOException
*/
private Store getStore(final FileSystem fs, final Path storedir, final Path tmpdir)
throws IOException {
// TODO: Let config on table and column family be configurable from
// command-line setting versions, etc. For now do defaults
HColumnDescriptor hcd = new HColumnDescriptor("f");
HRegionInfo hri = new HRegionInfo(Bytes.toBytes("t"));
// Get a shell of an HRegion w/ enough functionality to make Store happy.
HRegion region = createHRegion(hri, tmpdir);
// Create a Store w/ check of hbase.rootdir blanked out and return our
// list of files instead of have Store search its home dir.
return new Store(tmpdir, region, hcd, fs, getConf()) {
@Override
FileStatus[] getStoreFiles() throws IOException {
return this.fs.listStatus(getHomedir());
}
@Override
Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
return storedir;
}
};
}
@Override
public int run(final String[] args) throws Exception {
if (args.length == 0) return usage(-1);
FileSystem fs = FileSystem.get(getConf());
final Path inputdir = new Path(args[0]);
final Path tmpdir = new Path(getConf().get("hbase.tmp.dir"));
int errCode = checkdir(fs, inputdir);
if (errCode != 0) return errCode;
errCode = checkdir(fs, tmpdir);
if (errCode != 0) return errCode;
// Get a Store that wraps the inputdir of files to compact.
Store store = getStore(fs, inputdir, tmpdir);
// Now we have a Store, run a compaction of passed files.
try {
CompactionRequest cr = store.requestCompaction();
StoreFile sf = store.compact(cr);
if (sf != null) {
sf.closeReader(true);
if (conf.getBoolean("hbase.compactiontool.delete", true)) {
if (!fs.delete(sf.getPath(), false)) {
throw new IOException("Failed delete of " + sf.getPath());
}
}
}
} finally {
store.close();
}
return 0;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(),
new CompactionTool(), args));
}
}

View File

@ -585,8 +585,10 @@ public class TestCompaction extends HBaseTestCase {
List<StoreFile> storeFiles = store.getStorefiles(); List<StoreFile> storeFiles = store.getStorefiles();
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles); long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
Compactor tool = new Compactor(this.conf);
StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId); StoreFile.Writer compactedFile =
tool.compact(store, storeFiles, false, maxId);
// Now lets corrupt the compacted file. // Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);