HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)

Introduce a StoreFileWriterCreationTracker to track the store files being written

Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit 48c4a4626e54724beb4ed46becbedbe292841dc4)
This commit is contained in:
Duo Zhang 2022-04-17 21:58:12 +08:00
parent 07531728b1
commit 450a54bed8
26 changed files with 394 additions and 244 deletions

View File

@ -22,12 +22,14 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@ -84,10 +87,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override @Override
public StoreFileWriter createWriter(InternalScanner scanner, public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException { boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
// make this writer with tags always because of possible new cells with tags. // make this writer with tags always because of possible new cells with tags.
return store.getStoreEngine().createWriter( return store.getStoreEngine()
createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true)); .createWriter(
createParams(fd, shouldDropBehind, major, writerCreationTracker)
.includeMVCCReadpoint(true)
.includesTag(true));
} }
}; };
@ -155,17 +162,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* the scanner to filter the deleted cells. * the scanner to filter the deleted cells.
* @param fd File details * @param fd File details
* @param scanner Where to read from. * @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point. * @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller. * @param throughputController The compaction throughput controller.
* @param major Is a major compaction. * @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact * @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason. * @return Whether compaction ended; false if it was interrupted for any reason.
*/ */
@Override @Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException { boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0; long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data, // Since scanner.next() can return 'false' but still be delivering data,
@ -369,9 +378,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
return true; return true;
} }
@Override @Override
protected List<Path> commitWriter(FileDetails fd, protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath()); List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

View File

@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -100,7 +101,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException { FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>(); ArrayList<Path> result = new ArrayList<>();
long cellsCount = snapshot.getCellsCount(); long cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
@ -114,7 +115,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) { synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer"); status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk // Write the map out to the disk
writer = createWriter(snapshot, true); writer = createWriter(snapshot, true, writerCreationTracker);
IOException e = null; IOException e = null;
try { try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing // It's a mob store, flush the cells in a mob way. This is the difference of flushing

View File

@ -67,7 +67,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
* comments in HBASE-15400 for more details. * comments in HBASE-15400 for more details.
*/ */
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException { public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET); return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
} }
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction, public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@ -110,11 +110,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths; return paths;
} }
/** protected abstract Collection<StoreFileWriter> writers();
* Returns all writers. This is used to prevent deleting currently writen storefiles
* during cleanup.
*/
public abstract Collection<StoreFileWriter> writers();
/** /**
* Subclasses override this method to be called at the end of a successful sequence of append; all * Subclasses override this method to be called at the end of a successful sequence of append; all

View File

@ -152,7 +152,7 @@ public class BrokenStoreFileCleaner extends ScheduledChore {
} }
private boolean isCompactionResultFile(FileStatus file, HStore store) { private boolean isCompactionResultFile(FileStatus file, HStore store) {
return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); return store.getStoreFilesBeingWritten().contains(file.getPath());
} }
// Compacted files can still have readers and are cleaned by a separate chore, so they have to // Compacted files can still have readers and are cleaned by a separate chore, so they have to

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
private String fileStoragePolicy = HConstants.EMPTY_STRING; private String fileStoragePolicy = HConstants.EMPTY_STRING;
private Consumer<Path> writerCreationTracker;
private CreateStoreFileWriterParams() { private CreateStoreFileWriterParams() {
} }
@ -127,8 +131,16 @@ public final class CreateStoreFileWriterParams {
return this; return this;
} }
public Consumer<Path> writerCreationTracker() {
return writerCreationTracker;
}
public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
return this;
}
public static CreateStoreFileWriterParams create() { public static CreateStoreFileWriterParams create() {
return new CreateStoreFileWriterParams(); return new CreateStoreFileWriterParams();
} }
} }

View File

@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
} }
@Override @Override
public Collection<StoreFileWriter> writers() { protected Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values(); return lowerBoundary2Writer.values();
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -44,8 +45,8 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
FlushLifeCycleTracker tracker) throws IOException { Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>(); ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
@ -59,7 +60,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) { synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer"); status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk // Write the map out to the disk
writer = createWriter(snapshot, false); writer = createWriter(snapshot, false, writerCreationTracker);
IOException e = null; IOException e = null;
try { try {
performFlush(scanner, writer, throughputController); performFlush(scanner, writer, throughputController);

View File

@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction; import java.util.function.ToLongFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.LongStream; import java.util.stream.LongStream;
@ -213,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final StoreContext storeContext; private final StoreContext storeContext;
// Used to track the store files which are currently being written. For compaction, if we want to
// compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
// track the store files being written when flushing.
// Notice that the creation is in the background compaction or flush thread and we will get the
// files in other thread, so it needs to be thread safe.
private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
public void accept(Path t) {
files.add(t);
}
public Set<Path> get() {
return Collections.unmodifiableSet(files);
}
}
// We may have multiple compaction running at the same time, and flush can also happen at the same
// time, so here we need to use a collection, and the collection needs to be thread safe.
// The implementation of StoreFileWriterCreationTracker is very simple and we will not likely to
// implement hashCode or equals for it, so here we just use ConcurrentHashMap. Changed to
// IdentityHashMap if later we want to implement hashCode or equals.
private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
Collections.newSetFromMap(new ConcurrentHashMap<>());
// For the SFT implementation which we will write tmp store file first, we do not need to clean up
// the broken store files under the data directory, which means we do not need to track the store
// file writer creation. So here we abstract a factory to return different trackers for different
// SFT implementations.
private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
/** /**
* Constructor * Constructor
* @param family HColumnDescriptor for this column * @param family HColumnDescriptor for this column
* @param confParam configuration object failed. Can be null. * @param confParam configuration object failed. Can be null.
*/ */
protected HStore(final HRegion region, final ColumnFamilyDescriptor family, protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException { final Configuration confParam, boolean warmup) throws IOException {
this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family); this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
this.region = region; this.region = region;
@ -265,6 +299,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator()); this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
storeEngine.initialize(warmup); storeEngine.initialize(warmup);
// if require writing to tmp dir first, then we just return null, which indicate that we do not
// need to track the creation of store file writer, otherwise we return a new
// StoreFileWriterCreationTracker.
this.storeFileWriterCreationTrackerFactory =
storeEngine.requireWritingToTmpDirFirst() ? () -> null
: () -> new StoreFileWriterCreationTracker();
refreshStoreSizeAndTotalBytes(); refreshStoreSizeAndTotalBytes();
flushRetriesNumber = conf.getInt( flushRetriesNumber = conf.getInt(
@ -792,8 +832,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* @throws IOException if exception occurs during process * @throws IOException if exception occurs during process
*/ */
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
FlushLifeCycleTracker tracker) throws IOException { Consumer<Path> writerCreationTracker) throws IOException {
// If an exception happens flushing, we let it out without clearing // If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say // the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around. // 'snapshot', the next time flush comes around.
@ -803,8 +843,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
IOException lastException = null; IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) { for (int i = 0; i < flushRetriesNumber; i++) {
try { try {
List<Path> pathNames = List<Path> pathNames = flusher.flushSnapshot(
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker); snapshot,
logCacheFlushId,
status,
throughputController,
tracker,
writerCreationTracker);
Path lastPathName = null; Path lastPathName = null;
try { try {
for (Path pathName : pathNames) { for (Path pathName : pathNames) {
@ -1115,6 +1160,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
assert compaction != null; assert compaction != null;
CompactionRequestImpl cr = compaction.getRequest(); CompactionRequestImpl cr = compaction.getRequest();
StoreFileWriterCreationTracker writerCreationTracker =
storeFileWriterCreationTrackerFactory.get();
if (writerCreationTracker != null) {
cr.setWriterCreationTracker(writerCreationTracker);
storeFileWriterCreationTrackers.add(writerCreationTracker);
}
try { try {
// Do all sanity checking in here if we have a valid CompactionRequestImpl // Do all sanity checking in here if we have a valid CompactionRequestImpl
// because we need to clean up after it on the way out in a finally // because we need to clean up after it on the way out in a finally
@ -1154,18 +1205,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
} }
replaceStoreFiles(filesToCompact, sfs, true); replaceStoreFiles(filesToCompact, sfs, true);
// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// have failed.
storeEngine.resetCompactionWriter();
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
} else {
compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
}
long outputBytes = getTotalSize(sfs); long outputBytes = getTotalSize(sfs);
// At this point the store will use new files for all new scanners. // At this point the store will use new files for all new scanners.
@ -1573,6 +1612,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
synchronized (filesCompacting) { synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles()); filesCompacting.removeAll(cr.getFiles());
} }
// The tracker could be null, for example, we do not need to track the creation of store file
// writer due to different implementation of SFT, or the compaction is canceled.
if (cr.getWriterCreationTracker() != null) {
storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
}
} }
/** /**
@ -1896,6 +1940,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final class StoreFlusherImpl implements StoreFlushContext { private final class StoreFlusherImpl implements StoreFlushContext {
private final FlushLifeCycleTracker tracker; private final FlushLifeCycleTracker tracker;
private final StoreFileWriterCreationTracker writerCreationTracker;
private final long cacheFlushSeqNum; private final long cacheFlushSeqNum;
private MemStoreSnapshot snapshot; private MemStoreSnapshot snapshot;
private List<Path> tempFiles; private List<Path> tempFiles;
@ -1907,6 +1952,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
this.cacheFlushSeqNum = cacheFlushSeqNum; this.cacheFlushSeqNum = cacheFlushSeqNum;
this.tracker = tracker; this.tracker = tracker;
this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
} }
/** /**
@ -1927,41 +1973,61 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
public void flushCache(MonitoredTask status) throws IOException { public void flushCache(MonitoredTask status) throws IOException {
RegionServerServices rsService = region.getRegionServerServices(); RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController = ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController(); rsService == null ? null : rsService.getFlushThroughputController();
tempFiles = // it could be null if we do not need to track the creation of store file writer due to
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); // different SFT implementation.
if (writerCreationTracker != null) {
HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
}
tempFiles = HStore.this.flushCache(
cacheFlushSeqNum,
snapshot,
status,
throughputController,
tracker,
writerCreationTracker);
} }
@Override @Override
public boolean commit(MonitoredTask status) throws IOException { public boolean commit(MonitoredTask status) throws IOException {
if (CollectionUtils.isEmpty(this.tempFiles)) { try {
return false; if (CollectionUtils.isEmpty(this.tempFiles)) {
} return false;
status.setStatus("Flushing " + this + ": reopening flushed file"); }
List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false); status.setStatus("Flushing " + this + ": reopening flushed file");
for (HStoreFile sf : storeFiles) { List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
StoreFileReader r = sf.getReader(); for (HStoreFile sf : storeFiles) {
if (LOG.isInfoEnabled()) { StoreFileReader r = sf.getReader();
LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), if (LOG.isInfoEnabled()) {
cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); LOG.info(
"Added {}, entries={}, sequenceid={}, filesize={}",
sf,
r.getEntries(),
cacheFlushSeqNum,
TraditionalBinaryPrefix.long2String(r.length(), "", 1));
}
outputFileSize += r.length();
storeSize.addAndGet(r.length());
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
} }
outputFileSize += r.length();
storeSize.addAndGet(r.length());
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
}
flushedCellsCount.addAndGet(cacheFlushCount); flushedCellsCount.addAndGet(cacheFlushCount);
flushedCellsSize.addAndGet(cacheFlushSize); flushedCellsSize.addAndGet(cacheFlushSize);
flushedOutputFileSize.addAndGet(outputFileSize); flushedOutputFileSize.addAndGet(outputFileSize);
// call coprocessor after we have done all the accounting above // call coprocessor after we have done all the accounting above
for (HStoreFile sf : storeFiles) { for (HStoreFile sf : storeFiles) {
if (getCoprocessorHost() != null) { if (getCoprocessorHost() != null) {
getCoprocessorHost().postFlush(HStore.this, sf, tracker); getCoprocessorHost().postFlush(HStore.this, sf, tracker);
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return completeFlush(storeFiles, snapshot.getId());
} finally {
if (writerCreationTracker != null) {
HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
} }
} }
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return completeFlush(storeFiles, snapshot.getId());
} }
@Override @Override
@ -2107,6 +2173,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return majorCompactedCellsSize.get(); return majorCompactedCellsSize.get();
} }
public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
if (isMajor) {
majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
} else {
compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
compactedCellsSize.addAndGet(progress.totalCompactedSize);
}
}
/** /**
* Returns the StoreEngine that is backing this concrete implementation of Store. * Returns the StoreEngine that is backing this concrete implementation of Store.
* @return Returns the {@link StoreEngine} object used internally inside this HStore object. * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
@ -2401,4 +2477,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
mixedRowReadsCount.increment(); mixedRowReadsCount.increment();
} }
} }
/**
* Return the storefiles which are currently being written to. Mainly used by
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in
* SFT yet.
*/
Set<Path> getStoreFilesBeingWritten() {
return storeFileWriterCreationTrackers.stream()
.flatMap(t -> t.get().stream())
.collect(Collectors.toSet());
}
} }

View File

@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -94,7 +92,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy, public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
C extends Compactor, SFM extends StoreFileManager> { C extends Compactor<?>, SFM extends StoreFileManager> {
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class); private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
@ -157,7 +155,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
/** /**
* @return Compactor to use. * @return Compactor to use.
*/ */
public Compactor getCompactor() { public Compactor<?> getCompactor() {
return this.compactor; return this.compactor;
} }
@ -544,17 +542,6 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
return storeFileTracker.requireWritingToTmpDirFirst(); return storeFileTracker.requireWritingToTmpDirFirst();
} }
/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
* have failed. Currently called in
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
}
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "", @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java") allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() { ReadWriteLock getLock() {

View File

@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -423,9 +424,14 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private boolean shouldDropCacheBehind; private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet(); private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy; private String fileStoragePolicy;
// this is used to track the creation of the StoreFileWriter, mainly used for the SFT
// implementation where we will write store files directly to the final place, instead of
// writing a tmp file first. Under this scenario, we will have a background task to purge the
// store files which are not recorded in the SFT, but for the newly created store file writer,
// they are not tracked in SFT, so here we need to record them and treat them specially.
private Consumer<Path> writerCreationTracker;
public Builder(Configuration conf, CacheConfig cacheConf, public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
FileSystem fs) {
this.conf = conf; this.conf = conf;
this.cacheConf = cacheConf; this.cacheConf = cacheConf;
this.fs = fs; this.fs = fs;
@ -509,6 +515,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this; return this;
} }
public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
return this;
}
/** /**
* Create a store file writer. Client is responsible for closing file when * Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using * done. If metadata, add BEFORE closing using
@ -557,8 +568,21 @@ public class StoreFileWriter implements CellSink, ShipperListener {
bloomType = BloomType.NONE; bloomType = BloomType.NONE;
} }
} }
// make sure we call this before actually create the writer
return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount, // in fact, it is not a big deal to even add an inexistent file to the track, as we will never
// try to delete it and finally we will clean the tracker up after compaction. But if the file
// cleaner find the file but we haven't recorded it yet, it may accidentally delete the file
// and cause problem.
if (writerCreationTracker != null) {
writerCreationTracker.accept(filePath);
}
return new StoreFileWriter(
fs,
filePath,
conf,
cacheConf,
bloomType,
maxKeyCount,
favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
} }
} }

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -55,8 +55,8 @@ abstract class StoreFlusher {
* @return List of files written. Can be empty; must not be null. * @return List of files written. Can be empty; must not be null.
*/ */
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
FlushLifeCycleTracker tracker) throws IOException; Consumer<Path> writerCreationTracker) throws IOException;
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException { MonitoredTask status) throws IOException {
@ -69,13 +69,17 @@ abstract class StoreFlusher {
writer.close(); writer.close();
} }
protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag) protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
throws IOException { Consumer<Path> writerCreationTracker) throws IOException {
return store.getStoreEngine() return store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount()) .createWriter(
.compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false) CreateStoreFileWriterParams.create()
.includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) .maxKeyCount(snapshot.getCellsCount())
.shouldDropBehind(false)); .compression(store.getColumnFamilyDescriptor().getCompressionType())
.isCompaction(false)
.includeMVCCReadpoint(true)
.includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
.shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
} }
/** /**

View File

@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
} }
@Override @Override
public Collection<StoreFileWriter> writers() { protected Collection<StoreFileWriter> writers() {
return existingWriters; return existingWriters;
} }

View File

@ -23,7 +23,7 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -54,11 +54,14 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
FlushLifeCycleTracker tracker) throws IOException { Consumer<Path> writerCreationTracker) throws IOException {
List<Path> result = new ArrayList<>(); List<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) {
// don't flush if there are no entries
return result;
}
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker); InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
@ -70,8 +73,9 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter mw = null; StripeMultiFileWriter mw = null;
try { try {
mw = req.createWriter(); // Writer according to the policy. mw = req.createWriter(); // Writer according to the policy.
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot); StripeMultiFileWriter.WriterFactory factory =
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; createWriterFactory(snapshot, writerCreationTracker);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
mw.init(storeScanner, factory); mw.init(storeScanner, factory);
synchronized (flushLock) { synchronized (flushLock) {
@ -98,12 +102,13 @@ public class StripeStoreFlusher extends StoreFlusher {
return result; return result;
} }
private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) { private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
Consumer<Path> writerCreationTracker) {
return new StripeMultiFileWriter.WriterFactory() { return new StripeMultiFileWriter.WriterFactory() {
@Override @Override
public StoreFileWriter createWriter() throws IOException { public StoreFileWriter createWriter() throws IOException {
// XXX: it used to always pass true for includesTag, re-consider? // XXX: it used to always pass true for includesTag, re-consider?
return StripeStoreFlusher.this.createWriter(snapshot, true); return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
} }
}; };
} }

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException; import java.io.IOException;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -46,19 +46,21 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
super(conf, store); super(conf, store);
} }
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner, protected final void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
final FileDetails fd, final boolean shouldDropBehind, boolean major) { final FileDetails fd, final boolean shouldDropBehind, boolean major,
Consumer<Path> writerCreationTracker) {
WriterFactory writerFactory = new WriterFactory() { WriterFactory writerFactory = new WriterFactory() {
@Override @Override
public StoreFileWriter createWriter() throws IOException { public StoreFileWriter createWriter() throws IOException {
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major); return AbstractMultiOutputCompactor.this
.createWriter(fd, shouldDropBehind, major, writerCreationTracker);
} }
@Override @Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException { throws IOException {
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, return AbstractMultiOutputCompactor.this
fileStoragePolicy, major); .createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker);
} }
}; };
// Prepare multi-writer, and perform the compaction using scanner and writer. // Prepare multi-writer, and perform the compaction using scanner and writer.
@ -68,7 +70,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
} }
@Override @Override
protected void abortWriter() throws IOException { protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
FileSystem fs = store.getFileSystem(); FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) { for (Path leftoverFile : writer.abortWriters()) {
try { try {
@ -79,7 +81,6 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e); e);
} }
} }
//this step signals that the target file is no longer writen and can be cleaned up
writer = null;
} }
} }

View File

@ -37,7 +37,7 @@ public class CompactionProgress {
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class); private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
/** the total compacting key values in currently running compaction */ /** the total compacting key values in currently running compaction */
private long totalCompactingKVs; public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */ /** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0; public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */ /** the total size of data processed by the currently running compaction, in bytes */

View File

@ -22,8 +22,9 @@ import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@ -51,6 +52,7 @@ public class CompactionRequestImpl implements CompactionRequest {
private String storeName = ""; private String storeName = "";
private long totalSize = -1L; private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY; private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
private Consumer<Path> writerCreationTracker;
public CompactionRequestImpl(Collection<HStoreFile> files) { public CompactionRequestImpl(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime(); this.selectionTime = EnvironmentEdgeManager.currentTime();
@ -137,6 +139,14 @@ public class CompactionRequestImpl implements CompactionRequest {
return tracker; return tracker;
} }
public Consumer<Path> getWriterCreationTracker() {
return writerCreationTracker;
}
public void setWriterCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
}
public boolean isAfterSplit() { public boolean isAfterSplit() {
return isAfterSplit; return isAfterSplit;
} }

View File

@ -25,12 +25,13 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -70,15 +70,18 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/** /**
* A compactor is a compaction algorithm associated a given policy. Base class also contains * A compactor is a compaction algorithm associated a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving). * reusable parts for implementing compactors (what is common and what isn't is evolving).
* <p>
* Compactions might be concurrent against a given store and the Compactor is shared among
* them. Do not put mutable state into class fields. All Compactor class fields should be
* final or effectively final.
* 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> { public abstract class Compactor<T extends CellSink> {
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected volatile CompactionProgress progress;
protected final Configuration conf; protected final Configuration conf;
protected final HStore store; protected final HStore store;
protected final int compactionKVMax; protected final int compactionKVMax;
protected final Compression.Algorithm majorCompactionCompression; protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression; protected final Compression.Algorithm minorCompactionCompression;
@ -92,15 +95,15 @@ public abstract class Compactor<T extends CellSink> {
protected static final String MINOR_COMPACTION_DROP_CACHE = protected static final String MINOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.minorcompaction.pagecache.drop"; "hbase.regionserver.minorcompaction.pagecache.drop";
private final boolean dropCacheMajor; protected final boolean dropCacheMajor;
private final boolean dropCacheMinor; protected final boolean dropCacheMinor;
// In compaction process only a single thread will access and write to this field, and // We track progress per request using the CompactionRequestImpl identity as key.
// getCompactionTargets is the only place we will access it other than the compaction thread, so // completeCompaction() cleans up this state.
// make it volatile. private final Set<CompactionProgress> progressSet =
protected volatile T writer = null; Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
//TODO: depending on Store is not good but, realistically, all compactors currently do. // TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) { Compactor(Configuration conf, HStore store) {
this.conf = conf; this.conf = conf;
this.store = store; this.store = store;
@ -116,15 +119,9 @@ public abstract class Compactor<T extends CellSink> {
this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true); this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
} }
protected interface CellSinkFactory<S> { protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major) S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
throws IOException; Consumer<Path> writerCreationTracker) throws IOException;
}
public CompactionProgress getProgress() {
return this.progress;
} }
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */ /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
@ -271,12 +268,13 @@ public abstract class Compactor<T extends CellSink> {
}; };
protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind, protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
boolean major) { boolean major, Consumer<Path> writerCreationTracker) {
return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount) return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
.compression(major ? majorCompactionCompression : minorCompactionCompression) .compression(major ? majorCompactionCompression : minorCompactionCompression)
.isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0) .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
.includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind) .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
.totalCompactedFilesSize(fd.totalCompactedFilesSize); .totalCompactedFilesSize(fd.totalCompactedFilesSize)
.writerCreationTracker(writerCreationTracker);
} }
/** /**
@ -286,16 +284,20 @@ public abstract class Compactor<T extends CellSink> {
* @throws IOException if creation failed * @throws IOException if creation failed
*/ */
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
boolean major) throws IOException { boolean major, Consumer<Path> writerCreationTracker) throws IOException {
// When all MVCC readpoints are 0, don't write them. // When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389. // See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major)); return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
} }
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException { String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
return store.getStoreEngine() return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy)); .createWriter(
createParams(fd, shouldDropBehind, major, writerCreationTracker)
.fileStoragePolicy(fileStoragePolicy));
} }
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
@ -327,7 +329,6 @@ public abstract class Compactor<T extends CellSink> {
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory, InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor()); FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners. // Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint(); long smallestReadPoint = getSmallestReadPoint();
@ -343,6 +344,9 @@ public abstract class Compactor<T extends CellSink> {
boolean finished = false; boolean finished = false;
List<StoreFileScanner> scanners = List<StoreFileScanner> scanners =
createFileScanners(request.getFiles(), smallestReadPoint, dropCache); createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
T writer = null;
CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
progressSet.add(progress);
try { try {
/* Include deletes, unless we are doing a major compaction */ /* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request); ScanType scanType = scannerFactory.getScanType(request);
@ -355,14 +359,14 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true; cleanSeqId = true;
} }
if (writer != null){ writer = sinkFactory.createWriter(
LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() scanner,
.map(n -> n.toString()) fd,
.collect(Collectors.joining(", ", "{ ", " }"))); dropCache,
} request.isMajor(),
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size()); throughputController, request.isAllFiles(), request.getFiles().size(), progress);
if (!finished) { if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region " throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@ -380,34 +384,41 @@ public abstract class Compactor<T extends CellSink> {
} else { } else {
Closeables.close(scanner, true); Closeables.close(scanner, true);
} }
if (!finished && writer != null) { if (!finished) {
abortWriter(); if (writer != null) {
abortWriter(writer);
}
} else {
store.updateCompactedMetrics(request.isMajor(), progress);
} }
progressSet.remove(progress);
} }
assert finished : "We should have exited the method on all error paths"; assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error"; assert writer != null : "Writer should be non-null if no error";
return commitWriter(fd, request); return commitWriter(writer, fd, request);
} }
protected abstract List<Path> commitWriter(FileDetails fd, protected abstract List<Path> commitWriter(T writer, FileDetails fd,
CompactionRequestImpl request) throws IOException; CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter() throws IOException; protected abstract void abortWriter(T writer) throws IOException;
/** /**
* Performs the compaction. * Performs the compaction.
* @param fd FileDetails of cell sink writer * @param fd FileDetails of cell sink writer
* @param scanner Where to read from. * @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point. * @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId (used to be mvcc) values which are &lt;= * @param cleanSeqId When true, remove seqId (used to be mvcc) values which are &lt;=
* smallestReadPoint * smallestReadPoint
* @param major Is a major compaction. * @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact * @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason. * @return Whether compaction ended; false if it was interrupted for some reason.
*/ */
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException { boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener; assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0; long bytesWrittenProgressForShippedCall = 0;
@ -549,22 +560,27 @@ public abstract class Compactor<T extends CellSink> {
dropDeletesFromRow, dropDeletesToRow); dropDeletesFromRow, dropDeletesToRow);
} }
public List<Path> getCompactionTargets() { /**
T writer = this.writer; * Return the aggregate progress for all currently active compactions.
if (writer == null) { */
return Collections.emptyList(); public CompactionProgress getProgress() {
synchronized (progressSet) {
long totalCompactingKVs = 0;
long currentCompactedKVs = 0;
long totalCompactedSize = 0;
for (CompactionProgress progress: progressSet) {
totalCompactingKVs += progress.totalCompactingKVs;
currentCompactedKVs += progress.currentCompactedKVs;
totalCompactedSize += progress.totalCompactedSize;
}
CompactionProgress result = new CompactionProgress(totalCompactingKVs);
result.currentCompactedKVs = currentCompactedKVs;
result.totalCompactedSize = totalCompactedSize;
return result;
} }
if (writer instanceof StoreFileWriter) {
return Arrays.asList(((StoreFileWriter) writer).getPath());
}
return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
.collect(Collectors.toList());
} }
/** public boolean isCompacting() {
* Reset the Writer when the new storefiles were successfully added return !progressSet.isEmpty();
*/
public void resetWriter(){
writer = null;
} }
} }
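The change above replaces Compactor's single shared progress field with a per-request CompactionProgress that is registered in progressSet and removed when the request ends, so getProgress() can report an aggregate across concurrent compactions and isCompacting() becomes an emptiness check. A minimal, self-contained sketch of that pattern follows; Progress and ProgressTracker are illustrative stand-ins, not HBase classes.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-ins for the per-request progress bookkeeping; not HBase classes.
final class Progress {
  final long totalCompactingKVs;
  long currentCompactedKVs;
  long totalCompactedSize;

  Progress(long totalCompactingKVs) {
    this.totalCompactingKVs = totalCompactingKVs;
  }
}

final class ProgressTracker {
  // Synchronized so concurrent compactions can register and unregister safely.
  private final Set<Progress> progressSet = Collections.synchronizedSet(new HashSet<>());

  Progress start(long totalCompactingKVs) {
    Progress p = new Progress(totalCompactingKVs);
    progressSet.add(p);
    return p;
  }

  void finish(Progress p) {
    progressSet.remove(p);
  }

  // Aggregate view over all in-flight compactions, mirroring getProgress() above.
  Progress snapshot() {
    synchronized (progressSet) {
      long totalKVs = 0, compactedKVs = 0, compactedSize = 0;
      for (Progress p : progressSet) {
        totalKVs += p.totalCompactingKVs;
        compactedKVs += p.currentCompactedKVs;
        compactedSize += p.totalCompactedSize;
      }
      Progress result = new Progress(totalKVs);
      result.currentCompactedKVs = compactedKVs;
      result.totalCompactedSize = compactedSize;
      return result;
    }
  }

  boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}

With this shape, callers that used to inspect a single progress object can simply poll isCompacting(), which is what the updated TestSplitTransactionOnCluster below does via store.storeEngine.getCompactor().isCompacting().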

View File

@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
@ -68,21 +68,26 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override @Override
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException { boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries, throws IOException {
lowerBoundariesPolicies, DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(
needEmptyFile(request)); lowerBoundaries,
initMultiWriter(writer, scanner, fd, shouldDropBehind, major); lowerBoundariesPolicies,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer; return writer;
} }
}, throughputController, user); },
throughputController,
user);
} }
@Override @Override
protected List<Path> commitWriter(FileDetails fd, protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> pathList = List<Path> pathList =
writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
return pathList; return pathList;
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -47,10 +48,11 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
private final CellSinkFactory<StoreFileWriter> writerFactory = private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() { new CellSinkFactory<StoreFileWriter>() {
@Override @Override
public StoreFileWriter createWriter(InternalScanner scanner, public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
boolean shouldDropBehind, boolean major) throws IOException { throws IOException {
return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major); return DefaultCompactor.this
.createWriter(fd, shouldDropBehind, major, writerCreationTracker);
} }
}; };
@ -63,7 +65,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
} }
@Override @Override
protected List<Path> commitWriter(FileDetails fd, protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath()); List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@ -72,12 +74,6 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
} }
@Override @Override
protected void abortWriter() throws IOException {
abortWriter(writer);
// this step signals that the target file is no longer written and can be cleaned up
writer = null;
}
protected final void abortWriter(StoreFileWriter writer) throws IOException { protected final void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath(); Path leftoverFile = writer.getPath();
try { try {
@ -92,4 +88,5 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
leftoverFile, e); leftoverFile, e);
} }
} }
} }
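With the writer now created inside compact() and handed to commitWriter/abortWriter, DefaultCompactor no longer needs the abortWriter() overload that nulled out a shared writer field. A simplified sketch of the resulting lifecycle, using illustrative types rather than the real HBase interfaces:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Illustrative stand-in for a single-file cell sink; not the HBase StoreFileWriter.
interface SinkWriter {
  Path getPath();
  void append(byte[] cell) throws IOException;
  void close() throws IOException;
}

abstract class ExplicitWriterCompactor {

  // The caller creates the writer for this request and owns it for the whole call;
  // nothing is stashed in a field, so concurrent requests cannot clobber each other.
  List<Path> compactOnce(SinkWriter writer, Iterable<byte[]> cells) throws IOException {
    boolean finished = false;
    try {
      for (byte[] cell : cells) {
        writer.append(cell);
      }
      finished = true;
    } finally {
      if (!finished) {
        abortWriter(writer); // clean up the half-written output of this request only
      }
    }
    return commitWriter(writer);
  }

  protected abstract List<Path> commitWriter(SinkWriter writer) throws IOException;

  protected void abortWriter(SinkWriter writer) throws IOException {
    Path leftover = writer.getPath();
    writer.close();
    Files.deleteIfExists(leftover); // best-effort removal of the aborted file
  }
}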

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -88,18 +88,26 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
} }
LOG.debug(sb.toString()); LOG.debug(sb.toString());
} }
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow), return compact(
request,
new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() { new CellSinkFactory<StripeMultiFileWriter>() {
@Override @Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException { boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow); store.getComparator(),
initMultiWriter(writer, scanner, fd, shouldDropBehind, major); targetBoundaries,
majorRangeFromRow,
majorRangeToRow);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer; return writer;
} }
}, throughputController, user); },
throughputController,
user);
} }
public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize, public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
@ -115,20 +123,28 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override @Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException { boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
store.getComparator(), targetCount, targetSize, left, right); store.getComparator(),
initMultiWriter(writer, scanner, fd, shouldDropBehind, major); targetCount,
targetSize,
left,
right);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer; return writer;
} }
}, throughputController, user); },
throughputController,
user);
} }
@Override @Override
protected List<Path> commitWriter(FileDetails fd, protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles; return newFiles;
} }
} }

View File

@ -165,11 +165,15 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
} }
StoreFileWriter.Builder builder = StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem()) new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
.withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType()) .withOutputDir(outputDir)
.withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes()) .withBloomType(ctx.getBloomFilterType())
.withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind()) .withMaxKeyCount(params.maxKeyCount())
.withFavoredNodes(ctx.getFavoredNodes())
.withFileContext(hFileContext)
.withShouldDropCacheBehind(params.shouldDropBehind())
.withCompactedFilesSupplier(ctx.getCompactedFilesSupplier()) .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
.withFileStoragePolicy(params.fileStoragePolicy()); .withFileStoragePolicy(params.fileStoragePolicy())
.withWriterCreationTracker(params.writerCreationTracker());
return builder.build(); return builder.build();
} }
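StoreFileWriter.Builder now also receives the writer creation tracker carried in the creation params, so the store can record every file a flush or compaction starts writing. A rough sketch of what such a tracker boils down to, a Consumer<Path> backed by a concurrent set, is shown below; it uses java.nio.file.Path as a stand-in for the Hadoop Path type, and all names are made up for illustration.

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class WriterCreationTrackerSketch {
  // Paths of store files currently being written by flushes or compactions.
  private final Set<Path> filesBeingWritten = ConcurrentHashMap.newKeySet();

  // Handed to the writer builder; the builder calls accept(path) for every file it creates.
  Consumer<Path> tracker() {
    return filesBeingWritten::add;
  }

  // Called after the writer is committed or aborted, so the file is no longer "in flight".
  void completed(Path path) {
    filesBeingWritten.remove(path);
  }

  Set<Path> inFlight() {
    return Collections.unmodifiableSet(new HashSet<>(filesBeingWritten));
  }

  public static void main(String[] args) {
    WriterCreationTrackerSketch t = new WriterCreationTrackerSketch();
    Consumer<Path> onCreate = t.tracker();
    Path newFile = Paths.get("store", "tmp", "hfile-0001"); // stand-in for a new store file
    onCreate.accept(newFile);                               // reported at writer creation
    System.out.println("in flight: " + t.inFlight());
    t.completed(newFile);                                   // commit/abort clears it
    System.out.println("in flight: " + t.inFlight());
  }
}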

View File

@ -128,13 +128,14 @@ public class TestCompactorMemLeak {
} }
@Override @Override
protected List<Path> commitWriter(FileDetails fd, protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell(); Cell cell = writerImpl.getLastCell();
// The cell should be backed by a KeyOnlyKeyValue. // The cell should be backed by a KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
return super.commitWriter(fd, request); return super.commitWriter(writer, fd, request);
} }
} }
} }

View File

@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator; import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@ -2491,9 +2493,10 @@ public class TestHStore {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException { FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
counter.incrementAndGet(); counter.incrementAndGet();
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
writerCreationTracker);
} }
@Override @Override

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -211,26 +209,9 @@ public class TestMajorCompaction {
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100)); Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
assertEquals(compactionThreshold, result.size()); assertEquals(compactionThreshold, result.size());
// see if CompactionProgress is in place but null
for (HStore store : r.getStores()) {
assertNull(store.getCompactionProgress());
}
r.flush(true); r.flush(true);
r.compact(true); r.compact(true);
// see if CompactionProgress has done its thing on at least one store
int storeCount = 0;
for (HStore store : r.getStores()) {
CompactionProgress progress = store.getCompactionProgress();
if (progress != null) {
++storeCount;
assertTrue(progress.currentCompactedKVs > 0);
assertTrue(progress.getTotalCompactingKVs() > 0);
}
assertTrue(storeCount > 0);
}
// look at the second row // look at the second row
// Increment the least significant character so we get to next row. // Increment the least significant character so we get to next row.
byte[] secondRowBytes = START_KEY_BYTES.clone(); byte[] secondRowBytes = START_KEY_BYTES.clone();

View File

@ -92,7 +92,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -118,11 +117,9 @@ import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@ -621,17 +618,11 @@ public class TestSplitTransactionOnCluster {
assertEquals(1, region.getStores().size()); assertEquals(1, region.getStores().size());
HStore store = region.getStores().get(0); HStore store = region.getStores().get(0);
while (store.hasReferences()) { while (store.hasReferences()) {
// Wait on any current compaction to complete first. while (store.storeEngine.getCompactor().isCompacting()) {
CompactionProgress progress = store.getCompactionProgress(); Threads.sleep(100);
if (progress != null && progress.getProgressPct() < 1.0f) {
while (progress.getProgressPct() < 1.0f) {
LOG.info("Waiting, progress={}", progress.getProgressPct());
Threads.sleep(1000);
}
} else {
// Run new compaction. Shouldn't be any others running.
region.compact(true);
} }
// Run new compaction. Shouldn't be any others running.
region.compact(true);
store.closeAndArchiveCompactedFiles(); store.closeAndArchiveCompactedFiles();
} }
} }

View File

@ -43,6 +43,7 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@ -654,11 +653,12 @@ public abstract class AbstractTestWALReplay {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController, MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException { FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
if (throwExceptionWhenFlushing.get()) { if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests"); throw new IOException("Simulated exception by tests");
} }
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
writerCreationTracker);
} }
}; };