HBASE-14098 Allow dropping caches behind compactions

Summary: On large compactions, drop the OS-level filesystem cache behind reads and writes.

Test Plan: Test on a loaded cluster.

Differential Revision: https://reviews.facebook.net/D42681
Author: Elliott Clark
Date:   2015-07-16 11:09:53 -07:00
commit a399ac9c4f (parent d7178aa27c)
23 changed files with 179 additions and 70 deletions
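The mechanism behind the patch is Hadoop's drop-behind hint on FSDataInputStream/FSDataOutputStream, which maps to posix_fadvise(DONTNEED) for local reads and writes. Below is a minimal standalone sketch of that API, not part of the patch itself; the path and the read loop are illustrative. Compaction I/O is strictly sequential and never re-read, so evicting those pages keeps hot random-read blocks resident.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DropBehindSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/example-hfile"); // illustrative path
    try (FSDataInputStream in = fs.open(path)) {
      try {
        // Advise the OS to evict pages behind this reader. Compaction
        // reads are purely sequential, so the pages are never re-read.
        in.setDropBehind(true);
      } catch (UnsupportedOperationException | IOException e) {
        // Not every FileSystem supports the hint; like the patch,
        // carry on without it.
      }
      byte[] buf = new byte[64 * 1024];
      while (in.read(buf) > 0) {
        // consume the stream sequentially
      }
    }
  }
}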

FSDataInputStreamWrapper.java

@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -76,14 +75,23 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path);
+    this(fs, null, path, false);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
+    this(fs, null, path, dropBehind);
   }
 
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null);
+    this(fs, link, null, false);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+      boolean dropBehind) throws IOException {
+    this(fs, link, null, dropBehind);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+      Path path, boolean dropBehind) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
@@ -96,7 +104,13 @@ public class FSDataInputStreamWrapper {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    try {
+      this.stream.setDropBehind(dropBehind);
+    } catch (Exception e) {
+      // Skipped.
+    }
   }
 
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any

CacheConfig.java

@@ -131,6 +131,8 @@ public class CacheConfig {
   private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
 
   private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
+  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY="hbase.hfile.drop.behind.compaction";
+  private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
 
   /**
    * Enum of all built in external block caches.
@@ -194,6 +196,8 @@ public class CacheConfig {
    */
   private boolean cacheDataInL1;
 
+  private final boolean dropBehindCompaction;
+
   /**
    * Create a cache configuration using the specified configuration object and
    * family descriptor.
@@ -218,7 +222,8 @@ public class CacheConfig {
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
             DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1()
+            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -239,7 +244,8 @@ public class CacheConfig {
         conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1)
+            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -264,7 +270,7 @@ public class CacheConfig {
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
       final boolean cacheDataCompressed, final boolean prefetchOnOpen,
-      final boolean cacheDataInL1) {
+      final boolean cacheDataInL1, final boolean dropBehindCompaction) {
     this.blockCache = blockCache;
     this.cacheDataOnRead = cacheDataOnRead;
     this.inMemory = inMemory;
@@ -275,6 +281,7 @@ public class CacheConfig {
     this.cacheDataCompressed = cacheDataCompressed;
     this.prefetchOnOpen = prefetchOnOpen;
     this.cacheDataInL1 = cacheDataInL1;
+    this.dropBehindCompaction = dropBehindCompaction;
     LOG.info(this);
   }
 
@@ -287,7 +294,7 @@ public class CacheConfig {
         cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
         cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
         cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
-        cacheConf.cacheDataInL1);
+        cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction);
   }
 
   /**
@@ -314,6 +321,10 @@ public class CacheConfig {
     return isBlockCacheEnabled() && cacheDataOnRead;
   }
 
+  public boolean shouldDropBehindCompaction() {
+    return dropBehindCompaction;
+  }
+
   /**
    * Should we cache a block of a particular category? We always cache
    * important blocks such as index blocks, as long as the block cache is
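Since DROP_BEHIND_CACHE_COMPACTION_DEFAULT is true, the feature is on out of the box. A minimal sketch of how an operator could opt out programmatically; the class name is illustrative, while the key string matches the constant added above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DisableDropBehind {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Mirrors DROP_BEHIND_CACHE_COMPACTION_KEY above; the default is true,
    // so only an explicit false turns the feature off.
    conf.setBoolean("hbase.hfile.drop.behind.compaction", false);
    return conf;
  }
}

The same key can of course be set in hbase-site.xml instead.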

HFile.java

@@ -251,6 +251,7 @@ public class HFile {
         CellComparator.COMPARATOR;
     protected InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    protected boolean shouldDropBehind = false;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -288,6 +289,12 @@ public class HFile {
       return this;
     }
 
+    public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+      this.shouldDropBehind = shouldDropBehind;
+      return this;
+    }
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -295,6 +302,11 @@ public class HFile {
       }
       if (path != null) {
         ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
+        try {
+          ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
+        } catch (UnsupportedOperationException uoe) {
+          LOG.debug("Unable to set drop behind on " + path, uoe);
+        }
       }
       return createWriter(fs, path, ostream,
           comparator, fileContext);
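For illustration, a hedged sketch of how a caller could thread the new flag through the factory. It assumes the existing HFile.getWriterFactory entry point; the wrapper class and path argument are illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

public class DropBehindWriterSketch {
  public static HFile.Writer open(Configuration conf, FileSystem fs, Path path)
      throws IOException {
    HFileContext ctx = new HFileContextBuilder().build();
    return HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(ctx)
        // New hook from this patch; per the create() change above it only
        // takes effect when hbase.hfile.drop.behind.compaction is also true.
        .withShouldDropCacheBehind(true)
        .create();
  }
}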

DefaultMobStoreCompactor.java

@@ -73,15 +73,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   /**
    * Creates a writer for a new file in a temporary directory.
    * @param fd The file details.
-   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @param shouldDropBehind Should the writer drop behind.
    * @return Writer for a new StoreFile in the tmp dir.
    * @throws IOException
    */
   @Override
-  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
     // make this writer with tags always because of possible new cells with tags.
     StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-        true, true, true);
+        true, true, true, shouldDropBehind);
     return writer;
   }

MobFile.java

@@ -58,7 +58,7 @@ public class MobFile {
     List<StoreFile> sfs = new ArrayList<StoreFile>();
     sfs.add(sf);
     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-        false, null, sf.getMaxMemstoreTS());
+        false, false, sf.getMaxMemstoreTS());
     return sfScanners.get(0);
   }
@@ -89,7 +89,7 @@ public class MobFile {
     sfs.add(sf);
     try {
       List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
-          cacheMobBlocks, true, false, null, readPt);
+          cacheMobBlocks, true, false, false, readPt);
       if (!sfScanners.isEmpty()) {
         scanner = sfScanners.get(0);
         if (scanner.seek(search)) {

PartitionedMobCompactor.java

@@ -547,7 +547,7 @@ public class PartitionedMobCompactor extends MobCompactor {
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
       throws IOException {
     List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
-        null, HConstants.LATEST_TIMESTAMP);
+        false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(column.getMaxVersions());
     long ttl = HStore.determineTTLFromFamily(column);

DefaultStoreFlusher.java

@@ -63,8 +63,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(), false,
-          true, snapshot.isTagsPresent());
+      writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(),
+          /* isCompaction = */ false,
+          /* includeMVCCReadpoint = */ true,
+          /* includesTags = */ snapshot.isTagsPresent(),
+          /* shouldDropBehind = */ false);
       writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
       IOException e = null;
       try {

HStore.java

@@ -986,6 +986,15 @@ public class HStore implements Store {
     return sf;
   }
 
+  @Override
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+      boolean isCompaction, boolean includeMVCCReadpoint,
+      boolean includesTag)
+      throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+        includesTag, false);
+  }
+
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
@@ -996,7 +1005,8 @@ public class HStore implements Store {
    */
   @Override
   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+      boolean shouldDropBehind)
       throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -1021,6 +1031,7 @@ public class HStore implements Store {
         .withMaxKeyCount(maxKeyCount)
         .withFavoredNodes(favoredNodes)
         .withFileContext(hFileContext)
+        .withShouldDropCacheBehind(shouldDropBehind)
         .build();
     return w;
   }
@@ -1122,9 +1133,8 @@ public class HStore implements Store {
       // TODO this used to get the store files in descending order,
       // but now we get them in ascending order, which I think is
       // actually more correct, since memstore get put at the end.
-      List<StoreFileScanner> sfScanners = StoreFileScanner
-          .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
-              readPt);
+      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+          cacheBlocks, usePread, isCompaction, false, matcher, readPt);
       List<KeyValueScanner> scanners =
           new ArrayList<KeyValueScanner>(sfScanners.size()+1);
       scanners.addAll(sfScanners);
@@ -1196,7 +1206,7 @@ public class HStore implements Store {
       CompactionThroughputController throughputController) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
-    CompactionRequest cr = compaction.getRequest();;
+    CompactionRequest cr = compaction.getRequest();
     try {
       // Do all sanity checking in here if we have a valid CompactionRequest
       // because we need to clean up after it on the way out in a finally
@@ -2026,7 +2036,7 @@ public class HStore implements Store {
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlushContext {
+  private final class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;

Store.java

@@ -145,7 +145,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   FileSystem getFileSystem();
 
-  /*
+
+  /**
    * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
@@ -160,6 +161,26 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
       boolean includesTags
   ) throws IOException;
 
+  /**
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @param shouldDropBehind should the writer drop caches behind writes
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  StoreFile.Writer createWriterInTmp(
+    long maxKeyCount,
+    Compression.Algorithm compression,
+    boolean isCompaction,
+    boolean includeMVCCReadpoint,
+    boolean includesTags,
+    boolean shouldDropBehind
+  ) throws IOException;
+
   // Compaction oriented methods
 
   boolean throttleCompaction(long compactionSize);

StoreFile.java

@@ -257,8 +257,8 @@ public class StoreFile {
   }
 
   /**
-   * @return True if this is a StoreFile Reference; call after {@link #open()}
-   * else may get wrong answer.
+   * @return True if this is a StoreFile Reference; call
+   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -376,13 +376,13 @@ public class StoreFile {
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private Reader open() throws IOException {
+  private Reader open(boolean canUseDropBehind) throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -478,14 +478,18 @@ public class StoreFile {
     return this.reader;
   }
 
+  public Reader createReader() throws IOException {
+    return createReader(false);
+  }
+
   /**
    * @return Reader for StoreFile. creates if necessary
    * @throws IOException
    */
-  public Reader createReader() throws IOException {
+  public Reader createReader(boolean canUseDropBehind) throws IOException {
     if (this.reader == null) {
       try {
-        this.reader = open();
+        this.reader = open(canUseDropBehind);
       } catch (IOException e) {
         try {
           this.closeReader(true);
@@ -574,6 +578,8 @@ public class StoreFile {
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    private boolean shouldDropCacheBehind = false;
+
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
       this.conf = conf;
@@ -639,6 +645,11 @@ public class StoreFile {
       this.fileContext = fileContext;
       return this;
     }
 
+    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+      this.shouldDropCacheBehind = shouldDropCacheBehind;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using

StoreFileInfo.java

@@ -229,21 +229,24 @@ public class StoreFileInfo {
    * @return The StoreFile.Reader for the file
    */
   public StoreFile.Reader open(final FileSystem fs,
-      final CacheConfig cacheConf) throws IOException {
+      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
+    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath);
+      in = new FSDataInputStreamWrapper(fs, referencePath,
+          doDropBehind);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath());
+      in = new FSDataInputStreamWrapper(fs, this.getPath(),
+          doDropBehind);
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();

StoreFileScanner.java

@@ -87,7 +87,7 @@ public class StoreFileScanner implements KeyValueScanner {
       boolean cacheBlocks,
       boolean usePread, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks,
-        usePread, false, readPt);
+        usePread, false, false, readPt);
   }
 
   /**
@@ -95,9 +95,9 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, long readPt) throws IOException {
+      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        null, readPt);
+        useDropBehind, null, readPt);
   }
 
   /**
@@ -107,11 +107,12 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+      boolean isCompaction, boolean canUseDrop,
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
     for (StoreFile file : files) {
-      StoreFile.Reader r = file.createReader();
+      StoreFile.Reader r = file.createReader(canUseDrop);
      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
           isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);

StripeStoreFlusher.java

@@ -109,7 +109,11 @@ public class StripeStoreFlusher extends StoreFlusher {
       @Override
       public Writer createWriter() throws IOException {
         StoreFile.Writer writer = store.createWriterInTmp(
-            kvCount, store.getFamily().getCompressionType(), false, true, true);
+            kvCount, store.getFamily().getCompressionType(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ true,
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(tracker);
         return writer;
       }

Compactor.java

@@ -197,8 +197,14 @@ public abstract class Compactor {
    * @return Scanners.
    */
   protected List<StoreFileScanner> createFileScanners(
-      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
-    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+      final Collection<StoreFile> filesToCompact,
+      long smallestReadPoint,
+      boolean useDropBehind) throws IOException {
+    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+        /* cache blocks = */ false,
+        /* use pread = */ false,
+        /* is compaction */ true,
+        /* use Drop Behind */ useDropBehind,
        smallestReadPoint);
   }

DefaultCompactor.java

@@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor {
     List<StoreFileScanner> scanners;
     Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
       // HFileFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
         readersToClose.add(new StoreFile(f));
       }
-      scanners = createFileScanners(readersToClose, smallestReadPoint);
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     }
 
     StoreFile.Writer writer = null;
@@ -81,8 +83,10 @@ public class DefaultCompactor extends Compactor {
     InternalScanner scanner = null;
     try {
       /* Include deletes, unless we are doing a compaction of all files */
-      ScanType scanType = request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
-          : ScanType.COMPACT_DROP_DELETES;
+      ScanType scanType =
+          request.isRetainDeleteMarkers() ?
+              ScanType.COMPACT_RETAIN_DELETES :
+              ScanType.COMPACT_DROP_DELETES;
       scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
       if (scanner == null) {
         scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
@@ -99,9 +103,11 @@ public class DefaultCompactor extends Compactor {
         cleanSeqId = true;
       }
 
-      writer = createTmpWriter(fd, smallestReadPoint);
+      writer = createTmpWriter(fd, store.throttleCompaction(request.getSize()));
+
       boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
           throughputController, request.isAllFiles());
+
       if (!finished) {
         writer.close();
         store.getFileSystem().delete(writer.getPath(), false);
@@ -147,18 +153,20 @@ public class DefaultCompactor extends Compactor {
   /**
    * Creates a writer for a new file in a temporary directory.
    * @param fd The file details.
-   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
    * @return Writer for a new StoreFile in the tmp dir.
    * @throws IOException
    */
-  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
+  protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
       throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
+    // make this writer with tags always because of possible new cells with tags.
     return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-        true, fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
+        /* isCompaction = */ true,
+        /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
+        /* includesTags = */ fd.maxTagsLength > 0,
+        /* shouldDropBehind = */ shouldDropBehind);
   }

StripeCompactor.java

@@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor {
         throughputController);
   }
 
-  private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+  private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
       CompactionThroughputController throughputController) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor {
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
+        smallestReadPoint, store.throttleCompaction(request.getSize()));
 
     boolean finished = false;
     InternalScanner scanner = null;
@@ -124,7 +125,8 @@ public class StripeCompactor extends Compactor {
       @Override
       public Writer createWriter() throws IOException {
         return store.createWriterInTmp(
-            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+            store.throttleCompaction(request.getSize()));
       }
     };

TestCacheOnWrite.java

@@ -256,7 +256,8 @@ public class TestCacheOnWrite {
     cacheConf =
         new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
         cowType.shouldBeCached(BlockType.LEAF_INDEX),
-        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
+        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
+        false, false, false);
   }
 
   @After

TestPartitionedMobCompactor.java

@@ -386,7 +386,7 @@ public class TestPartitionedMobCompactor {
       sfs.add(sf);
     }
     List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-        false, null, HConstants.LATEST_TIMESTAMP);
+        false, false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());
     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);

TestFSErrorsExposed.java

@@ -145,7 +145,7 @@ public class TestFSErrorsExposed {
         cacheConf, BloomType.NONE);
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true, false,
+        Collections.singletonList(sf), false, true, false, false,
         // 0 is passed as readpoint because this test operates on StoreFile directly
         0);
     KeyValueScanner scanner = scanners.get(0);

TestMobStoreCompaction.java

@@ -436,7 +436,7 @@ public class TestMobStoreCompaction {
         numDelfiles++;
       }
     }
-    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
+    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false,
        HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());

TestReversibleScanners.java

@@ -106,14 +106,14 @@ public class TestReversibleScanners {
         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
 
     List<StoreFileScanner> scanners = StoreFileScanner
-        .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
-            false, Long.MAX_VALUE);
+        .getScannersForStoreFiles(Collections.singletonList(sf),
+            false, true, false, false, Long.MAX_VALUE);
     StoreFileScanner scanner = scanners.get(0);
     seekTestOfReversibleKeyValueScanner(scanner);
     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
       LOG.info("Setting read point to " + readPoint);
       scanners = StoreFileScanner.getScannersForStoreFiles(
-          Collections.singletonList(sf), false, true, false, readPoint);
+          Collections.singletonList(sf), false, true, false, false, readPoint);
       seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
     }
   }
@@ -494,7 +494,7 @@ public class TestReversibleScanners {
       throws IOException {
     List<StoreFileScanner> fileScanners = StoreFileScanner
         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
-            false, readPoint);
+            false, false, readPoint);
     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
         fileScanners.size() + 1);

TestStripeCompactor.java

@@ -197,7 +197,7 @@ public class TestStripeCompactor {
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparator.COMPARATOR);
 
     return new StripeCompactor(conf, store) {
@@ -228,6 +228,7 @@ public class TestStripeCompactor {
         .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     return new CompactionRequest(Arrays.asList(sf));
   }

TestStripeCompactionPolicy.java

@@ -736,6 +736,7 @@ public class TestStripeCompactionPolicy {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
         mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
     return sf;
   }
@@ -759,7 +760,7 @@ public class TestStripeCompactionPolicy {
     when(store.getRegionInfo()).thenReturn(info);
     when(
         store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
-            anyBoolean(), anyBoolean())).thenAnswer(writers);
+            anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     final Scanner scanner = new Scanner();