HBASE-15527 Refactor Compactor related classes

zhangduo 2016-04-09 22:36:34 +08:00
parent 2b75562701
commit 7303c7e479
5 changed files with 274 additions and 282 deletions

org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java

@@ -18,11 +18,6 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,16 +28,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.io.Closeables;
/**
* Base class for implementing a Compactor which will generate multiple output files after
@@ -50,7 +38,7 @@ import com.google.common.io.Closeables;
*/
@InterfaceAudience.Private
public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
extends Compactor {
extends Compactor<T> {
private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
@@ -58,104 +46,31 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
super(conf, store);
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
final FileDetails fd, final boolean shouldDropBehind) {
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
writer.init(storeScanner, writerFactory);
}
protected List<Path> compact(T writer, final CompactionRequest request,
InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
throws IOException {
final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
InternalScanner scanner = null;
boolean finished = false;
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
// Create the writer factory for compactions.
final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner =
(scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
writer.init(storeScanner, writerFactory);
finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId,
throughputController);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
Closeables.close(scanner, true);
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException e) {
LOG.warn("Exception closing " + f, e);
}
}
if (!finished) {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
fs.delete(leftoverFile, false);
} catch (IOException e) {
LOG.error("Failed to delete the leftover file " + leftoverFile
+ " after an unfinished compaction.",
e);
}
}
@Override
protected void abortWriter(T writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
fs.delete(leftoverFile, false);
} catch (IOException e) {
LOG.warn(
"Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
e);
}
}
assert finished : "We should have exited the method on all error paths";
return commitMultiWriter(writer, fd, request);
}
protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
CompactionRequest request) throws IOException;
}
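
For orientation, a minimal sketch (not part of this commit) of what a multi-output subclass has to supply after this refactor: a constructor, a CellSinkFactory that builds its multi-file writer and hands it to initMultiWriter, and a commitWriter implementation. The class name ExampleWindowedCompactor is hypothetical and the boolean passed to DateTieredMultiFileWriter is assumed; every method and type used here appears elsewhere in this commit, and the DateTieredCompactor and StripeCompactor changes further down are the real instances of the pattern.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

public class ExampleWindowedCompactor
    extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {

  public ExampleWindowedCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

  public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
      ThroughputController throughputController, User user) throws IOException {
    // Scanner setup, the compaction loop and the abort/cleanup path all live in the shared
    // Compactor.compact() template now; the subclass only plugs in its writer.
    return compact(request, defaultScannerFactory,
      new CellSinkFactory<DateTieredMultiFileWriter>() {
        @Override
        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
            boolean shouldDropBehind) throws IOException {
          // 'true' (always emit a file) is an assumption made for this sketch.
          DateTieredMultiFileWriter writer =
              new DateTieredMultiFileWriter(lowerBoundaries, true);
          initMultiWriter(writer, scanner, fd, shouldDropBehind);
          return writer;
        }
      }, throughputController, user);
  }

  @Override
  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
      CompactionRequest request) throws IOException {
    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
  }
}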

org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

@@ -22,12 +22,16 @@ import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.common.io.Closeables;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -43,8 +47,10 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@@ -57,9 +63,11 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
* reusable parts for implementing compactors (what is common and what isn't is evolving).
*/
@InterfaceAudience.Private
public abstract class Compactor {
public abstract class Compactor<T extends CellSink> {
private static final Log LOG = LogFactory.getLog(Compactor.class);
protected CompactionProgress progress;
protected volatile CompactionProgress progress;
protected final Configuration conf;
protected final Store store;
@@ -85,6 +93,11 @@ public abstract class Compactor {
void append(Cell cell) throws IOException;
}
protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
throws IOException;
}
public CompactionProgress getProgress() {
return this.progress;
}
@@ -139,7 +152,7 @@ public abstract class Compactor {
fd.maxKeyCount += keyCount;
// calculate the latest MVCC readpoint in any of the involved store files
Map<byte[], byte[]> fileInfo = r.loadFileInfo();
byte tmp[] = null;
byte[] tmp = null;
// Get and set the real MVCCReadpoint for bulk loaded files, which is the
// SeqId number.
if (r.isBulkLoaded()) {
@@ -203,6 +216,120 @@ public abstract class Compactor {
return store.getSmallestReadPoint();
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
}
protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
: ScanType.COMPACT_RETAIN_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
};
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
*/
protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
/* isCompaction = */true,
/* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
}
protected List<Path> compact(final CompactionRequest request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
T writer = null;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
InternalScanner scanner = null;
boolean finished = false;
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
finished =
performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
Closeables.close(scanner, true);
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException e) {
LOG.warn("Exception closing " + f, e);
}
}
if (!finished && writer != null) {
abortWriter(writer);
}
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
return commitWriter(writer, fd, request);
}
protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
throws IOException;
protected abstract void abortWriter(T writer) throws IOException;
/**
* Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
* @param request Compaction request.
@@ -219,7 +346,9 @@ public abstract class Compactor {
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
User user) throws IOException {
if (store.getCoprocessorHost() == null) return null;
if (store.getCoprocessorHost() == null) {
return null;
}
if (user == null) {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request);
@@ -247,25 +376,27 @@ public abstract class Compactor {
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
if (store.getCoprocessorHost() == null) {
return scanner;
}
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
/**
@@ -273,7 +404,8 @@ public abstract class Compactor {
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;= smallestReadPoint
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner, CellSink writer,
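
The progress field becomes volatile in this file, so a thread other than the one driving the shared compact() template can safely observe it through getProgress(). A rough sketch (not part of this commit) of that usage, with DefaultCompactor, whose changes appear further down, as the concrete type; it assumes a prepared CompactionRequest and that CompactionProgress exposes getProgressPct(), since that class is not touched by this diff.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;

public class CompactionProgressWatcher {

  // Runs the compaction on the calling thread while a second thread polls getProgress().
  // The volatile progress field makes the cross-thread read well defined.
  public static List<Path> compactWithProgressLogging(final DefaultCompactor compactor,
      final CompactionRequest request) throws IOException {
    Thread reporter = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          CompactionProgress progress = compactor.getProgress();
          if (progress != null) {
            // getProgressPct() is assumed; CompactionProgress is not part of this diff.
            System.out.println("Compaction progress: " + progress.getProgressPct());
          }
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    reporter.start();
    try {
      return compactor.compact(request, NoLimitThroughputController.INSTANCE, null);
    } finally {
      reporter.interrupt();
    }
  }
}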

org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java

@@ -27,10 +27,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@@ -52,34 +50,29 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
}
public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
+ " windows, lower boundaries: " + lowerBoundaries);
}
DateTieredMultiFileWriter writer =
new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
return compact(writer, request, new InternalScannerFactory() {
return compact(request, defaultScannerFactory,
new CellSinkFactory<DateTieredMultiFileWriter>() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
: ScanType.COMPACT_RETAIN_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
}, throughputController, user);
@Override
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
@Override
protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
}
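
A usage sketch (not part of this commit): the public entry point of DateTieredCompactor is unchanged by the refactor, so a caller still passes the request plus the window lower boundaries. The (Configuration, Store) constructor and the example boundary values are assumptions; the real boundaries come from the date-tiered compaction policy.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;

public class DateTieredCompactionExample {

  // conf, store and request are assumed to be supplied by the surrounding region server code.
  public static List<Path> run(Configuration conf, Store store, CompactionRequest request)
      throws IOException {
    DateTieredCompactor compactor = new DateTieredCompactor(conf, store);
    // Example window boundaries only (epoch millis); the policy computes the real ones.
    List<Long> lowerBoundaries = Arrays.asList(Long.MIN_VALUE, 1460160000000L);
    return compactor.compact(request, lowerBoundaries, NoLimitThroughputController.INSTANCE,
      null);
  }
}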

org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java

@@ -18,149 +18,59 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.collect.Lists;
/**
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, ThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
public class DefaultCompactor extends Compactor<Writer> {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
public DefaultCompactor(final Configuration conf, final Store store) {
super(conf, store);
}
private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
@Override
public Writer createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
boolean cleanSeqId = false;
IOException e = null;
try {
InternalScanner scanner = null;
try {
/* Include deletes, unless we are doing a compaction of all files */
ScanType scanType =
request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
if (scanner == null) {
scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return newFiles;
}
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see HBASE-6059
if(fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
boolean finished =
performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
writer = null;
throw new InterruptedIOException("Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} catch (IOException ioe) {
e = ioe;
// Throw the exception
throw ioe;
}
finally {
try {
if (writer != null) {
if (e != null) {
writer.close();
} else {
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();
newFiles.add(writer.getPath());
}
}
} finally {
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException ioe) {
LOG.warn("Exception closing " + f, ioe);
}
}
}
}
return newFiles;
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
}
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing
* made it through the compaction.
@@ -170,6 +80,32 @@ public class DefaultCompactor extends Compactor {
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
return compact(cr, NoLimitThroughputController.INSTANCE, null);
}
@Override
protected List<Path> commitWriter(Writer writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();
return newFiles;
}
@Override
protected void abortWriter(Writer writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
writer.close();
} catch (IOException e) {
LOG.warn("Failed to close the writer after an unfinished compaction.", e);
}
try {
store.getFileSystem().delete(leftoverFile, false);
} catch (IOException e) {
LOG.warn(
"Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
e);
}
}
}
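
A usage sketch (not part of this commit) of driving the refactored DefaultCompactor end to end, mirroring the testing helper above; the configuration, store and file collection are assumed to be provided by the caller.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;

public class DefaultCompactionExample {

  // Runs a major compaction over the given files and returns the paths of the new store files.
  // conf, store and filesToCompact are assumed to be supplied by the surrounding code.
  public static List<Path> majorCompact(Configuration conf, Store store,
      Collection<StoreFile> filesToCompact) throws IOException {
    DefaultCompactor compactor = new DefaultCompactor(conf, store);
    CompactionRequest request = new CompactionRequest(filesToCompact);
    // Set both flags, as the testing helper above does for a major compaction.
    request.setIsMajor(true, true);
    // commitWriter()/abortWriter() above now decide how the single output file is finished.
    return compactor.compact(request, NoLimitThroughputController.INSTANCE, null);
  }
}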

org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java

@@ -68,15 +68,17 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
return (majorRangeFromRow == null)
? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs)
: StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
majorRangeFromRow, majorRangeToRow);
}
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
User user) throws IOException {
public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -85,30 +87,44 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
LOG.debug(sb.toString());
}
StripeMultiFileWriter writer =
new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
majorRangeFromRow, majorRangeToRow);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+ targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+ "] range");
LOG.debug(
"Executing compaction with " + targetSize + " target file size, no more than " + targetCount
+ " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
}
StripeMultiFileWriter writer =
new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
left, right);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
store.getComparator(), targetCount, targetSize, left, right);
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
}
}, throughputController, user);
}
@Override
protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
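
Finally, a usage sketch (not part of this commit) for the boundary-based entry point. The (Configuration, Store) constructor and the example boundaries are assumptions; real boundaries and the major range come from the stripe compaction policy.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;

public class StripeCompactionExample {

  // Compacts into stripes split at the given row boundaries; conf, store and request are
  // assumed to be provided by the surrounding stripe compaction code.
  public static List<Path> run(Configuration conf, Store store, CompactionRequest request)
      throws IOException {
    StripeCompactor compactor = new StripeCompactor(conf, store);
    // Example boundaries only.
    List<byte[]> targetBoundaries =
        Arrays.asList(Bytes.toBytes("row-a"), Bytes.toBytes("row-m"), Bytes.toBytes("row-z"));
    // Passing null for the major range takes the (majorRangeFromRow == null) branch of the
    // scanner factory above.
    return compactor.compact(request, targetBoundaries, null, null,
      NoLimitThroughputController.INSTANCE, null);
  }
}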