HBASE-8080 refactor default compactor to make its parts easier to reuse

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1458049 13f79535-47bb-0310-9956-ffa450edef68
sershe 2013-03-18 22:33:59 +00:00
parent 421e3a4847
commit dce86001ea
3 changed files with 202 additions and 142 deletions

View File: org/apache/hadoop/hbase/regionserver/StoreFile.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -707,7 +708,7 @@ public class StoreFile {
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
*/
public static class Writer {
public static class Writer implements Compactor.CellSink {
private final BloomFilterWriter generalBloomFilterWriter;
private final BloomFilterWriter deleteFamilyBloomFilterWriter;
private final BloomType bloomType;
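
The StoreFile.java hunks above add the Compactor import and change StoreFile.Writer to implement the new Compactor.CellSink interface defined in Compactor.java below, so the compaction loop can target the narrow append(KeyValue) contract instead of the concrete writer. As a hedged illustration of the seam this opens (the class name and the buffering sink are hypothetical, not part of this commit), any object with an append(KeyValue) method can now receive compaction output, for example in a test:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;

/** Hypothetical sink that buffers compaction output in memory instead of writing an HFile. */
public class CollectingCellSink implements Compactor.CellSink {
  private final List<KeyValue> cells = new ArrayList<KeyValue>();

  @Override
  public void append(KeyValue kv) throws IOException {
    cells.add(kv); // record the KeyValue handed over by performCompaction()
  }

  public List<KeyValue> getCells() {
    return cells;
  }
}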

View File: org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

@@ -18,25 +18,59 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.CellOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
/**
* A compactor is a compaction algorithm associated a given policy.
* A compactor is a compaction algorithm associated a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving).
*/
@InterfaceAudience.Private
public abstract class Compactor {
private static final Log LOG = LogFactory.getLog(Compactor.class);
protected CompactionProgress progress;
protected Configuration conf;
protected Store store;
Compactor(final Configuration conf) {
private int compactionKVMax;
protected Compression.Algorithm compactionCompression;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) {
this.conf = conf;
this.store = store;
this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
this.compactionCompression = (this.store.getFamily() == null) ?
Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
}
/**
* TODO: Replace this with {@link CellOutputStream} when StoreFile.Writer uses cells.
*/
public interface CellSink {
void append(KeyValue kv) throws IOException;
}
/**
@@ -68,4 +102,137 @@ public abstract class Compactor {
public CompactionProgress getProgress() {
return this.progress;
}
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
protected static class FileDetails {
/** Maximum key count after compaction (for blooms) */
public int maxKeyCount = 0;
/** Earliest put timestamp if major compaction */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
/** The last key in the files we're compacting. */
public long maxSeqId = 0;
}
protected FileDetails getFileDetails(
Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
FileDetails fd = new FileDetails();
for (StoreFile file : filesToCompact) {
long seqNum = file.getMaxSequenceId();
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
StoreFile.Reader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
continue;
}
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
? r.getFilterEntries() : r.getEntries();
fd.maxKeyCount += keyCount;
// If required, calculate the earliest put timestamp of all involved storefiles.
// This is used to remove family delete marker during compaction.
long earliestPutTs = 0;
if (calculatePutTs) {
byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no information, must be an old one
// assume we have very old puts
fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Bytes.toLong(tmp);
fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getEncodingOnDisk() +
", seqNum=" + seqNum +
(calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
}
}
return fd;
}
protected List<StoreFileScanner> createFileScanners(
final Collection<StoreFile> filesToCompact) throws IOException {
return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true);
}
protected long setSmallestReadPoint() {
long smallestReadPoint = store.getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
return smallestReadPoint;
}
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
if (store.getCoprocessorHost() == null) return null;
return store.getCoprocessorHost()
.preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
}
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, InternalScanner scanner) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
@SuppressWarnings("deprecation")
protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<KeyValue> kvs = new ArrayList<KeyValue>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore;
do {
hasMore = scanner.next(kvs, compactionKVMax);
// output to writer:
for (KeyValue kv : kvs) {
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (!store.areWritesEnabled()) return false;
}
}
}
kvs.clear();
} while (hasMore);
return true;
}
protected void abortWriter(final StoreFile.Writer writer) throws IOException {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
}
/**
* @param scanners Store file scanners.
* @param scanType Scan type.
* @param smallestReadPoint Smallest MVCC read point.
* @param earliestPutTs Earliest put across all files.
* @return A compaction scanner.
*/
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs);
}
}
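
Taken together, the hunk above gives Compactor the reusable pieces: getFileDetails, createFileScanners, setSmallestReadPoint, the two coprocessor hooks, createScanner, performCompaction and abortWriter. The sketch below shows how another compactor in the same package could chain them; SketchCompactor is a hypothetical name, the flow is essentially the single-file shape DefaultCompactor takes in the next file, and it assumes Compactor still declares the abstract compact(CompactionRequest) method that DefaultCompactor overrides below.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;

/** Hypothetical compactor assembled from the protected helpers added above. */
public class SketchCompactor extends Compactor {
  SketchCompactor(Configuration conf, Store store) {
    super(conf, store); // package-private base constructor, hence same package
  }

  @Override
  public List<Path> compact(CompactionRequest request) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
    this.progress = new CompactionProgress(fd.maxKeyCount);
    List<StoreFileScanner> scanners = createFileScanners(request.getFiles());
    long smallestReadPoint = setSmallestReadPoint();
    ScanType scanType = request.isMajor()
        ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;

    List<Path> newFiles = new ArrayList<Path>();
    InternalScanner scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
    if (scanner == null) {
      scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
    }
    scanner = postCreateCoprocScanner(request, scanType, scanner);
    if (scanner == null) {
      return newFiles; // a coprocessor asked to skip normal processing
    }
    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true);
    boolean finished = false;
    try {
      try {
        finished = performCompaction(scanner, writer, smallestReadPoint);
      } finally {
        scanner.close();
      }
    } finally {
      if (finished) {
        writer.appendMetadata(fd.maxSeqId, request.isMajor());
        writer.close();
        newFiles.add(writer.getPath());
      } else {
        abortWriter(writer); // close and delete the partial output
      }
    }
    return newFiles;
  }
}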

View File: org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java

@@ -49,170 +49,62 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
private final Store store;
public DefaultCompactor(final Configuration conf, final Store store) {
super(conf);
this.store = store;
super(conf, store);
}
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
@SuppressWarnings("deprecation")
@Override
public List<Path> compact(final CompactionRequest request) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
boolean majorCompaction = request.isMajor();
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Calculate maximum key count after compaction (for blooms)
// Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
ScanType scanType = request.isMajor()
? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
for (StoreFile file: filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
continue;
}
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
r.getFilterEntries() : r.getEntries();
maxKeyCount += keyCount;
// For major compactions calculate the earliest put timestamp of all
// involved storefiles. This is used to remove family delete marker during
// compaction.
if (scanType == ScanType.COMPACT_DROP_DELETES) {
byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no information, must be an old one
// assume we have very old puts
earliestPutTs = HConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getEncodingOnDisk() +
(majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
}
}
List<StoreFileScanner> scanners = createFileScanners(request.getFiles());
// keep track of compaction progress
this.progress = new CompactionProgress(maxKeyCount);
// For each file, obtain a scanner:
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Get some configs
int compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
Compression.Algorithm compression = store.getFamily().getCompression();
// Avoid overriding compression setting for major compactions if the user
// has not specified it separately
Compression.Algorithm compactionCompression =
(store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
store.getFamily().getCompactionCompression(): compression;
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
// Find the smallest read point across all the Scanners.
long smallestReadPoint = store.getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
long smallestReadPoint = setSmallestReadPoint();
try {
InternalScanner scanner = null;
try {
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost()
.preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
}
/* Include deletes, unless we are doing a major compaction */
ScanType scanType =
request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
/* Include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs);
scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
}
if (store.getCoprocessorHost() != null) {
InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
scanType, request);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return newFiles; // an empty list
}
scanner = cpScanner;
}
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<KeyValue> kvs = new ArrayList<KeyValue>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore;
do {
hasMore = scanner.next(kvs, compactionKVMax);
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see
// HBASE-6059
if (writer == null) {
writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
}
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
isInterrupted(store, writer);
}
}
}
}
kvs.clear();
} while (hasMore);
} finally {
if (scanner != null) {
scanner.close();
scanner = postCreateCoprocScanner(request, scanType, scanner);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return newFiles;
}
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see HBASE-6059
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
boolean finished = performCompaction(scanner, writer, smallestReadPoint);
if (!finished) {
abortWriter(writer);
writer = null;
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (writer != null) {
writer.appendMetadata(maxId, majorCompaction);
writer.appendMetadata(fd.maxSeqId, request.isMajor());
writer.close();
newFiles.add(writer.getPath());
}
}
return newFiles;
}
void isInterrupted(final Store store, final StoreFile.Writer writer)
throws IOException {
if (store.areWritesEnabled()) return;
// Else cleanup.
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
}
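
For completeness, a hedged sketch of how the refactored class is driven from calling code. The constructor and compact(CompactionRequest) signatures are the ones shown in the diff above; the driver method itself is hypothetical (in the real code the regionserver's store implementation wires this up) and assumes the Configuration, Store and CompactionRequest are already available.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;

public class CompactionDriverSketch {
  static List<Path> runCompaction(Configuration conf, Store store, CompactionRequest request)
      throws IOException {
    DefaultCompactor compactor = new DefaultCompactor(conf, store);
    // compact() builds a CompactionProgress from fd.maxKeyCount and advances it once per
    // appended KeyValue (see performCompaction above); another thread could poll
    // compactor.getProgress() while this call runs.
    List<Path> newFiles = compactor.compact(request);
    // The returned paths point at files written into the store's tmp directory
    // (createWriterInTmp); moving them into the store proper is the caller's job.
    return newFiles;
  }
}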