diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 5950585bb96..b06be6b5de5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -76,14 +75,23 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path);
+    this(fs, null, path, false);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
+    this(fs, null, path, dropBehind);
   }
 
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null);
+    this(fs, link, null, false);
+  }
+  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+      boolean dropBehind) throws IOException {
+    this(fs, link, null, dropBehind);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+      Path path, boolean dropBehind) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
@@ -96,8 +104,14 @@ public class FSDataInputStreamWrapper {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    try {
+      this.stream.setDropBehind(dropBehind);
+    } catch (Exception e) {
+      // Skipped.
+    }
   }
 
+
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
    * reads finish and before any other reads start (what happens in reality is we read the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 08e90483d91..7b4f530bff7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -131,6 +131,8 @@ public class CacheConfig {
   private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
 
   private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
+  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY="hbase.hfile.drop.behind.compaction";
+  private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
 
   /**
    * Enum of all built in external block caches.
@@ -194,6 +196,8 @@ public class CacheConfig {
    */
   private boolean cacheDataInL1;
 
+  private final boolean dropBehindCompaction;
+
   /**
    * Create a cache configuration using the specified configuration object and
    * family descriptor.
@@ -218,7 +222,8 @@ public class CacheConfig {
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
             DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1()
+            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -239,7 +244,8 @@ public class CacheConfig {
       conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
       conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
       conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-          HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1)
+          HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1),
+      conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -264,7 +270,7 @@ public class CacheConfig {
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
       final boolean cacheDataCompressed, final boolean prefetchOnOpen,
-      final boolean cacheDataInL1) {
+      final boolean cacheDataInL1, final boolean dropBehindCompaction) {
     this.blockCache = blockCache;
     this.cacheDataOnRead = cacheDataOnRead;
     this.inMemory = inMemory;
@@ -275,6 +281,7 @@ public class CacheConfig {
     this.cacheDataCompressed = cacheDataCompressed;
     this.prefetchOnOpen = prefetchOnOpen;
     this.cacheDataInL1 = cacheDataInL1;
+    this.dropBehindCompaction = dropBehindCompaction;
     LOG.info(this);
   }
 
@@ -287,7 +294,7 @@ public class CacheConfig {
       cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
       cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
       cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
-      cacheConf.cacheDataInL1);
+      cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction);
   }
 
   /**
@@ -314,6 +321,10 @@ public class CacheConfig {
     return isBlockCacheEnabled() && cacheDataOnRead;
   }
 
+  public boolean shouldDropBehindCompaction() {
+    return dropBehindCompaction;
+  }
+
   /**
    * Should we cache a block of a particular category? We always cache
    * important blocks such as index blocks, as long as the block cache is
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 48636a50dd9..7dbad6c8579 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -251,6 +251,7 @@
         CellComparator.COMPARATOR;
     protected InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    protected boolean shouldDropBehind = false;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -288,6 +289,12 @@
       return this;
     }
 
+    public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+      this.shouldDropBehind = shouldDropBehind;
+      return this;
+    }
+
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -295,6 +302,11 @@
       }
       if (path != null) {
         ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
+        try {
+          ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
+        } catch (UnsupportedOperationException uoe) {
+          LOG.debug("Unable to set drop behind on " + path, uoe);
+        }
       }
       return createWriter(fs, path, ostream, comparator, fileContext);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 1670f7b777b..f48bb946005 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -73,15 +73,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   /**
    * Creates a writer for a new file in a temporary directory.
    * @param fd The file details.
-   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @param shouldDropBehind Should the writer drop behind.
    * @return Writer for a new StoreFile in the tmp dir.
    * @throws IOException
    */
   @Override
-  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
     // make this writer with tags always because of possible new cells with tags.
     StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-      true, true, true);
+      true, true, true, shouldDropBehind);
     return writer;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index 68f19b636c9..8c760e66e5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -58,7 +58,7 @@ public class MobFile {
     List sfs = new ArrayList();
     sfs.add(sf);
     List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-      false, null, sf.getMaxMemstoreTS());
+      false, false, sf.getMaxMemstoreTS());
     return sfScanners.get(0);
   }
 
@@ -89,7 +89,7 @@ public class MobFile {
     sfs.add(sf);
     try {
       List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
-        cacheMobBlocks, true, false, null, readPt);
+        cacheMobBlocks, true, false, false, readPt);
       if (!sfScanners.isEmpty()) {
         scanner = sfScanners.get(0);
         if (scanner.seek(search)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 19137c4d15c..e0ae4810342 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -547,7 +547,7 @@ public class PartitionedMobCompactor extends MobCompactor {
   private StoreScanner createScanner(List filesToCompact, ScanType scanType)
       throws IOException {
     List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
-      null, HConstants.LATEST_TIMESTAMP);
+      false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(column.getMaxVersions());
     long ttl = HStore.determineTTLFromFamily(column);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 4b56ba3fc62..2f512778eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -63,8 +63,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(), false,
-        true, snapshot.isTagsPresent());
+      writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(),
+          /* isCompaction = */ false,
+          /* includeMVCCReadpoint = */ true,
+          /* includesTags = */ snapshot.isTagsPresent(),
+          /* shouldDropBehind = */ false);
       writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
       IOException e = null;
       try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7569e7aad0f..840085d58f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -986,6 +986,15 @@ public class HStore implements Store {
     return sf;
   }
 
+  @Override
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+      boolean isCompaction, boolean includeMVCCReadpoint,
+      boolean includesTag)
+      throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+        includesTag, false);
+  }
+
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
@@ -996,7 +1005,8 @@
    */
   @Override
   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag
+      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+      boolean shouldDropBehind)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -1021,6 +1031,7 @@
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
+            .withShouldDropCacheBehind(shouldDropBehind)
             .build();
     return w;
   }
@@ -1122,9 +1133,8 @@
     // TODO this used to get the store files in descending order,
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
-    List sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
-        readPt);
+    List sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List scanners = new ArrayList(sfScanners.size()+1);
     scanners.addAll(sfScanners);
 
@@ -1196,7 +1206,7 @@
       CompactionThroughputController throughputController) throws IOException {
     assert compaction != null;
     List sfs = null;
-    CompactionRequest cr = compaction.getRequest();;
+    CompactionRequest cr = compaction.getRequest();
     try {
       // Do all sanity checking in here if we have a valid CompactionRequest
       // because we need to clean up after it on the way out in a finally
@@ -2026,7 +2036,7 @@
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlushContext {
+  private final class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index edc166ebba5..24310442558 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -145,21 +145,42 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   FileSystem getFileSystem();
 
-  /*
+
+  /**
    * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
    * @param includeMVCCReadpoint whether we should out the MVCC readpoint
    * @return Writer for a new StoreFile in the tmp dir.
    */
+  StoreFile.Writer createWriterInTmp(
+    long maxKeyCount,
+    Compression.Algorithm compression,
+    boolean isCompaction,
+    boolean includeMVCCReadpoint,
+    boolean includesTags
+  ) throws IOException;
+
+  /**
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @param shouldDropBehind should the writer drop caches behind writes
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
   StoreFile.Writer createWriterInTmp(
     long maxKeyCount,
     Compression.Algorithm compression,
     boolean isCompaction,
     boolean includeMVCCReadpoint,
-    boolean includesTags
+    boolean includesTags,
+    boolean shouldDropBehind
   ) throws IOException;
 
+
+
   // Compaction oriented methods
 
   boolean throttleCompaction(long compactionSize);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index f948bdc52fa..ba872095013 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -257,8 +257,8 @@
   }
 
   /**
-   * @return True if this is a StoreFile Reference; call after {@link #open()}
-   * else may get wrong answer.
+   * @return True if this is a StoreFile Reference; call
+     after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -376,13 +376,13 @@
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private Reader open() throws IOException {
+  private Reader open(boolean canUseDropBehind) throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -478,14 +478,18 @@
     return this.reader;
   }
 
+  public Reader createReader() throws IOException {
+    return createReader(false);
+  }
+
   /**
    * @return Reader for StoreFile. creates if necessary
    * @throws IOException
    */
-  public Reader createReader() throws IOException {
+  public Reader createReader(boolean canUseDropBehind) throws IOException {
     if (this.reader == null) {
       try {
-        this.reader = open();
+        this.reader = open(canUseDropBehind);
       } catch (IOException e) {
         try {
           this.closeReader(true);
@@ -574,6 +578,8 @@
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    private boolean shouldDropCacheBehind = false;
+
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
       this.conf = conf;
@@ -639,6 +645,11 @@
       this.fileContext = fileContext;
       return this;
     }
+
+    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+      this.shouldDropCacheBehind = shouldDropCacheBehind;
+      return this;
+    }
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -1316,7 +1327,7 @@
         break;
 
       case ROWCOL:
-        kvKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
+        kvKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
            HConstants.EMPTY_BYTE_ARRAY, 0, 0, col, colOffset, colLen);
         break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 619a1348a38..6e744c146a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -229,21 +229,24 @@
    * @return The StoreFile.Reader for the file
    */
   public StoreFile.Reader open(final FileSystem fs,
-      final CacheConfig cacheConf) throws IOException {
+      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
+    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath);
+      in = new FSDataInputStreamWrapper(fs, referencePath,
+        doDropBehind);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath());
+      in = new FSDataInputStreamWrapper(fs, this.getPath(),
+        doDropBehind);
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index d954764e2cd..c3bac1561ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -87,7 +87,7 @@ public class StoreFileScanner implements KeyValueScanner {
       boolean cacheBlocks,
       boolean usePread, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks,
-        usePread, false, readPt);
+        usePread, false, false, readPt);
   }
 
   /**
@@ -95,9 +95,9 @@
    */
   public static List getScannersForStoreFiles(
       Collection files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, long readPt) throws IOException {
+      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        null, readPt);
+        useDropBehind, null, readPt);
   }
 
   /**
@@ -107,11 +107,12 @@
    */
   public static List getScannersForStoreFiles(
       Collection files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+      boolean isCompaction, boolean canUseDrop,
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List scanners = new ArrayList(
         files.size());
     for (StoreFile file : files) {
-      StoreFile.Reader r = file.createReader();
+      StoreFile.Reader r = file.createReader(canUseDrop);
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks,
           usePread, isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index c87b24654a2..9deea7ade7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -109,7 +109,11 @@ public class StripeStoreFlusher extends StoreFlusher {
       @Override
       public Writer createWriter() throws IOException {
         StoreFile.Writer writer = store.createWriterInTmp(
-            kvCount, store.getFamily().getCompressionType(), false, true, true);
+            kvCount, store.getFamily().getCompressionType(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ true,
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(tracker);
         return writer;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index b64f40f1848..873d8275e86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -197,8 +197,14 @@ public abstract class Compactor {
    * @return Scanners.
    */
   protected List createFileScanners(
-      final Collection filesToCompact, long smallestReadPoint) throws IOException {
-    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+      final Collection filesToCompact,
+      long smallestReadPoint,
+      boolean useDropBehind) throws IOException {
+    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+        /* cache blocks = */ false,
+        /* use pread = */ false,
+        /* is compaction */ true,
+        /* use Drop Behind */ useDropBehind,
        smallestReadPoint);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index e20157f9916..f26f4fe3dfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -60,17 +60,19 @@
     List scanners;
     Collection readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
       // HFileFiles, and their readers
       readersToClose = new ArrayList(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
         readersToClose.add(new StoreFile(f));
       }
-      scanners = createFileScanners(readersToClose, smallestReadPoint);
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     }
 
     StoreFile.Writer writer = null;
@@ -81,8 +83,10 @@
     InternalScanner scanner = null;
     try {
       /* Include deletes, unless we are doing a compaction of all files */
-      ScanType scanType = request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
-          : ScanType.COMPACT_DROP_DELETES;
+      ScanType scanType =
+          request.isRetainDeleteMarkers() ?
+              ScanType.COMPACT_RETAIN_DELETES :
+              ScanType.COMPACT_DROP_DELETES;
       scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
       if (scanner == null) {
         scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
@@ -99,14 +103,16 @@
         cleanSeqId = true;
       }
 
-      writer = createTmpWriter(fd, smallestReadPoint);
+
+      writer = createTmpWriter(fd, store.throttleCompaction(request.getSize()));
       boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
           throughputController, request.isAllFiles());
+
       if (!finished) {
         writer.close();
         store.getFileSystem().delete(writer.getPath(), false);
         writer = null;
-        throw new InterruptedIOException( "Aborting compaction of store " + store +
+        throw new InterruptedIOException("Aborting compaction of store " + store +
           " in region " + store.getRegionInfo().getRegionNameAsString() +
           " because it was interrupted.");
       }
@@ -147,18 +153,20 @@
   /**
    * Creates a writer for a new file in a temporary directory.
    * @param fd The file details.
-   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
    * @return Writer for a new StoreFile in the tmp dir.
    * @throws IOException
    */
-  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
+  protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
       throws IOException {
-    // When all MVCC readpoints are 0, don't write them.
-    // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    // make this writer with tags always because of possible new cells with tags.
-    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-      true, fd.maxMVCCReadpoint > 0, fd.maxTagsLength >0);
+    // When all MVCC readpoints are 0, don't write them.
+    // See HBASE-8166, HBASE-12600, and HBASE-13389.
+
+    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+        /* isCompaction = */ true,
+        /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
+        /* includesTags = */ fd.maxTagsLength > 0,
+        /* shouldDropBehind = */ shouldDropBehind);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 4095db25cee..6814b8c4036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -81,7 +81,7 @@
       throughputController);
   }
 
-  private List compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+  private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
       CompactionThroughputController throughputController) throws IOException {
     final Collection filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     long smallestReadPoint = getSmallestReadPoint();
-    List scanners = createFileScanners(filesToCompact, smallestReadPoint);
+    List scanners = createFileScanners(filesToCompact,
+        smallestReadPoint, store.throttleCompaction(request.getSize()));
 
     boolean finished = false;
     InternalScanner scanner = null;
@@ -124,7 +125,8 @@
         @Override
         public Writer createWriter() throws IOException {
           return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+              store.throttleCompaction(request.getSize()));
         }
       };
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 5823dfb2dac..bfa5b87cdc5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -256,7 +256,8 @@
     cacheConf =
         new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
         cowType.shouldBeCached(BlockType.LEAF_INDEX),
-        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
+        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
+        false, false, false);
   }
 
   @After
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index e6d2b98df2f..d6599c25e6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -386,7 +386,7 @@
       sfs.add(sf);
     }
     List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-      false, null, HConstants.LATEST_TIMESTAMP);
+      false, false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());
     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 0d3fa13b691..07647e8a87d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -145,7 +145,7 @@
         cacheConf, BloomType.NONE);
 
     List scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true, false,
+        Collections.singletonList(sf), false, true, false, false,
         // 0 is passed as readpoint because this test operates on StoreFile directly
         0);
     KeyValueScanner scanner = scanners.get(0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
index 0b8b658d1ae..8d2d8577da2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
@@ -436,7 +436,7 @@
         numDelfiles++;
       }
     }
-    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
+    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false,
       HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 452d8ffb824..c520422e57b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -106,14 +106,14 @@
         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
 
     List scanners = StoreFileScanner
-        .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
-            false, Long.MAX_VALUE);
+        .getScannersForStoreFiles(Collections.singletonList(sf),
+            false, true, false, false, Long.MAX_VALUE);
     StoreFileScanner scanner = scanners.get(0);
     seekTestOfReversibleKeyValueScanner(scanner);
     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
       LOG.info("Setting read point to " + readPoint);
       scanners = StoreFileScanner.getScannersForStoreFiles(
-          Collections.singletonList(sf), false, true, false, readPoint);
+          Collections.singletonList(sf), false, true, false, false, readPoint);
       seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
     }
   }
@@ -494,7 +494,7 @@
       throws IOException {
     List fileScanners = StoreFileScanner
         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
-            false, readPoint);
+            false, false, readPoint);
     List memScanners = memstore.getScanners(readPoint);
     List scanners = new ArrayList(
         fileScanners.size() + 1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index eef229f6309..6ef6336c2b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -197,7 +197,7 @@
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparator.COMPARATOR);
 
     return new StripeCompactor(conf, store) {
@@ -228,6 +228,7 @@
         .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     return new CompactionRequest(Arrays.asList(sf));
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index c7848baccbd..a0579ce8e39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -736,6 +736,7 @@
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
         mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
     return sf;
   }
@@ -759,7 +760,7 @@
     when(store.getRegionInfo()).thenReturn(info);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
-        anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     final Scanner scanner = new Scanner();
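Reviewer note, not part of the patch: the change leans on Hadoop's CanSetDropBehind contract, where setDropBehind(...) is a best-effort hint that may throw UnsupportedOperationException on streams that cannot honor it. A minimal sketch of that pattern, with a hypothetical helper name, mirroring what FSDataInputStreamWrapper's constructor does above:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DropBehindProbe {
  // Hypothetical helper: open a stream and request drop-behind, but treat the
  // hint as optional, the same way the patch does.
  public static FSDataInputStream openMaybeDropBehind(FileSystem fs, Path path,
      boolean dropBehind) throws IOException {
    FSDataInputStream in = fs.open(path);
    try {
      // Ask the filesystem not to retain these pages in the OS cache.
      in.setDropBehind(dropBehind);
    } catch (UnsupportedOperationException | IOException e) {
      // Best effort: streams that cannot honor the hint are left alone.
    }
    return in;
  }
}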
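Likewise illustrative only: the new hbase.hfile.drop.behind.compaction key (default true, per CacheConfig above) can be switched off when the drop-behind hint is unwanted, for example in a test configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DropBehindConfigExample {
  public static Configuration withoutDropBehind() {
    Configuration conf = HBaseConfiguration.create();
    // Key and default come from the patch; with the flag off, compaction
    // readers and writers stop requesting setDropBehind() on their streams.
    conf.setBoolean("hbase.hfile.drop.behind.compaction", false);
    return conf;
  }
}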