From 5a9f595e7d1f7e23cc87e015562cde879ca5140b Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Fri, 23 Mar 2012 21:37:01 +0000
Subject: [PATCH] HBASE-5616 Make compaction code standalone

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1304616 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/io/hfile/LruBlockCache.java  |   6 +-
 .../hadoop/hbase/regionserver/Compactor.java  | 189 +++++++++++
 .../hadoop/hbase/regionserver/HRegion.java    |  23 +-
 .../hadoop/hbase/regionserver/Store.java      | 305 ++++++------------
 .../compactions/CompactionProgress.java       |   1 -
 .../hadoop/hbase/util/ChecksumType.java       |   9 +-
 .../org/apache/hadoop/hbase/util/FSUtils.java |   2 +-
 .../hbase/regionserver/CompactionTool.java    | 154 +++++++++
 .../hbase/regionserver/TestCompaction.java    |   4 +-
 9 files changed, 461 insertions(+), 232 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
 create mode 100644 src/test/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java

diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index b52e5d35bc3..91562f7e7ee 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -628,7 +628,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     // Log size
     long totalSize = heapSize();
     long freeSize = maxSize - totalSize;
-    LruBlockCache.LOG.debug("LRU Stats: " +
+    LruBlockCache.LOG.debug("Stats: " +
         "total=" + StringUtils.byteDesc(totalSize) + ", " +
         "free=" + StringUtils.byteDesc(freeSize) + ", " +
         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -636,11 +636,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
         "accesses=" + stats.getRequestCount() + ", " +
         "hits=" + stats.getHitCount() + ", " +
         "hitRatio=" +
-          (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
+          (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
         "cachingHits=" + stats.getHitCachingCount() + ", " +
         "cachingHitsRatio=" +
-          (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
+          (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " +
         "evictions=" + stats.getEvictionCount() + ", " +
         "evicted=" + stats.getEvictedCount() + ", " +
         "evictedPerRun=" + stats.evictedPerEviction());
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
new file mode 100644
index 00000000000..53446c19868
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
@@ -0,0 +1,189 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Compact passed set of files.
+ * Create an instance and then call {@link #compact(Store, Collection, boolean, long)}.
+ */
+@InterfaceAudience.Private
+class Compactor extends Configured {
+  private static final Log LOG = LogFactory.getLog(Compactor.class);
+  private CompactionProgress progress;
+
+  Compactor(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles from a Store.
+   *
+   * @param store Store the files belong to
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   * nothing made it through the compaction.
+   * @throws IOException
+   */
+  StoreFile.Writer compact(final Store store,
+      final Collection<StoreFile> filesToCompact,
+      final boolean majorCompaction, final long maxId)
+  throws IOException {
+    // Calculate maximum key count after compaction (for blooms)
+    // Also calculate earliest put timestamp if major compaction
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file: filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        LOG.warn("Null reader for " + file.getPath());
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      // switches bloom type (e.g. from ROW to ROWCOL)
+      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
+        r.getFilterEntries() : r.getEntries();
+      maxKeyCount += keyCount;
+      // For major compactions calculate the earliest put timestamp of all
+      // involved storefiles. This is used to remove family delete marker during
+      // compaction.
+      if (majorCompaction) {
+        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // There's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+          ", keycount=" + keyCount +
+          ", bloomtype=" + r.getBloomFilterType().toString() +
+          ", size=" + StringUtils.humanReadableInt(r.length()) +
+          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+          (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
+      }
+    }
+
+    // keep track of compaction progress
+    this.progress = new CompactionProgress(maxKeyCount);
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+      .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Get some configs
+    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+    Compression.Algorithm compression = store.getFamily().getCompression();
+    // Avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    Compression.Algorithm compactionCompression =
+      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
+      store.getFamily().getCompactionCompression(): compression;
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        Scan scan = new Scan();
+        scan.setMaxVersions(store.getFamily().getMaxVersions());
+        /* Include deletes, unless we are doing a major compaction */
+        scanner = new StoreScanner(store, scan, scanners,
+          majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+          smallestReadPoint, earliestPutTs);
+        if (store.getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+            store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return null;
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // Since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  isInterrupted(store, writer);
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+  void isInterrupted(final Store store, final StoreFile.Writer writer)
+  throws IOException {
+    if (store.getHRegion().areWritesEnabled()) return;
+    // Else cleanup.
+    writer.close();
+    store.getFileSystem().delete(writer.getPath(), false);
+    throw new InterruptedIOException( "Aborting compaction of store " + store +
+      " in region " + store.getHRegion() + " because user requested stop.");
+  }
+
+  CompactionProgress getProgress() {
+    return this.progress;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e0d09dfb29e..02d55d4761f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1029,19 +1029,16 @@ public class HRegion implements HeapSize { // , Writable{
     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
   }
 
-  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+  static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
       final String threadNamePrefix) {
-    ThreadPoolExecutor openAndCloseThreadPool = Threads
-      .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-        new ThreadFactory() {
-          private int count = 1;
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+      new ThreadFactory() {
+        private int count = 1;
 
-          public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, threadNamePrefix + "-" + count++);
-            return t;
-          }
-        });
-    return openAndCloseThreadPool;
+        public Thread newThread(Runnable r) {
+          return new Thread(r, threadNamePrefix + "-" + count++);
+        }
+      });
   }
 
   /**
@@ -5235,11 +5232,11 @@ public class HRegion implements HeapSize { // , Writable{
     final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
-    } finally {
+    } finally {
       log.close();
       // TODO: is this still right?
       BlockCache bc = new CacheConfig(c).getBlockCache();
       if (bc != null) bc.shutdown();
-    }
+    }
   }
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index dcede5aff96..0c7b3969c8e 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigured implements HeapSize {
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final String storeNameStr;
-  private CompactionProgress progress;
-  private final int compactionKVMax;
 
   private final boolean verifyBulkLoads;
 
   // not private for testing
@@ -152,10 +148,6 @@ public class Store extends SchemaConfigured implements HeapSize {
     new CopyOnWriteArraySet<ChangedReadersObserver>();
   private final int blocksize;
-  /** Compression algorithm for flush files and minor compaction */
-  private final Compression.Algorithm compression;
-  /** Compression algorithm for major compaction */
-  private final Compression.Algorithm compactionCompression;
   private HFileDataBlockEncoder dataBlockEncoder;
 
   /** Checksum configuration */
@@ -165,6 +157,8 @@
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
+  private final Compactor compactor;
+
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -179,52 +173,37 @@
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
       FileSystem fs, Configuration conf)
   throws IOException {
-    super(conf, region.getTableDesc().getNameAsString(),
+    super(conf, region.getRegionInfo().getTableNameAsString(),
         Bytes.toString(family.getName()));
-    HRegionInfo info = region.regionInfo;
+    HRegionInfo info = region.getRegionInfo();
     this.fs = fs;
-    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    if (!this.fs.exists(this.homedir)) {
-      if (!this.fs.mkdirs(this.homedir))
-        throw new IOException("Failed create of: " + this.homedir.toString());
-    }
+    // Assemble the store's home directory.
+    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+    // Ensure it exists.
+    this.homedir = createStoreHomeDir(this.fs, p);
     this.region = region;
     this.family = family;
     this.conf = conf;
     this.blocksize = family.getBlocksize();
-    this.compression = family.getCompression();
-    // avoid overriding compression setting for major compactions if the user
-    // has not specified it separately
-    this.compactionCompression =
-      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
-        family.getCompactionCompression() : this.compression;
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
             family.getDataBlockEncoding());
     this.comparator = info.getComparator();
-    // getTimeToLive returns ttl in seconds. Convert to milliseconds.
-    this.ttl = family.getTimeToLive();
-    if (ttl == HConstants.FOREVER) {
-      // default is unlimited ttl.
-      ttl = Long.MAX_VALUE;
-    } else if (ttl == -1) {
-      ttl = Long.MAX_VALUE;
-    } else {
-      // second -> ms adjust for user data
-      this.ttl *= 1000;
-    }
+    // Get TTL
+    this.ttl = getTTL(family);
     // used by ScanQueryMatcher
     long timeToPurgeDeletes =
         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
-    LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
+    LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
         "ms in store " + this);
+    // Why not just pass a HColumnDescriptor in here altogether?  Even if have
+    // to clone it?
     scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
         family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
         timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -241,10 +220,8 @@
         this.region.memstoreFlushSize);
     this.maxCompactSize
       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-    this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
 
-    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
-        false);
+    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
@@ -256,6 +233,47 @@
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
+    // Create a compaction tool instance
+    this.compactor = new Compactor(this.conf);
+  }
+
+  /**
+   * @param family
+   * @return
+   */
+  long getTTL(final HColumnDescriptor family) {
+    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
+    long ttl = family.getTimeToLive();
+    if (ttl == HConstants.FOREVER) {
+      // Default is unlimited ttl.
+      ttl = Long.MAX_VALUE;
+    } else if (ttl == -1) {
+      ttl = Long.MAX_VALUE;
+    } else {
+      // Second -> ms adjust for user data
+      ttl *= 1000;
+    }
+    return ttl;
+  }
+
+  /**
+   * Create this store's homedir
+   * @param fs
+   * @param homedir
+   * @return Return homedir
+   * @throws IOException
+   */
+  Path createStoreHomeDir(final FileSystem fs,
+      final Path homedir) throws IOException {
+    if (!fs.exists(homedir)) {
+      if (!fs.mkdirs(homedir))
+        throw new IOException("Failed create of: " + homedir.toString());
+    }
+    return homedir;
+  }
+
+  FileSystem getFileSystem() {
+    return this.fs;
   }
 
   /**
@@ -316,7 +334,7 @@
    * Return the directory in which this store stores its
    * StoreFiles
    */
-  public Path getHomedir() {
+  Path getHomedir() {
     return homedir;
   }
 
@@ -335,6 +353,10 @@
     this.dataBlockEncoder = blockEncoder;
   }
 
+  FileStatus [] getStoreFiles() throws IOException {
+    return FSUtils.listStatus(this.fs, this.homedir, null);
+  }
+
   /**
   * Creates an unsorted list of StoreFile loaded in parallel
   * from the given directory.
@@ -342,7 +364,7 @@
   */
   private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+    FileStatus files[] = getStoreFiles();
 
     if (files == null || files.length == 0) {
       return results;
@@ -630,7 +652,7 @@
           storeFileCloserThreadPool.shutdownNow();
         }
       }
-      LOG.debug("closed " + this.storeNameStr);
+      LOG.info("Closed " + this);
       return result;
     } finally {
       this.lock.writeLock().unlock();
@@ -806,7 +828,7 @@
   */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression, false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
   }
 
   /*
@@ -815,7 +837,7 @@
   * @param isCompaction whether we are creating a new file in a compaction
   * @return Writer for a new StoreFile in the tmp dir.
   */
-  private StoreFile.Writer createWriterInTmp(int maxKeyCount,
+  StoreFile.Writer createWriterInTmp(int maxKeyCount,
     Compression.Algorithm compression, boolean isCompaction)
   throws IOException {
     final CacheConfig writerCacheConf;
@@ -959,16 +981,12 @@
   * @param CompactionRequest
   *          compaction details obtained from requestCompaction()
   * @throws IOException
+  * @return Storefile we compacted into or null if we failed or opted out early.
   */
-  void compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) {
-      return;
-    }
-    Preconditions.checkArgument(cr.getStore().toString()
-        .equals(this.toString()));
-
+  StoreFile compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) return null;
+    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
     List<StoreFile> filesToCompact = cr.getFiles();
-
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -980,19 +998,26 @@
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this.storeNameStr + " of "
+        + this + " of "
        + this.region.getRegionInfo().getRegionNameAsString()
        + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
     try {
-      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
-        maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
-      sf = completeCompaction(filesToCompact, writer);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postCompact(this, sf);
+      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+        sf = completeCompaction(filesToCompact, writer);
+        if (region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postCompact(this, sf);
+        }
+      } else {
+        // Create storefile around what we wrote with a reader on it.
+        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
+        sf.createReader();
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1001,7 +1026,7 @@
     }
 
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+        + filesToCompact.size() + " file(s) in " + this + " of "
        + this.region.getRegionInfo().getRegionNameAsString()
        + " into " +
        (sf == null ? "none" : sf.getPath().getName()) +
@@ -1009,6 +1034,7 @@
        StringUtils.humanReadableInt(sf.getReader().length()))
        + "; total size for store is "
        + StringUtils.humanReadableInt(storeSize));
+    return sf;
   }
 
   /**
@@ -1048,7 +1074,8 @@
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (region.getCoprocessorHost() != null) {
@@ -1097,10 +1124,10 @@
   }
 
   /** getter for CompactionProgress object
-   * @return CompactionProgress object
+   * @return CompactionProgress object; can be null
   */
   public CompactionProgress getCompactionProgress() {
-    return this.progress;
+    return this.compactor.getProgress();
   }
 
   /*
@@ -1152,19 +1179,19 @@
       if (sf.isMajorCompaction() &&
          (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping major compaction of " + this.storeNameStr +
+          LOG.debug("Skipping major compaction of " + this +
            " because one (major) compacted file only and oldestTime " +
            oldest + "ms is < ttl=" + this.ttl);
         }
       } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-        LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+        LOG.debug("Major compaction triggered on store " + this +
          ", because keyvalues outdated; time since last major compaction " +
          (now - lowTimestamp) + "ms");
         result = true;
       }
     } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+        LOG.debug("Major compaction triggered on store " + this +
          "; time since last major compaction " + (now - lowTimestamp) + "ms");
       }
       result = true;
@@ -1339,12 +1366,12 @@
            compactSelection.getFilesToCompact().get(pos).getReader().length()
              > maxCompactSize &&
            !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      compactSelection.clearSubList(0, pos);
+      if (pos != 0) compactSelection.clearSubList(0, pos);
     }
 
     if (compactSelection.getFilesToCompact().isEmpty()) {
       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this.storeNameStr + ": no store files to compact");
+        this + ": no store files to compact");
       compactSelection.emptyFileList();
       return compactSelection;
     }
@@ -1420,7 +1447,7 @@
      // if we don't have enough files to compact, just wait
      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this.storeNameStr +
+          LOG.debug("Skipped compaction of " + this +
            ". Only " + (end - start) + " file(s) of size "
            + StringUtils.humanReadableInt(totalSize)
            + " have met compaction criteria.");
@@ -1439,142 +1466,6 @@
     return compactSelection;
   }
 
-  /**
-   * Do a minor/major compaction on an explicit set of storefiles in a Store.
-   * Uses the scan infrastructure to make it easy.
-   *
-   * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
-   * @param maxId Readers maximum sequence id.
-   * @return Product of compaction or null if all cells expired or deleted and
-   * nothing made it through the compaction.
-   * @throws IOException
-   */
-  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
-      final boolean majorCompaction, final long maxId)
-      throws IOException {
-    // calculate maximum key count after compaction (for blooms)
-    int maxKeyCount = 0;
-    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
-    for (StoreFile file : filesToCompact) {
-      StoreFile.Reader r = file.getReader();
-      if (r != null) {
-        // NOTE: getFilterEntries could cause under-sized blooms if the user
-        // switches bloom type (e.g. from ROW to ROWCOL)
-        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
-          ? r.getFilterEntries() : r.getEntries();
-        maxKeyCount += keyCount;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Compacting " + file +
-            ", keycount=" + keyCount +
-            ", bloomtype=" + r.getBloomFilterType().toString() +
-            ", size=" + StringUtils.humanReadableInt(r.length()) +
-            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
-        }
-      }
-      // For major compactions calculate the earliest put timestamp
-      // of all involved storefiles. This is used to remove
-      // family delete marker during the compaction.
-      if (majorCompaction) {
-        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
-        if (tmp == null) {
-          // there's a file with no information, must be an old one
-          // assume we have very old puts
-          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
-        } else {
-          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
-        }
-      }
-    }
-
-    // keep track of compaction progress
-    progress = new CompactionProgress(maxKeyCount);
-
-    // For each file, obtain a scanner:
-    List<StoreFileScanner> scanners = StoreFileScanner
-      .getScannersForStoreFiles(filesToCompact, false, false, true);
-
-    // Make the instantiation lazy in case compaction produces no product; i.e.
-    // where all source cells are expired or deleted.
-    StoreFile.Writer writer = null;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
-    try {
-      InternalScanner scanner = null;
-      try {
-        Scan scan = new Scan();
-        scan.setMaxVersions(family.getMaxVersions());
-        /* include deletes, unless we are doing a major compaction */
-        scanner = new StoreScanner(this, scan, scanners,
-          majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
-          smallestReadPoint, earliestPutTs);
-        if (region.getCoprocessorHost() != null) {
-          InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
-            this, scanner);
-          // NULL scanner returned from coprocessor hooks means skip normal processing
-          if (cpScanner == null) {
-            return null;
-          }
-
-          scanner = cpScanner;
-        }
-
-        int bytesWritten = 0;
-        // since scanner.next() can return 'false' but still be delivering data,
-        // we have to use a do/while loop.
-        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-        boolean hasMore;
-        do {
-          hasMore = scanner.next(kvs, this.compactionKVMax);
-          if (writer == null && !kvs.isEmpty()) {
-            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
-                true);
-          }
-          if (writer != null) {
-            // output to writer:
-            for (KeyValue kv : kvs) {
-              if (kv.getMemstoreTS() <= smallestReadPoint) {
-                kv.setMemstoreTS(0);
-              }
-              writer.append(kv);
-              // update progress per key
-              ++progress.currentCompactedKVs;
-
-              // check periodically to see if a system stop is requested
-              if (Store.closeCheckInterval > 0) {
-                bytesWritten += kv.getLength();
-                if (bytesWritten > Store.closeCheckInterval) {
-                  bytesWritten = 0;
-                  if (!this.region.areWritesEnabled()) {
-                    writer.close();
-                    fs.delete(writer.getPath(), false);
-                    throw new InterruptedIOException(
-                        "Aborting compaction of store " + this +
-                        " in region " + this.region +
-                        " because user requested stop.");
-                  }
-                }
-              }
-            }
-          }
-          kvs.clear();
-        } while (hasMore);
-      } finally {
-        if (scanner != null) {
-          scanner.close();
-        }
-      }
-    } finally {
-      if (writer != null) {
-        writer.appendMetadata(maxId, majorCompaction);
-        writer.close();
-      }
-    }
-    return writer;
-  }
-
   /**
   * Validates a store file by opening and closing it. In HFileV2 this should
   * not be an expensive operation.
@@ -1677,7 +1568,7 @@
       }
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
-      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+      LOG.error("Failed replacing compacted files in " + this +
        ". Compacted file is " + (result == null? "none": result.toString()) +
        ". Files replaced " + compactedFiles.toString() +
        " some of which may have been already removed", e);
@@ -1962,7 +1853,7 @@
        return mk.getRow();
      }
    } catch(IOException e) {
-      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+      LOG.warn("Failed getting store size for " + this, e);
    } finally {
      this.lock.readLock().unlock();
    }
@@ -2008,7 +1899,7 @@
 
   @Override
   public String toString() {
-    return this.storeNameStr;
+    return getColumnFamilyName();
   }
 
   /**
@@ -2229,8 +2120,8 @@
   public static final long FIXED_OVERHEAD =
      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
      ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 553eee0a9cc..4444d42a5f0 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -52,5 +52,4 @@
   public float getProgressPct() {
     return currentCompactedKVs / totalCompactingKVs;
   }
-
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java b/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
index d2329e10c58..885625b044c 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
@@ -25,9 +25,6 @@
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
 /**
  * Checksum types. The Checksum type is a one byte number
  * that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@
        ctor = ChecksumFactory.newConstructor(PURECRC32);
        LOG.info("Checksum using " + PURECRC32);
      } catch (Exception e) {
-        LOG.info(PURECRC32 + " not available.");
+        LOG.trace(PURECRC32 + " not available.");
      }
      try {
        // The default checksum class name is java.util.zip.CRC32.
@@ -80,7 +77,7 @@
          LOG.info("Checksum can use " + JDKCRC);
        }
      } catch (Exception e) {
-        LOG.warn(JDKCRC + " not available. ", e);
+        LOG.trace(JDKCRC + " not available.");
      }
    }
 
@@ -113,7 +110,7 @@
        ctor = ChecksumFactory.newConstructor(PURECRC32C);
        LOG.info("Checksum can use " + PURECRC32C);
      } catch (Exception e) {
-        LOG.info(PURECRC32C + " not available. ");
+        LOG.trace(PURECRC32C + " not available.");
      }
    }
 
diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 4bfd42f390a..aebe5b025cd 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -150,7 +150,7 @@
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
-    LOG.debug("Creating file:" + path + "with permission:" + perm);
+    LOG.debug("Creating file=" + path + " with permission=" + perm);
 
    return fs.create(path, perm, overwrite,
        fs.getConf().getInt("io.file.buffer.size", 4096),
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/src/test/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 00000000000..56dd9e7f819
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,154 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.mockito.Mockito;
+
+/**
+ * Compact passed set of files.
+ * Create an instance and then call {@link #compact(Store, Collection, boolean, long)}.
+ * Call this classes {@link #main(String[])} to see how to run compaction code
+ * 'standalone'.
+ */
+public class CompactionTool implements Tool {
+  // Instantiates a Store instance and a mocked up HRegion.  The compaction code
+  // requires a StoreScanner and a StoreScanner has to have a Store; its too
+  // tangled to do without (Store needs an HRegion which is another tangle).
+  // TODO: Undo the tangles some day.
+  private Configuration conf;
+
+  CompactionTool() {
+    super();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration c) {
+    this.conf = c;
+  }
+
+  private int usage(final int errCode) {
+    return usage(errCode, null);
+  }
+
+  private int usage(final int errCode, final String errMsg) {
+    if (errMsg != null) System.err.println("ERROR: " + errMsg);
+    System.err.println("Usage: CompactionTool [options] <inputdir>");
+    System.err.println(" To preserve input files, pass -Dhbase.hstore.compaction.complete=false");
+    System.err.println(" To set tmp dir, pass -Dhbase.tmp.dir=ALTERNATE_DIR");
+    System.err.println(" To stop delete of compacted file, pass -Dhbase.compactiontool.delete=false");
+    return errCode;
+  }
+
+  int checkdir(final FileSystem fs, final Path p) throws IOException {
+    if (!fs.exists(p)) {
+      return usage(-2, p.toString() + " does not exist.");
+    }
+    if (!fs.getFileStatus(p).isDir()) {
+      return usage(-3, p.toString() + " must be a directory");
+    }
+    return 0;
+  }
+
+  /**
+   * Mock up an HRegion instance.  Need to return an HRegionInfo when asked.
+   * Also need an executor to run storefile open/closes.  Need to repeat
+   * the thenReturn on getOpenAndCloseThreadPool because otherwise it returns
+   * cache of first which is closed during the opening.  Also, need to return
+   * tmpdir, etc.
+   * @param hri
+   * @param tmpdir
+   * @return
+   */
+  private HRegion createHRegion(final HRegionInfo hri, final Path tmpdir) {
+    HRegion mockedHRegion = Mockito.mock(HRegion.class);
+    Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(hri);
+    Mockito.when(mockedHRegion.getStoreFileOpenAndCloseThreadPool(Mockito.anyString())).
+      thenReturn(HRegion.getOpenAndCloseThreadPool(1, "mockedRegion.opener")).
+      thenReturn(HRegion.getOpenAndCloseThreadPool(1, "mockedRegion.closer"));
+    Mockito.when(mockedHRegion.areWritesEnabled()).thenReturn(true);
+    Mockito.when(mockedHRegion.getTmpDir()).thenReturn(tmpdir);
+    return mockedHRegion;
+  }
+
+  /**
+   * Fake up a Store around the passed storedir.
+   * @param fs
+   * @param storedir
+   * @param tmpdir
+   * @return
+   * @throws IOException
+   */
+  private Store getStore(final FileSystem fs, final Path storedir, final Path tmpdir)
+  throws IOException {
+    // TODO: Let config on table and column family be configurable from
+    // command-line setting versions, etc.  For now do defaults
+    HColumnDescriptor hcd = new HColumnDescriptor("f");
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes("t"));
+    // Get a shell of an HRegion w/ enough functionality to make Store happy.
+    HRegion region = createHRegion(hri, tmpdir);
+    // Create a Store w/ check of hbase.rootdir blanked out and return our
+    // list of files instead of have Store search its home dir.
+    return new Store(tmpdir, region, hcd, fs, getConf()) {
+      @Override
+      FileStatus[] getStoreFiles() throws IOException {
+        return this.fs.listStatus(getHomedir());
+      }
+
+      @Override
+      Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
+        return storedir;
+      }
+    };
+  }
+
+  @Override
+  public int run(final String[] args) throws Exception {
+    if (args.length == 0) return usage(-1);
+    FileSystem fs = FileSystem.get(getConf());
+    final Path inputdir = new Path(args[0]);
+    final Path tmpdir = new Path(getConf().get("hbase.tmp.dir"));
+    int errCode = checkdir(fs, inputdir);
+    if (errCode != 0) return errCode;
+    errCode = checkdir(fs, tmpdir);
+    if (errCode != 0) return errCode;
+    // Get a Store that wraps the inputdir of files to compact.
+    Store store = getStore(fs, inputdir, tmpdir);
+    // Now we have a Store, run a compaction of passed files.
+    try {
+      CompactionRequest cr = store.requestCompaction();
+      StoreFile sf = store.compact(cr);
+      if (sf != null) {
+        sf.closeReader(true);
+        if (conf.getBoolean("hbase.compactiontool.delete", true)) {
+          if (!fs.delete(sf.getPath(), false)) {
+            throw new IOException("Failed delete of " + sf.getPath());
+          }
+        }
+      }
+    } finally {
+      store.close();
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(),
+      new CompactionTool(), args));
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 91ac6528712..6e12b3dcb39 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -585,8 +585,10 @@
     List<StoreFile> storeFiles = store.getStorefiles();
     long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    Compactor tool = new Compactor(this.conf);
 
-    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+    StoreFile.Writer compactedFile =
+      tool.compact(store, storeFiles, false, maxId);
 
     // Now lets corrupt the compacted file.
     FileSystem fs = FileSystem.get(conf);
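
Usage sketch (not part of the patch itself): based on the usage() text and main() in the CompactionTool above, a standalone run amounts to handing ToolRunner a store directory of HFiles plus the -D flags the tool prints. The snippet below is a hedged example only; the store directory path is made up, and the flags shown are the ones usage() documents (ToolRunner's GenericOptionsParser folds the -D options into the Configuration that getStore() passes to the mocked-up Store).

  // Sketch: drive CompactionTool the same way its main() does.
  String[] args = new String[] {
    "-Dhbase.hstore.compaction.complete=false",  // keep the input files; leave the product under tmp
    "-Dhbase.compactiontool.delete=false",       // do not delete the compacted product afterwards
    "-Dhbase.tmp.dir=/tmp/compaction-tmpdir",    // alternate tmp dir (example value)
    "/hbase/sometable/someregion/somefamily"     // a store directory of HFiles (example path)
  };
  System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));

Internally the tool checks the directory with checkdir(), wraps it in a Store via getStore(), and then runs the same Store.compact(CompactionRequest) path the regionserver uses, now delegating to the new standalone Compactor.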