HBASE-15527 Refactor Compactor related classes

zhangduo 2016-04-09 16:18:08 +08:00
parent a4dcf51415
commit f7d44e929f
6 changed files with 317 additions and 343 deletions

View File

@@ -42,11 +42,12 @@ import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -58,6 +59,45 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
private long mobSizeThreshold;
private HMobStore mobStore;
private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
: ScanType.COMPACT_DROP_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
if (scanType == ScanType.COMPACT_DROP_DELETES) {
// In major compaction, we need to write the delete markers to del files, so we have to
// retain them in scanning.
scanType = ScanType.COMPACT_RETAIN_DELETES;
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, true);
} else {
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, false);
}
}
};
private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
@Override
public Writer createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
// always make this writer with tags because of possible new cells with tags.
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
shouldDropBehind);
}
};
public DefaultMobStoreCompactor(Configuration conf, Store store) {
super(conf, store);
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
@@ -71,36 +111,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
mobSizeThreshold = store.getFamily().getMobThreshold();
}
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @param shouldDropBehind Should the writer drop behind.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException
*/
@Override
protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
// always make this writer with tags because of possible new cells with tags.
StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
true, true, true, shouldDropBehind);
return writer;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
if (scanType == ScanType.COMPACT_DROP_DELETES) {
// In major compaction, we need to write the delete markers to del files, so we have to
// retain them in scanning.
scanType = ScanType.COMPACT_RETAIN_DELETES;
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs, true);
} else {
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs, false);
}
public List<Path> compact(CompactionRequest request, ThroughputController throughputController,
User user) throws IOException {
return compact(request, scannerFactory, writerFactory, throughputController, user);
}
// TODO refactor to take advantage of the throughput controller.
@@ -166,7 +180,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
StoreFile.Writer mobFileWriter = null, delFileWriter = null;
Writer mobFileWriter = null, delFileWriter = null;
long mobCells = 0, deleteMarkersCount = 0;
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
store.getTableName().getName());
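
To see the shape of this change in isolation: the subclass stops overriding the protected createScanner/createTmpWriter hooks and instead holds two factory fields that it hands to the shared compact(...) in the base class. Below is a minimal, self-contained sketch of that shape; all types (MiniBaseCompactor, MiniMobCompactor, MiniScanner, MiniWriter) are invented stand-ins for illustration, not the HBase classes in this diff, and the base compact(...) is stubbed down to writer creation and close.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

// Invented minimal types mirroring the shape of this hunk, not the HBase API.
interface MiniScanner { }

interface MiniWriter { void close() throws IOException; }

abstract class MiniBaseCompactor {

  interface ScannerFactory { MiniScanner createScanner(boolean majorCompaction) throws IOException; }

  interface WriterFactory { MiniWriter createWriter(MiniScanner scanner) throws IOException; }

  // Shared template method; the real Compactor.compact(...) also runs the scan loop,
  // coprocessor hooks and throughput control, all omitted here.
  protected List<String> compact(ScannerFactory sf, WriterFactory wf, boolean major)
      throws IOException {
    MiniWriter writer = wf.createWriter(sf.createScanner(major));
    writer.close();
    return Collections.emptyList();
  }
}

class MiniMobCompactor extends MiniBaseCompactor {

  // Replaces the old protected createScanner(...) override.
  private final ScannerFactory scannerFactory = new ScannerFactory() {
    @Override
    public MiniScanner createScanner(boolean majorCompaction) {
      // Even for a major compaction the MOB scan keeps delete markers visible,
      // so they can be written to del files, as the hunk above does.
      return new MiniScanner() { };
    }
  };

  // Replaces the old protected createTmpWriter(...) override.
  private final WriterFactory writerFactory = new WriterFactory() {
    @Override
    public MiniWriter createWriter(MiniScanner scanner) {
      return new MiniWriter() {
        @Override
        public void close() {
          // no-op for the sketch
        }
      };
    }
  };

  public List<String> compact(boolean majorCompaction) throws IOException {
    // One-line delegation to the shared base-class compact(), as in the real change.
    return compact(scannerFactory, writerFactory, majorCompaction);
  }
}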

View File

@@ -18,11 +18,6 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,16 +28,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.io.Closeables;
/**
* Base class for implementing a Compactor which will generate multiple output files after
@@ -50,7 +38,7 @@ import com.google.common.io.Closeables;
*/
@InterfaceAudience.Private
public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
extends Compactor {
extends Compactor<T> {
private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
@@ -58,104 +46,31 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
super(conf, store);
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
final FileDetails fd, final boolean shouldDropBehind) {
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
writer.init(storeScanner, writerFactory);
}
protected List<Path> compact(T writer, final CompactionRequest request,
InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
throws IOException {
final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
InternalScanner scanner = null;
boolean finished = false;
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
// Create the writer factory for compactions.
final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner
= (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
writer.init(storeScanner, writerFactory);
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles());
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
Closeables.close(scanner, true);
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException e) {
LOG.warn("Exception closing " + f, e);
}
}
if (!finished) {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
fs.delete(leftoverFile, false);
} catch (IOException e) {
LOG.error("Failed to delete the leftover file " + leftoverFile
+ " after an unfinished compaction.",
e);
}
}
@Override
protected void abortWriter(T writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
fs.delete(leftoverFile, false);
} catch (IOException e) {
LOG.warn(
"Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
e);
}
}
assert finished : "We should have exited the method on all error paths";
return commitMultiWriter(writer, fd, request);
}
protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
CompactionRequest request) throws IOException;
}
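
The abortWriter(T) override above is the multi-output half of the new commit/abort contract: every sub-writer is aborted and each leftover file is deleted best-effort, with failures logged rather than rethrown. Here is a self-contained sketch of that cleanup loop, using java.nio.file and java.util.logging as stand-ins for the HBase FileSystem and Log used in the real code; MultiWriter is an invented stand-in for AbstractMultiFileWriter.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Invented stand-in for AbstractMultiFileWriter.
interface MultiWriter {
  // Closes all sub-writers and returns the paths of their partially written files.
  List<Path> abortWriters();
}

final class MultiOutputAbort {

  private static final Logger LOG = Logger.getLogger(MultiOutputAbort.class.getName());

  // Best-effort cleanup: delete every leftover file, log failures, never rethrow.
  static void abort(MultiWriter writer) {
    for (Path leftoverFile : writer.abortWriters()) {
      try {
        Files.deleteIfExists(leftoverFile);
      } catch (IOException e) {
        LOG.log(Level.WARNING, "Failed to delete the leftover file " + leftoverFile
            + " after an unfinished compaction.", e);
      }
    }
  }
}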

View File

@@ -22,12 +22,14 @@ import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@@ -55,15 +59,17 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import com.google.common.io.Closeables;
/**
* A compactor is a compaction algorithm associated with a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving).
*/
@InterfaceAudience.Private
public abstract class Compactor {
public abstract class Compactor<T extends CellSink> {
private static final Log LOG = LogFactory.getLog(Compactor.class);
private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected CompactionProgress progress;
protected volatile CompactionProgress progress;
protected final Configuration conf;
protected final Store store;
@@ -89,6 +95,11 @@ public abstract class Compactor {
void append(Cell cell) throws IOException;
}
protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
throws IOException;
}
public CompactionProgress getProgress() {
return this.progress;
}
@@ -145,7 +156,7 @@ public abstract class Compactor {
fd.maxKeyCount += keyCount;
// calculate the latest MVCC readpoint in any of the involved store files
Map<byte[], byte[]> fileInfo = r.loadFileInfo();
byte tmp[] = null;
byte[] tmp = null;
// Get and set the real MVCCReadpoint for bulk loaded files, which is the
// SeqId number.
if (r.isBulkLoaded()) {
@@ -217,6 +228,120 @@ public abstract class Compactor {
return store.getSmallestReadPoint();
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
}
protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
: ScanType.COMPACT_DROP_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
};
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
*/
protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
/* isCompaction = */true,
/* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
}
protected List<Path> compact(final CompactionRequest request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
T writer = null;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
InternalScanner scanner = null;
boolean finished = false;
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles());
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
Closeables.close(scanner, true);
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException e) {
LOG.warn("Exception closing " + f, e);
}
}
if (!finished && writer != null) {
abortWriter(writer);
}
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
return commitWriter(writer, fd, request);
}
protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
throws IOException;
protected abstract void abortWriter(T writer) throws IOException;
/**
* Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
* @param request Compaction request.
@@ -233,7 +358,9 @@ public abstract class Compactor {
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
User user) throws IOException {
if (store.getCoprocessorHost() == null) return null;
if (store.getCoprocessorHost() == null) {
return null;
}
if (user == null) {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request);
@@ -261,25 +388,27 @@ public abstract class Compactor {
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
if (store.getCoprocessorHost() == null) {
return scanner;
}
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
/**
@@ -288,7 +417,8 @@ public abstract class Compactor {
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;= smallestReadPoint
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
@@ -421,17 +551,4 @@ public abstract class Compactor {
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
/**
* Appends the metadata and closes the writer.
* @param writer The current store writer.
* @param fd The file details.
* @param isMajor Is a major compaction.
* @throws IOException
*/
protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
boolean isMajor) throws IOException {
writer.appendMetadata(fd.maxSeqId, isMajor);
writer.close();
}
}
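
Taken together, the new compact(request, scannerFactory, sinkFactory, ...) is a template method: the base class drives the scan loop and error handling, the factories supply the scanner and the sink, and subclasses only implement commitWriter/abortWriter. Below is a minimal, self-contained model of that control flow; the types (MiniCompactor, SimpleScanner, SimpleSink) are invented stand-ins for the HBase ones, and the coprocessor hooks, seqid handling and throughput control are omitted.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for InternalScanner and CellSink; not the HBase types in the diff.
interface SimpleScanner { boolean next(List<String> out) throws IOException; }

interface SimpleSink { void append(String cell) throws IOException; }

// A minimal model of the refactored Compactor<T>: the base class owns the loop and
// error handling, factories supply scanner and sink, subclasses commit or abort.
abstract class MiniCompactor<T extends SimpleSink> {

  interface ScannerFactory { SimpleScanner createScanner() throws IOException; }

  interface SinkFactory<S> { S createWriter(SimpleScanner scanner) throws IOException; }

  protected final List<String> compact(ScannerFactory scannerFactory, SinkFactory<T> sinkFactory)
      throws IOException {
    SimpleScanner scanner = scannerFactory.createScanner();
    T writer = null;
    boolean finished = false;
    try {
      writer = sinkFactory.createWriter(scanner);
      List<String> batch = new ArrayList<String>();
      boolean hasMore;
      do {
        hasMore = scanner.next(batch);
        for (String cell : batch) {
          writer.append(cell);
        }
        batch.clear();
      } while (hasMore);
      finished = true;
    } finally {
      if (!finished && writer != null) {
        // Mirrors the new "if (!finished && writer != null) abortWriter(writer)" branch above.
        abortWriter(writer);
      }
    }
    // Mirrors "return commitWriter(writer, fd, request)" after the assertions.
    return commitWriter(writer);
  }

  // Subclass-specific: close the sink(s) and return the new file paths.
  protected abstract List<String> commitWriter(T writer) throws IOException;

  // Subclass-specific: discard partial output after a failed or interrupted compaction.
  protected abstract void abortWriter(T writer) throws IOException;
}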

View File

@@ -27,10 +27,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@@ -52,34 +50,29 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
}
public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
+ "windows, lower boundaries: " + lowerBoundaries);
}
DateTieredMultiFileWriter writer =
new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
return compact(writer, request, new InternalScannerFactory() {
return compact(request, defaultScannerFactory,
new CellSinkFactory<DateTieredMultiFileWriter>() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
: ScanType.COMPACT_DROP_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
}, throughputController, user);
@Override
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
@Override
protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
}
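
A side effect of moving writer creation into an anonymous CellSinkFactory is that lowerBoundaries and request must be declared final, since (pre-Java-8) anonymous inner classes may only capture final locals. A tiny self-contained example of that capture rule, using java.util.concurrent.Callable as a stand-in for the factory interface:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

// Callable stands in for the anonymous CellSinkFactory used in the diff.
final class CaptureExample {

  // The parameter must be final so the anonymous class below may capture it (pre-Java-8 rule).
  static Callable<List<Long>> writerFactory(final List<Long> lowerBoundaries) {
    return new Callable<List<Long>>() {
      @Override
      public List<Long> call() {
        // The captured boundaries are only used later, at writer-creation time.
        return lowerBoundaries;
      }
    };
  }

  public static void main(String[] args) throws Exception {
    System.out.println(writerFactory(Arrays.asList(0L, 100L, 200L)).call());
  }
}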

View File

@@ -18,166 +18,59 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.collect.Lists;
/**
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, ThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
public class DefaultCompactor extends Compactor<Writer> {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
public DefaultCompactor(final Configuration conf, final Store store) {
super(conf, store);
}
private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
@Override
public Writer createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
boolean cleanSeqId = false;
IOException e = null;
try {
InternalScanner scanner = null;
try {
/* Include deletes, unless we are doing a compaction of all files */
ScanType scanType =
request.isRetainDeleteMarkers() ?
ScanType.COMPACT_RETAIN_DELETES :
ScanType.COMPACT_DROP_DELETES;
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
if (scanner == null) {
scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return newFiles;
}
// Create the writer even if no kv (an empty store file is also ok),
// because we need to record the max seq id for the store file, see HBASE-6059
if(fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = createTmpWriter(fd, store.throttleCompaction(request.getSize()));
boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles());
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
writer = null;
throw new InterruptedIOException("Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} catch (IOException ioe) {
e = ioe;
// Throw the exception
throw ioe;
}
finally {
try {
if (writer != null) {
if (e != null) {
writer.close();
} else {
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();
newFiles.add(writer.getPath());
}
}
} finally {
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException ioe) {
LOG.warn("Exception closing " + f, ioe);
}
}
}
}
return newFiles;
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
}
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException
*/
protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
/* isCompaction = */ true,
/* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
/* includesTags = */ fd.maxTagsLength > 0,
/* shouldDropBehind = */ shouldDropBehind);
}
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing
* made it through the compaction.
@@ -187,6 +80,32 @@ public class DefaultCompactor extends Compactor {
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
return compact(cr, NoLimitThroughputController.INSTANCE, null);
}
@Override
protected List<Path> commitWriter(Writer writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();
return newFiles;
}
@Override
protected void abortWriter(Writer writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
writer.close();
} catch (IOException e) {
LOG.warn("Failed to close the writer after an unfinished compaction.", e);
}
try {
store.getFileSystem().delete(leftoverFile, false);
} catch (IOException e) {
LOG.warn(
"Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
e);
}
}
}
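
The commitWriter/abortWriter pair above is the single-file counterpart of the contract: commit appends metadata, closes the writer, and returns its path; abort closes best-effort and then deletes the partial file. A self-contained sketch of that pair follows, where SingleWriter is an invented stand-in for StoreFile.Writer and java.nio.file replaces the HBase FileSystem.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

// Invented stand-in for StoreFile.Writer.
interface SingleWriter {
  Path getPath();
  void close() throws IOException;
}

final class SingleOutputCommit {

  // Commit: in the real compactor, metadata (max seq id, major flag) is appended
  // before closing; here we only close and report the single output path.
  static List<Path> commit(SingleWriter writer) throws IOException {
    List<Path> newFiles = Collections.singletonList(writer.getPath());
    writer.close();
    return newFiles;
  }

  // Abort: close best-effort, then delete the partially written file.
  static void abort(SingleWriter writer) throws IOException {
    Path leftoverFile = writer.getPath();
    try {
      writer.close();
    } catch (IOException e) {
      // Keep going: the partial file is removed below even if close() failed.
    }
    Files.deleteIfExists(leftoverFile);
  }
}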

View File

@@ -68,15 +68,17 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
return (majorRangeFromRow == null)
? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs)
: StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
majorRangeFromRow, majorRangeToRow);
}
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
User user) throws IOException {
public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -85,30 +87,44 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
LOG.debug(sb.toString());
}
StripeMultiFileWriter writer =
new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
majorRangeFromRow, majorRangeToRow);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+ targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+ "] range");
LOG.debug(
"Executing compaction with " + targetSize + " target file size, no more than " + targetCount
+ " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
}
StripeMultiFileWriter writer =
new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
left, right);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
store.getComparator(), targetCount, targetSize, left, right);
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
@Override
protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";