diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index c4095adc6b6..ef39a6cad2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -18,11 +18,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,16 +28,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.security.User;
-
-import com.google.common.io.Closeables;
 
 /**
  * Base class for implementing a Compactor which will generate multiple output files after
@@ -50,7 +38,7 @@ import com.google.common.io.Closeables;
  */
 @InterfaceAudience.Private
 public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
-    extends Compactor {
+    extends Compactor<T> {
 
   private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
 
@@ -58,104 +46,31 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
     super(conf, store);
   }
 
-  protected interface InternalScannerFactory {
-
-    ScanType getScanType(CompactionRequest request);
-
-    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-      FileDetails fd, long smallestReadPoint) throws IOException;
+  protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
+      final FileDetails fd, final boolean shouldDropBehind) {
+    WriterFactory writerFactory = new WriterFactory() {
+      @Override
+      public Writer createWriter() throws IOException {
+        return createTmpWriter(fd, shouldDropBehind);
+      }
+    };
+    // Prepare multi-writer, and perform the compaction using scanner and writer.
+    // It is ok here if storeScanner is null.
+    StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
+    writer.init(storeScanner, writerFactory);
   }
 
-  protected List<Path> compact(T writer, final CompactionRequest request,
-    InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
-    throws IOException {
-    final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = getSmallestReadPoint();
-
-    List<StoreFileScanner> scanners;
-    Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
-      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFiles, and their readers
-      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
-      }
-      scanners = createFileScanners(readersToClose, smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    } else {
-      readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    }
-    InternalScanner scanner = null;
-    boolean finished = false;
-    try {
-      /* Include deletes, unless we are doing a major compaction */
-      ScanType scanType = scannerFactory.getScanType(request);
-      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
-      if (scanner == null) {
-        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
-      }
-      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal processing.
-        return new ArrayList<Path>();
-      }
-      boolean cleanSeqId = false;
-      if (fd.minSeqIdToKeep > 0) {
-        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-        cleanSeqId = true;
-      }
-      // Create the writer factory for compactions.
-      final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
-      WriterFactory writerFactory = new WriterFactory() {
-        @Override
-        public Writer createWriter() throws IOException {
-          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
-            fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
-        }
-      };
-      // Prepare multi-writer, and perform the compaction using scanner and writer.
-      // It is ok here if storeScanner is null.
-      StoreScanner storeScanner =
-          (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
-      writer.init(storeScanner, writerFactory);
-      finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId,
-        throughputController);
-      if (!finished) {
-        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
-            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
-      }
-    } finally {
-      Closeables.close(scanner, true);
-      for (StoreFile f : readersToClose) {
-        try {
-          f.closeReader(true);
-        } catch (IOException e) {
-          LOG.warn("Exception closing " + f, e);
-        }
-      }
-      if (!finished) {
-        FileSystem fs = store.getFileSystem();
-        for (Path leftoverFile : writer.abortWriters()) {
-          try {
-            fs.delete(leftoverFile, false);
-          } catch (IOException e) {
-            LOG.error("Failed to delete the leftover file " + leftoverFile
-                + " after an unfinished compaction.",
-              e);
-          }
-        }
+  @Override
+  protected void abortWriter(T writer) throws IOException {
+    FileSystem fs = store.getFileSystem();
+    for (Path leftoverFile : writer.abortWriters()) {
+      try {
+        fs.delete(leftoverFile, false);
+      } catch (IOException e) {
+        LOG.warn(
+          "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
+          e);
+      }
     }
   }
-    assert finished : "We should have exited the method on all error paths";
-    return commitMultiWriter(writer, fd, request);
-  }
-
-  protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
-      CompactionRequest request) throws IOException;
 }
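[Reviewer note, not part of the patch: the division of labor above is easiest to see from a subclass's point of view. The sketch below is hypothetical (the class name is invented, and it assumes the boundary list is a List<Long>, mirroring DateTieredCompactor further down); it relies on the template compact(), defaultScannerFactory and CellSinkFactory that the Compactor.java hunks below introduce. The subclass only supplies a CellSinkFactory that builds its multi-file writer and wires it up through initMultiWriter(), plus commitWriter(); scanner creation, the compaction loop, and the failure path (abortWriter() deleting whatever abortWriters() reports) come from the base classes.]

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

// Hypothetical subclass, for illustration only.
public class ExampleMultiOutputCompactor
    extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {

  public ExampleMultiOutputCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

  public List<Path> compact(CompactionRequest request, final List<Long> lowerBoundaries,
      ThroughputController throughputController, User user) throws IOException {
    // The inherited template method drives the whole compaction; this class only
    // says how to build the sink it writes into.
    return compact(request, defaultScannerFactory,
      new CellSinkFactory<DateTieredMultiFileWriter>() {
        @Override
        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
            boolean shouldDropBehind) throws IOException {
          DateTieredMultiFileWriter writer =
              new DateTieredMultiFileWriter(lowerBoundaries, true /* needEmptyFile */);
          // Attach the writer to the StoreScanner (if any) and to createTmpWriter(), so each
          // output file is created lazily in the store's tmp directory.
          initMultiWriter(writer, scanner, fd, shouldDropBehind);
          return writer;
        }
      }, throughputController, user);
  }

  @Override
  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
      CompactionRequest request) throws IOException {
    // Commit every per-window file; on failure the base class calls abortWriter() instead.
    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
  }
}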
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 52c711b716d..c6fc0c6dcb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -22,12 +22,16 @@ import java.io.InterruptedIOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.io.Closeables;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -43,8 +47,10 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -57,9 +63,11 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
  */
 @InterfaceAudience.Private
-public abstract class Compactor {
+public abstract class Compactor<T extends CellSink> {
   private static final Log LOG = LogFactory.getLog(Compactor.class);
-  protected CompactionProgress progress;
+
+  protected volatile CompactionProgress progress;
+
   protected final Configuration conf;
   protected final Store store;
 
@@ -85,6 +93,11 @@ public abstract class Compactor {
     void append(Cell cell) throws IOException;
   }
 
+  protected interface CellSinkFactory<S> {
+    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
+        throws IOException;
+  }
+
   public CompactionProgress getProgress() {
     return this.progress;
   }
@@ -139,7 +152,7 @@ public abstract class Compactor {
       fd.maxKeyCount += keyCount;
       // calculate the latest MVCC readpoint in any of the involved store files
       Map<byte[], byte[]> fileInfo = r.loadFileInfo();
-      byte tmp[] = null;
+      byte[] tmp = null;
       // Get and set the real MVCCReadpoint for bulk loaded files, which is the
       // SeqId number.
       if (r.isBulkLoaded()) {
@@ -203,6 +216,120 @@ public abstract class Compactor {
     return store.getSmallestReadPoint();
   }
 
+  protected interface InternalScannerFactory {
+
+    ScanType getScanType(CompactionRequest request);
+
+    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException;
+  }
+
+  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
+
+    @Override
+    public ScanType getScanType(CompactionRequest request) {
+      return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
+          : ScanType.COMPACT_RETAIN_DELETES;
+    }
+
+    @Override
+    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException {
+      return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+        fd.earliestPutTs);
+    }
+  };
+
+  /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException if creation failed
+   */
+  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
+    // When all MVCC readpoints are 0, don't write them.
+    // See HBASE-8166, HBASE-12600, and HBASE-13389.
+    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+      /* isCompaction = */true,
+      /* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
+      /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
+  }
+
+  protected List<Path> compact(final CompactionRequest request,
+      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
+      ThroughputController throughputController, User user) throws IOException {
+    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
+
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = getSmallestReadPoint();
+
+    List<StoreFileScanner> scanners;
+    Collection<StoreFile> readersToClose;
+    T writer = null;
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
+      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
+      // HFiles, and their readers
+      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
+      for (StoreFile f : request.getFiles()) {
+        readersToClose.add(f.cloneForReader());
+      }
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    } else {
+      readersToClose = Collections.emptyList();
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    }
+    InternalScanner scanner = null;
+    boolean finished = false;
+    try {
+      /* Include deletes, unless we are doing a major compaction */
+      ScanType scanType = scannerFactory.getScanType(request);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+      if (scanner == null) {
+        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
+      }
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
+      if (scanner == null) {
+        // NULL scanner returned from coprocessor hooks means skip normal processing.
+        return new ArrayList<Path>();
+      }
+      boolean cleanSeqId = false;
+      if (fd.minSeqIdToKeep > 0) {
+        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+        cleanSeqId = true;
+      }
+      writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
+      finished =
+          performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
+      if (!finished) {
+        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
+      }
+    } finally {
+      Closeables.close(scanner, true);
+      for (StoreFile f : readersToClose) {
+        try {
+          f.closeReader(true);
+        } catch (IOException e) {
+          LOG.warn("Exception closing " + f, e);
+        }
+      }
+      if (!finished && writer != null) {
+        abortWriter(writer);
+      }
+    }
+    assert finished : "We should have exited the method on all error paths";
+    assert writer != null : "Writer should be non-null if no error";
+    return commitWriter(writer, fd, request);
+  }
+
+  protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
+      throws IOException;
+
+  protected abstract void abortWriter(T writer) throws IOException;
+
   /**
    * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
    * @param request Compaction request.
@@ -219,7 +346,9 @@ public abstract class Compactor {
   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
       final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
       User user) throws IOException {
-    if (store.getCoprocessorHost() == null) return null;
+    if (store.getCoprocessorHost() == null) {
+      return null;
+    }
     if (user == null) {
       return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
           earliestPutTs, request);
@@ -247,25 +376,27 @@ public abstract class Compactor {
    * @param scanner The default scanner created for compaction.
    * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
    */
-  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
+  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
       final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
-    if (store.getCoprocessorHost() == null) return scanner;
-    if (user == null) {
-      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
-    } else {
-      try {
-        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
-          @Override
-          public InternalScanner run() throws Exception {
-            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
-          }
-        });
-      } catch (InterruptedException ie) {
-        InterruptedIOException iioe = new InterruptedIOException();
-        iioe.initCause(ie);
-        throw iioe;
-      }
-    }
+    if (store.getCoprocessorHost() == null) {
+      return scanner;
+    }
+    if (user == null) {
+      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }
 
   /**
@@ -273,7 +404,8 @@ public abstract class Compactor {
    * @param scanner Where to read from.
    * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
-   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
+   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
+   *          smallestReadPoint
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
   protected boolean performCompaction(InternalScanner scanner, CellSink writer,
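[Reviewer note, not part of the patch: InternalScannerFactory is the second axis of variation next to CellSinkFactory in the template compact() above. The sketch below is hypothetical; the "always retain deletes" policy and the class/method names are invented purely to show the hook, while the factory interfaces, createTmpWriter() and the protected compact() template are the API added in this file.]

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

// Hypothetical single-output compactor, for illustration only: same writer handling as
// DefaultCompactor, but with a scanner factory that never drops delete markers.
public class RetainDeletesExampleCompactor extends Compactor<Writer> {

  public RetainDeletesExampleCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

  private final InternalScannerFactory retainDeletesScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequest request) {
      // Invented policy: keep delete markers even when compacting all files.
      return ScanType.COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
        FileDetails fd, long smallestReadPoint) throws IOException {
      // Same delegation as defaultScannerFactory above.
      return RetainDeletesExampleCompactor.this.createScanner(store, scanners, scanType,
        smallestReadPoint, fd.earliestPutTs);
    }
  };

  public List<Path> compactRetainingDeletes(CompactionRequest request,
      ThroughputController throughputController, User user) throws IOException {
    // Only the two factories change; scanner setup, the loop, commit and abort are inherited.
    return compact(request, retainDeletesScannerFactory, new CellSinkFactory<Writer>() {
      @Override
      public Writer createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
          throws IOException {
        return createTmpWriter(fd, shouldDropBehind);
      }
    }, throughputController, user);
  }

  @Override
  protected List<Path> commitWriter(Writer writer, FileDetails fd, CompactionRequest request)
      throws IOException {
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
    writer.close();
    return Collections.singletonList(writer.getPath());
  }

  @Override
  protected void abortWriter(Writer writer) throws IOException {
    Path leftoverFile = writer.getPath();
    writer.close();
    store.getFileSystem().delete(leftoverFile, false);
  }
}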
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index a0a10380a85..b1203c5ccdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -27,10 +27,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
@@ -52,34 +50,29 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter>
 
-  public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
+  public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + lowerBoundaries.size()
          + "windows, lower boundaries: " + lowerBoundaries);
     }
 
-    DateTieredMultiFileWriter writer =
-        new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
-    return compact(writer, request, new InternalScannerFactory() {
+    return compact(request, defaultScannerFactory,
+      new CellSinkFactory<DateTieredMultiFileWriter>() {
 
-      @Override
-      public ScanType getScanType(CompactionRequest request) {
-        return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
-            : ScanType.COMPACT_RETAIN_DELETES;
-      }
-
-      @Override
-      public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-          FileDetails fd, long smallestReadPoint) throws IOException {
-        return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
-          fd.earliestPutTs);
-      }
-    }, throughputController, user);
+        @Override
+        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
+            needEmptyFile(request));
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
   @Override
-  protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
       CompactionRequest request) throws IOException {
     return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 5a8dbc5f6c4..9759d2b9224 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -18,149 +18,59 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
+import com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files. Create an instance and then call
  * {@link #compact(CompactionRequest, ThroughputController, User)}
  */
 @InterfaceAudience.Private
-public class DefaultCompactor extends Compactor {
+public class DefaultCompactor extends Compactor<Writer> {
   private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
 
   public DefaultCompactor(final Configuration conf, final Store store) {
     super(conf, store);
   }
 
+  private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
+
+    @Override
+    public Writer createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind) throws IOException {
+      return createTmpWriter(fd, shouldDropBehind);
+    }
+  };
+
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
   public List<Path> compact(final CompactionRequest request,
       ThroughputController throughputController, User user) throws IOException {
-    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = getSmallestReadPoint();
-
-    List<StoreFileScanner> scanners;
-    Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
-      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFiles, and their readers
-      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
-      }
-      scanners = createFileScanners(readersToClose, smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    } else {
-      readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    }
-
-    StoreFile.Writer writer = null;
-    List<Path> newFiles = new ArrayList<Path>();
-    boolean cleanSeqId = false;
-    IOException e = null;
-    try {
-      InternalScanner scanner = null;
-      try {
-        /* Include deletes, unless we are doing a compaction of all files */
-
-        ScanType scanType =
-            request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
-        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
-        if (scanner == null) {
-          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
-        }
-        scanner = postCreateCoprocScanner(request, scanType, scanner, user);
-        if (scanner == null) {
-          // NULL scanner returned from coprocessor hooks means skip normal processing.
-          return newFiles;
-        }
-        // Create the writer even if no kv(Empty store file is also ok),
-        // because we need record the max seq id for the store file, see HBASE-6059
-        if(fd.minSeqIdToKeep > 0) {
-          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-          cleanSeqId = true;
-        }
-
-        // When all MVCC readpoints are 0, don't write them.
-        // See HBASE-8166, HBASE-12600, and HBASE-13389.
-        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
-
-        boolean finished =
-            performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
-
-
-        if (!finished) {
-          writer.close();
-          store.getFileSystem().delete(writer.getPath(), false);
-          writer = null;
-          throw new InterruptedIOException("Aborting compaction of store " + store +
-              " in region " + store.getRegionInfo().getRegionNameAsString() +
-              " because it was interrupted.");
-        }
-      } finally {
-        if (scanner != null) {
-          scanner.close();
-        }
-      }
-    } catch (IOException ioe) {
-      e = ioe;
-      // Throw the exception
-      throw ioe;
-    }
-    finally {
-      try {
-        if (writer != null) {
-          if (e != null) {
-            writer.close();
-          } else {
-            writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
-            writer.close();
-            newFiles.add(writer.getPath());
-          }
-        }
-      } finally {
-        for (StoreFile f : readersToClose) {
-          try {
-            f.closeReader(true);
-          } catch (IOException ioe) {
-            LOG.warn("Exception closing " + f, ioe);
-          }
-        }
-      }
-    }
-    return newFiles;
+    return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
   }
 
   /**
    * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
    * {@link #compact(CompactionRequest, ThroughputController, User)};
-   * @param filesToCompact the files to compact. These are used as the compactionSelection for
-   *        the generated {@link CompactionRequest}.
+   * @param filesToCompact the files to compact. These are used as the compactionSelection for the
+   *          generated {@link CompactionRequest}.
    * @param isMajor true to major compact (prune all deletes, max versions, etc)
    * @return Product of compaction or an empty list if all cells expired or deleted and nothing \
    *         made it through the compaction.
@@ -170,6 +80,32 @@ public class DefaultCompactor extends Compactor {
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
     cr.setIsMajor(isMajor, isMajor);
-    return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
+    return compact(cr, NoLimitThroughputController.INSTANCE, null);
+  }
+
+  @Override
+  protected List<Path> commitWriter(Writer writer, FileDetails fd,
+      CompactionRequest request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+    writer.close();
+    return newFiles;
+  }
+
+  @Override
+  protected void abortWriter(Writer writer) throws IOException {
+    Path leftoverFile = writer.getPath();
+    try {
+      writer.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close the writer after an unfinished compaction.", e);
+    }
+    try {
+      store.getFileSystem().delete(leftoverFile, false);
+    } catch (IOException e) {
+      LOG.warn(
+        "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
+        e);
+    }
+  }
 }
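[Reviewer note, not part of the patch: a hypothetical test fragment, not self-contained; the conf, store and file-selection fixtures are assumed to exist and only the calls come from this file. compactForTesting() still builds a fake CompactionRequest and now funnels through the same template compact() as the production entry point, so a successful run commits exactly one file via commitWriter() and an interrupted one cleans up via abortWriter().]

// Hypothetical test fragment; conf and store are assumed test fixtures.
DefaultCompactor compactor = new DefaultCompactor(conf, store);
Collection<StoreFile> filesToCompact = store.getStorefiles();
List<Path> newFiles = compactor.compactForTesting(filesToCompact, true /* isMajor */);
assert newFiles.size() == 1; // the single-writer compactor commits one output file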
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 1364ce085ed..5e796adefb6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -68,15 +68,17 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter>
 
     @Override
     public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
        FileDetails fd, long smallestReadPoint) throws IOException {
-      return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
-        scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
-        scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+      return (majorRangeFromRow == null)
+          ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+            fd.earliestPutTs)
+          : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
+            majorRangeFromRow, majorRangeToRow);
     }
   }
 
-  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
-      User user) throws IOException {
+  public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
+      final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
+      ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -85,30 +87,44 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter>
       LOG.debug(sb.toString());
     }
-    StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
-        store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
-    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
-        majorRangeToRow), throughputController, user);
+    return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+      new CellSinkFactory<StripeMultiFileWriter>() {
+
+        @Override
+        public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
+            store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
-  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
-      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
+  public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
+      final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
-          + targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
-          + "] range");
+      LOG.debug(
+        "Executing compaction with " + targetSize + " target file size, no more than " + targetCount
+            + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
     }
-    StripeMultiFileWriter writer =
-        new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
-            left, right);
-    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
-        majorRangeToRow), throughputController, user);
+    return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+      new CellSinkFactory<StripeMultiFileWriter>() {
+
+        @Override
+        public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
+            store.getComparator(), targetCount, targetSize, left, right);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
   @Override
-  protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
      CompactionRequest request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
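[Reviewer note, not part of the patch: a hypothetical caller sketch of the two StripeCompactor entry points after the refactor. The boundary values, sizes and the conf/store/request fixtures are invented, and the constructor signature is assumed to mirror the other compactors; the compact() overloads and NoLimitThroughputController are the ones shown above. Both overloads now delegate to Compactor.compact() with a StripeInternalScannerFactory plus a CellSinkFactory that builds the matching StripeMultiFileWriter, so commit and abort behave the same way as for the other compactors.]

// Hypothetical caller fragment; conf, store and request are assumed to exist.
StripeCompactor compactor = new StripeCompactor(conf, store);

// 1) Compact into stripes at explicit row-key boundaries.
List<byte[]> boundaries = Arrays.asList(Bytes.toBytes("row-a"), Bytes.toBytes("row-m"));
List<Path> byBoundary = compactor.compact(request, boundaries, null, null,
  NoLimitThroughputController.INSTANCE, null);

// 2) Let the size-based writer pick boundaries from a target count and size.
List<Path> bySize = compactor.compact(request, 5 /* targetCount */, 1024L * 1024 * 1024,
  null /* left */, null /* right */, null, null, NoLimitThroughputController.INSTANCE, null);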