HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)

Introduce a StoreFileWriterCreationTracker to track the store files being written

Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Duo Zhang 2022-04-17 21:58:12 +08:00 committed by GitHub
parent 267b2ed86b
commit 48c4a4626e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 399 additions and 249 deletions

View File

@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@ -146,10 +149,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.getStoreEngine().createWriter(
createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
return store.getStoreEngine()
.createWriter(
createParams(fd, shouldDropBehind, major, writerCreationTracker)
.includeMVCCReadpoint(true)
.includesTag(true));
}
};
@ -285,17 +292,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
@ -661,9 +670,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
}
@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

View File

@ -25,6 +25,7 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -112,7 +113,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
long cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@ -126,7 +127,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = createWriter(snapshot, true);
writer = createWriter(snapshot, true, writerCreationTracker);
IOException e = null;
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing

View File

@ -67,7 +67,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
* comments in HBASE-15400 for more details.
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
}
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@ -110,11 +110,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths;
}
/**
* Returns all writers. This is used to prevent deleting currently writen storefiles
* during cleanup.
*/
public abstract Collection<StoreFileWriter> writers();
protected abstract Collection<StoreFileWriter> writers();
/**
* Subclasses override this method to be called at the end of a successful sequence of append; all

View File

@ -152,7 +152,7 @@ public class BrokenStoreFileCleaner extends ScheduledChore {
}
private boolean isCompactionResultFile(FileStatus file, HStore store) {
return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
return store.getStoreFilesBeingWritten().contains(file.getPath());
}
// Compacted files can still have readers and are cleaned by a separate chore, so they have to

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.yetus.audience.InterfaceAudience;
@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
private String fileStoragePolicy = HConstants.EMPTY_STRING;
private Consumer<Path> writerCreationTracker;
private CreateStoreFileWriterParams() {
}
@ -127,8 +131,16 @@ public final class CreateStoreFileWriterParams {
return this;
}
/**
 * Returns the writer-creation callback stored in these params.
 * @return the callback previously passed to {@link #writerCreationTracker(Consumer)}, or
 *         {@code null} if none has been set (the field has no default value)
 */
public Consumer<Path> writerCreationTracker() {
return writerCreationTracker;
}
/**
 * Stores the callback to be handed over when a store file writer is created from these params.
 * @param writerCreationTracker the callback to store; it is kept as-is without validation
 * @return this params object, so calls can be chained
 */
public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
return this;
}
public static CreateStoreFileWriterParams create() {
return new CreateStoreFileWriterParams();
}
}

View File

@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
public Collection<StoreFileWriter> writers() {
protected Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -44,8 +45,8 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@ -59,7 +60,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = createWriter(snapshot, false);
writer = createWriter(snapshot, false, writerCreationTracker);
IOException e = null;
try {
performFlush(scanner, writer, throughputController);

View File

@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@ -156,8 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// rows that has cells from both memstore and files (or only files)
private LongAdder mixedRowReadsCount = new LongAdder();
private boolean cacheOnWriteLogged;
/**
* Lock specific to archiving compacted store files. This avoids races around
* the combination of retrieving the list of compacted files and moving them to
@ -215,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final StoreContext storeContext;
// Used to track the store files which are currently being written. For compaction, if we want to
// compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
// track the store files being written when flushing.
// Note that the creation happens in the background compaction or flush thread and we will get
// the files in another thread, so it needs to be thread safe.
/**
 * Remembers the path of every store file reported to it through {@link #accept(Path)}.
 * The paths are kept in a concurrent set, so a path can be recorded while another caller is
 * reading the set.
 */
private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

  /** Concurrent set of all paths recorded so far. */
  private final Set<Path> files = ConcurrentHashMap.newKeySet();

  /**
   * Records the given store file path.
   * @param path path of the store file that is about to be written
   */
  @Override
  public void accept(Path path) {
    files.add(path);
  }

  /**
   * Returns the recorded paths.
   * @return a read-only view of the recorded paths; it is a view, not a copy, so paths
   *         recorded later show up in it
   */
  public Set<Path> get() {
    return Collections.unmodifiableSet(files);
  }
}
// We may have multiple compaction running at the same time, and flush can also happen at the same
// time, so here we need to use a collection, and the collection needs to be thread safe.
// The implementation of StoreFileWriterCreationTracker is very simple and we are unlikely to
// implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
// IdentityHashMap if later we want to implement hashCode or equals.
private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
Collections.newSetFromMap(new ConcurrentHashMap<>());
// For the SFT implementations which write a tmp store file first, we do not need to clean up
// the broken store files under the data directory, which means we do not need to track the store
// file writer creation. So here we abstract a factory to return different trackers for different
// SFT implementations.
private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
/**
* Constructor
* @param family HColumnDescriptor for this column
* @param confParam configuration object failed. Can be null.
* @param confParam configuration object failed. Can be null.
*/
protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException {
this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
this.region = region;
@ -267,6 +299,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
storeEngine.initialize(warmup);
// If writing to the tmp dir first is required, then we just return null, which indicates that we
// do not need to track the creation of store file writers; otherwise we return a new
// StoreFileWriterCreationTracker.
this.storeFileWriterCreationTrackerFactory =
storeEngine.requireWritingToTmpDirFirst() ? () -> null
: () -> new StoreFileWriterCreationTracker();
refreshStoreSizeAndTotalBytes();
flushRetriesNumber = conf.getInt(
@ -290,7 +328,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
family.getCompressionType());
cacheOnWriteLogged = false;
}
private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@ -795,8 +832,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* @throws IOException if exception occurs during process
*/
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
Consumer<Path> writerCreationTracker) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@ -806,8 +843,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
List<Path> pathNames =
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
List<Path> pathNames = flusher.flushSnapshot(
snapshot,
logCacheFlushId,
status,
throughputController,
tracker,
writerCreationTracker);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@ -1118,6 +1160,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
ThroughputController throughputController, User user) throws IOException {
assert compaction != null;
CompactionRequestImpl cr = compaction.getRequest();
StoreFileWriterCreationTracker writerCreationTracker =
storeFileWriterCreationTrackerFactory.get();
if (writerCreationTracker != null) {
cr.setWriterCreationTracker(writerCreationTracker);
storeFileWriterCreationTrackers.add(writerCreationTracker);
}
try {
// Do all sanity checking in here if we have a valid CompactionRequestImpl
// because we need to clean up after it on the way out in a finally
@ -1157,18 +1205,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
replaceStoreFiles(filesToCompact, sfs, true);
// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// have failed.
storeEngine.resetCompactionWriter();
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
} else {
compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
}
long outputBytes = getTotalSize(sfs);
// At this point the store will use new files for all new scanners.
@ -1577,6 +1613,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
}
// The tracker could be null, for example, we do not need to track the creation of store file
// writer due to different implementation of SFT, or the compaction is canceled.
if (cr.getWriterCreationTracker() != null) {
storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
}
}
/**
@ -1900,6 +1941,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final class StoreFlusherImpl implements StoreFlushContext {
private final FlushLifeCycleTracker tracker;
private final StoreFileWriterCreationTracker writerCreationTracker;
private final long cacheFlushSeqNum;
private MemStoreSnapshot snapshot;
private List<Path> tempFiles;
@ -1911,6 +1953,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
this.tracker = tracker;
this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
}
/**
@ -1931,41 +1974,61 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
public void flushCache(MonitoredTask status) throws IOException {
RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController();
tempFiles =
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
rsService == null ? null : rsService.getFlushThroughputController();
// it could be null if we do not need to track the creation of store file writer due to
// different SFT implementation.
if (writerCreationTracker != null) {
HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
}
tempFiles = HStore.this.flushCache(
cacheFlushSeqNum,
snapshot,
status,
throughputController,
tracker,
writerCreationTracker);
}
@Override
public boolean commit(MonitoredTask status) throws IOException {
if (CollectionUtils.isEmpty(this.tempFiles)) {
return false;
}
status.setStatus("Flushing " + this + ": reopening flushed file");
List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
for (HStoreFile sf : storeFiles) {
StoreFileReader r = sf.getReader();
if (LOG.isInfoEnabled()) {
LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
try {
if (CollectionUtils.isEmpty(this.tempFiles)) {
return false;
}
status.setStatus("Flushing " + this + ": reopening flushed file");
List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
for (HStoreFile sf : storeFiles) {
StoreFileReader r = sf.getReader();
if (LOG.isInfoEnabled()) {
LOG.info(
"Added {}, entries={}, sequenceid={}, filesize={}",
sf,
r.getEntries(),
cacheFlushSeqNum,
TraditionalBinaryPrefix.long2String(r.length(), "", 1));
}
outputFileSize += r.length();
storeSize.addAndGet(r.length());
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
}
outputFileSize += r.length();
storeSize.addAndGet(r.length());
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
}
flushedCellsCount.addAndGet(cacheFlushCount);
flushedCellsSize.addAndGet(cacheFlushSize);
flushedOutputFileSize.addAndGet(outputFileSize);
// call coprocessor after we have done all the accounting above
for (HStoreFile sf : storeFiles) {
if (getCoprocessorHost() != null) {
getCoprocessorHost().postFlush(HStore.this, sf, tracker);
flushedCellsCount.addAndGet(cacheFlushCount);
flushedCellsSize.addAndGet(cacheFlushSize);
flushedOutputFileSize.addAndGet(outputFileSize);
// call coprocessor after we have done all the accounting above
for (HStoreFile sf : storeFiles) {
if (getCoprocessorHost() != null) {
getCoprocessorHost().postFlush(HStore.this, sf, tracker);
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return completeFlush(storeFiles, snapshot.getId());
} finally {
if (writerCreationTracker != null) {
HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return completeFlush(storeFiles, snapshot.getId());
}
@Override
@ -2111,6 +2174,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return majorCompactedCellsSize.get();
}
/**
 * Adds the totals reported by a compaction to this store's compaction counters.
 * @param isMajor {@code true} to update the major-compaction counters, {@code false} to update
 *          the regular compaction counters
 * @param progress supplies the total number of compacting KVs and the total compacted size
 */
public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
  final long compactedCells = progress.getTotalCompactingKVs();
  final long compactedBytes = progress.totalCompactedSize;
  if (!isMajor) {
    compactedCellsCount.addAndGet(compactedCells);
    compactedCellsSize.addAndGet(compactedBytes);
    return;
  }
  majorCompactedCellsCount.addAndGet(compactedCells);
  majorCompactedCellsSize.addAndGet(compactedBytes);
}
/**
* Returns the StoreEngine that is backing this concrete implementation of Store.
* @return Returns the {@link StoreEngine} object used internally inside this HStore object.
@ -2406,4 +2479,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
mixedRowReadsCount.increment();
}
}
/**
 * Gathers the paths recorded by every registered store file writer creation tracker, i.e. the
 * store files which are currently being written to. Mainly used by
 * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in
 * SFT yet.
 * @return a newly collected set of the recorded paths
 */
Set<Path> getStoreFilesBeingWritten() {
  return storeFileWriterCreationTrackers.stream()
    .map(StoreFileWriterCreationTracker::get)
    .flatMap(Set::stream)
    .collect(Collectors.toSet());
}
}

View File

@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -94,7 +92,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
*/
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
C extends Compactor, SFM extends StoreFileManager> {
C extends Compactor<?>, SFM extends StoreFileManager> {
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
@ -157,7 +155,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
/**
* @return Compactor to use.
*/
public Compactor getCompactor() {
public Compactor<?> getCompactor() {
return this.compactor;
}
@ -544,17 +542,6 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
return storeFileTracker.requireWritingToTmpDirFirst();
}
/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
* have failed. Currently called in
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
}
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() {

View File

@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -439,9 +440,14 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy;
// this is used to track the creation of the StoreFileWriter, mainly used for the SFT
// implementation where we will write store files directly to the final place, instead of
// writing a tmp file first. Under this scenario, we will have a background task to purge the
// store files which are not recorded in the SFT, but for the newly created store file writer,
// they are not tracked in SFT, so here we need to record them and treat them specially.
private Consumer<Path> writerCreationTracker;
public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
this.conf = conf;
this.cacheConf = cacheConf;
this.fs = fs;
@ -525,6 +531,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this;
}
/**
 * Sets the callback used to track the creation of the store file writer.
 * @param writerCreationTracker callback to store in this builder; may be {@code null}, in which
 *          case no tracking call is made (the builder null-checks it before use)
 * @return this builder, so calls can be chained
 */
public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
return this;
}
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@ -573,8 +584,21 @@ public class StoreFileWriter implements CellSink, ShipperListener {
bloomType = BloomType.NONE;
}
}
return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
// Make sure we call this before actually creating the writer.
// In fact, it is not a big deal to even add a nonexistent file to the tracker, as we will never
// try to delete it and finally we will clean the tracker up after compaction. But if the file
// cleaner finds the file but we haven't recorded it yet, it may accidentally delete the file
// and cause problems.
if (writerCreationTracker != null) {
writerCreationTracker.accept(filePath);
}
return new StoreFileWriter(
fs,
filePath,
conf,
cacheConf,
bloomType,
maxKeyCount,
favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
}
}

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -56,8 +56,8 @@ abstract class StoreFlusher {
* @return List of files written. Can be empty; must not be null.
*/
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException;
MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
Consumer<Path> writerCreationTracker) throws IOException;
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
@ -70,13 +70,17 @@ abstract class StoreFlusher {
writer.close();
}
protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
throws IOException {
protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
Consumer<Path> writerCreationTracker) throws IOException {
return store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
.compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
.includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
.shouldDropBehind(false));
.createWriter(
CreateStoreFileWriterParams.create()
.maxKeyCount(snapshot.getCellsCount())
.compression(store.getColumnFamilyDescriptor().getCompressionType())
.isCompaction(false)
.includeMVCCReadpoint(true)
.includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
.shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
}
/**

View File

@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
public Collection<StoreFileWriter> writers() {
protected Collection<StoreFileWriter> writers() {
return existingWriters;
}

View File

@ -23,7 +23,7 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
@ -54,11 +54,14 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
Consumer<Path> writerCreationTracker) throws IOException {
List<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
if (cellsCount == 0) {
// don't flush if there are no entries
return result;
}
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
@ -70,8 +73,9 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
StripeMultiFileWriter.WriterFactory factory =
createWriterFactory(snapshot, writerCreationTracker);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
mw.init(storeScanner, factory);
synchronized (flushLock) {
@ -98,12 +102,13 @@ public class StripeStoreFlusher extends StoreFlusher {
return result;
}
private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
Consumer<Path> writerCreationTracker) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
// XXX: it used to always pass true for includesTag, re-consider?
return StripeStoreFlusher.this.createWriter(snapshot, true);
return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
}
};
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -46,19 +46,21 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
super(conf, store);
}
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
final FileDetails fd, final boolean shouldDropBehind, boolean major) {
protected final void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
final FileDetails fd, final boolean shouldDropBehind, boolean major,
Consumer<Path> writerCreationTracker) {
WriterFactory writerFactory = new WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
return AbstractMultiOutputCompactor.this
.createWriter(fd, shouldDropBehind, major, writerCreationTracker);
}
@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
fileStoragePolicy, major);
return AbstractMultiOutputCompactor.this
.createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
@ -68,7 +70,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
}
@Override
protected void abortWriter() throws IOException {
protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
@ -79,7 +81,6 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e);
}
}
//this step signals that the target file is no longer written and can be cleaned up
writer = null;
}
}

View File

@ -37,7 +37,7 @@ public class CompactionProgress {
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
/** the total compacting key values in currently running compaction */
private long totalCompactingKVs;
public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */

View File

@ -22,8 +22,9 @@ import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@ -51,6 +52,7 @@ public class CompactionRequestImpl implements CompactionRequest {
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
private Consumer<Path> writerCreationTracker;
public CompactionRequestImpl(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
@ -137,6 +139,14 @@ public class CompactionRequestImpl implements CompactionRequest {
return tracker;
}
/**
* Returns the writer creation tracker attached to this request.
* @return the consumer last passed to {@link #setWriterCreationTracker(Consumer)}, or
*   {@code null} if none was set (the field has no initial value)
*/
public Consumer<Path> getWriterCreationTracker() {
return writerCreationTracker;
}
/**
* Attaches a writer creation tracker to this request, replacing any previous one.
* @param writerCreationTracker the consumer to store; it is handed back unchanged by
*   {@link #getWriterCreationTracker()}. Presumably it receives the path of each store file
*   created for this compaction - TODO confirm against the writer creation code.
*/
public void setWriterCreationTracker(Consumer<Path> writerCreationTracker) {
this.writerCreationTracker = writerCreationTracker;
}
public boolean isAfterSplit() {
return isAfterSplit;
}

View File

@ -25,12 +25,13 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -40,7 +41,6 @@ import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -71,15 +71,18 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
* A compactor is a compaction algorithm associated with a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving).
* <p>
* Compactions might be concurrent against a given store and the Compactor is shared among
* them. Do not put mutable state into class fields. All Compactor class fields should be
* final or effectively final.
* 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
*/
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected volatile CompactionProgress progress;
protected final Configuration conf;
protected final HStore store;
protected final int compactionKVMax;
protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression;
@ -93,15 +96,15 @@ public abstract class Compactor<T extends CellSink> {
protected static final String MINOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.minorcompaction.pagecache.drop";
private final boolean dropCacheMajor;
private final boolean dropCacheMinor;
protected final boolean dropCacheMajor;
protected final boolean dropCacheMinor;
// In compaction process only a single thread will access and write to this field, and
// getCompactionTargets is the only place we will access it other than the compaction thread, so
// make it volatile.
protected volatile T writer = null;
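// We track the progress of every running compaction separately; entries are compared by
// CompactionProgress identity. An entry is added before a compaction starts writing and is
// removed in the finally block that ends that compaction.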
private final Set<CompactionProgress> progressSet =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
//TODO: depending on Store is not good but, realistically, all compactors currently do.
// TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
this.conf = conf;
this.store = store;
@ -117,15 +120,9 @@ public abstract class Compactor<T extends CellSink> {
this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
}
protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
throws IOException;
}
public CompactionProgress getProgress() {
return this.progress;
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
Consumer<Path> writerCreationTracker) throws IOException;
}
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
@ -272,12 +269,13 @@ public abstract class Compactor<T extends CellSink> {
};
/**
* Assembles the parameters used to create a store file writer for a compaction output file.
* <p>
* The result is always flagged as a compaction. Compression is the major or the minor compaction
* algorithm depending on {@code major}. MVCC read points are included only when
* {@code fd.maxMVCCReadpoint > 0} and tags only when {@code fd.maxTagsLength > 0}.
* @param fd details gathered from the files being compacted (max key count, max MVCC read point,
*   max tags length, total compacted size)
* @param shouldDropBehind passed through to {@code shouldDropBehind(...)} on the parameters
* @param major whether this is a major compaction; selects the compression algorithm
* @param writerCreationTracker passed through to {@code writerCreationTracker(...)} on the
*   parameters
* @return the populated parameters; callers may chain further setters on it
*/
protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
boolean major) {
boolean major, Consumer<Path> writerCreationTracker) {
return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
.compression(major ? majorCompactionCompression : minorCompactionCompression)
.isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
.includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
.totalCompactedFilesSize(fd.totalCompactedFilesSize);
.totalCompactedFilesSize(fd.totalCompactedFilesSize)
.writerCreationTracker(writerCreationTracker);
}
/**
@ -287,16 +285,20 @@ public abstract class Compactor<T extends CellSink> {
* @throws IOException if creation failed
*/
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
boolean major) throws IOException {
boolean major, Consumer<Path> writerCreationTracker) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
}
/**
* Creates a compaction output writer with an explicit file storage policy.
* <p>
* Builds the parameters with {@code createParams}, sets {@code fileStoragePolicy} on them and
* asks the store engine to create the writer.
* @param fd details gathered from the files being compacted
* @param shouldDropBehind passed through to {@code createParams}
* @param fileStoragePolicy storage policy to set on the writer parameters
* @param major whether this is a major compaction
* @param writerCreationTracker passed through to {@code createParams}
* @return the writer returned by the store engine
* @throws IOException if the store engine fails to create the writer
*/
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException {
String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
.createWriter(
createParams(fd, shouldDropBehind, major, writerCreationTracker)
.fileStoragePolicy(fileStoragePolicy));
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
@ -328,7 +330,6 @@ public abstract class Compactor<T extends CellSink> {
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
@ -344,6 +345,9 @@ public abstract class Compactor<T extends CellSink> {
boolean finished = false;
List<StoreFileScanner> scanners =
createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
T writer = null;
CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
progressSet.add(progress);
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
@ -356,14 +360,14 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
if (writer != null){
LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
.map(n -> n.toString())
.collect(Collectors.joining(", ", "{ ", " }")));
}
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size());
writer = sinkFactory.createWriter(
scanner,
fd,
dropCache,
request.isMajor(),
request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size(), progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@ -381,34 +385,41 @@ public abstract class Compactor<T extends CellSink> {
} else {
Closeables.close(scanner, true);
}
if (!finished && writer != null) {
abortWriter();
if (!finished) {
if (writer != null) {
abortWriter(writer);
}
} else {
store.updateCompactedMetrics(request.isMajor(), progress);
}
progressSet.remove(progress);
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
return commitWriter(fd, request);
return commitWriter(writer, fd, request);
}
protected abstract List<Path> commitWriter(FileDetails fd,
protected abstract List<Path> commitWriter(T writer, FileDetails fd,
CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter() throws IOException;
protected abstract void abortWriter(T writer) throws IOException;
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@ -550,22 +561,27 @@ public abstract class Compactor<T extends CellSink> {
dropDeletesFromRow, dropDeletesToRow);
}
public List<Path> getCompactionTargets() {
T writer = this.writer;
if (writer == null) {
return Collections.emptyList();
/**
* Return the aggregate progress for all currently active compactions.
* <p>
* The result is a newly created {@link CompactionProgress} whose {@code totalCompactingKVs},
* {@code currentCompactedKVs} and {@code totalCompactedSize} are the sums of the same fields
* over every entry of {@code progressSet} at the time of the call. The returned object is not
* added to the set, so it does not change as the compactions advance.
* @return a point-in-time summary; the sums are all zero when no compaction is registered
*/
public CompactionProgress getProgress() {
// progressSet is a Collections.synchronizedSet wrapper: iteration is only safe while holding
// the wrapper's own monitor, hence the explicit synchronized block.
synchronized (progressSet) {
long totalCompactingKVs = 0;
long currentCompactedKVs = 0;
long totalCompactedSize = 0;
for (CompactionProgress progress: progressSet) {
totalCompactingKVs += progress.totalCompactingKVs;
currentCompactedKVs += progress.currentCompactedKVs;
totalCompactedSize += progress.totalCompactedSize;
}
CompactionProgress result = new CompactionProgress(totalCompactingKVs);
result.currentCompactedKVs = currentCompactedKVs;
result.totalCompactedSize = totalCompactedSize;
return result;
}
if (writer instanceof StoreFileWriter) {
return Arrays.asList(((StoreFileWriter) writer).getPath());
}
return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
.collect(Collectors.toList());
}
/**
* Reset the Writer when the new storefiles were successfully added
*/
public void resetWriter(){
writer = null;
/**
* Tells whether at least one compaction is currently registered with this compactor.
* @return {@code true} while {@code progressSet} holds at least one entry, {@code false} once
*   every compaction has removed its progress entry
*/
public boolean isCompacting() {
return !progressSet.isEmpty();
}
}

View File

@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
@ -68,21 +68,26 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
lowerBoundariesPolicies,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(
lowerBoundaries,
lowerBoundariesPolicies,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
}, throughputController, user);
},
throughputController,
user);
}
/**
* Commits the date tiered multi-file writer once the compaction has finished.
* <p>
* Delegates to {@code writer.commitWriters}, passing the max sequence id from {@code fd},
* whether the request covers all files, and the files that were compacted.
* @param writer the multi-file writer that received the compacted cells
* @param fd details gathered from the files being compacted
* @param request the compaction request being completed
* @return the paths returned by {@code commitWriters}
* @throws IOException if committing the writers fails
*/
@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> pathList =
writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
return pathList;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -47,10 +48,11 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
/**
* Sink factory of the default compactor: every compaction writes into a single
* {@link StoreFileWriter} obtained from {@code DefaultCompactor.this.createWriter}. The
* {@code scanner} argument is not used; the remaining arguments are forwarded unchanged.
*/
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
return DefaultCompactor.this
.createWriter(fd, shouldDropBehind, major, writerCreationTracker);
}
};
@ -63,7 +65,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@ -72,12 +74,6 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
protected void abortWriter() throws IOException {
abortWriter(writer);
// this step signals that the target file is no longer written and can be cleaned up
writer = null;
}
protected final void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
@ -92,4 +88,5 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
leftoverFile, e);
}
}
}

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -88,18 +88,26 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
LOG.debug(sb.toString());
}
return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
return compact(
request,
new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
store.getComparator(),
targetBoundaries,
majorRangeFromRow,
majorRangeToRow);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
}, throughputController, user);
},
throughputController,
user);
}
public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
@ -115,20 +123,28 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
store.getComparator(), targetCount, targetSize, left, right);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
store.getComparator(),
targetCount,
targetSize,
left,
right);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
}, throughputController, user);
},
throughputController,
user);
}
/**
* Commits the stripe multi-file writer once the compaction has finished.
* <p>
* Delegates to {@code writer.commitWriters}, passing the max sequence id from {@code fd},
* whether the request is a major compaction, and the files that were compacted.
* @param writer the multi-file writer that received the compacted cells
* @param fd details gathered from the files being compacted
* @param request the compaction request being completed
* @return the paths returned by {@code commitWriters}; asserted to be non-empty when
*   assertions are enabled
* @throws IOException if committing the writers fails
*/
@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}
}

View File

@ -177,11 +177,15 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
}
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
.withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
.withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
.withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
.withOutputDir(outputDir)
.withBloomType(ctx.getBloomFilterType())
.withMaxKeyCount(params.maxKeyCount())
.withFavoredNodes(ctx.getFavoredNodes())
.withFileContext(hFileContext)
.withShouldDropCacheBehind(params.shouldDropBehind())
.withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
.withFileStoragePolicy(params.fileStoragePolicy());
.withFileStoragePolicy(params.fileStoragePolicy())
.withWriterCreationTracker(params.writerCreationTracker());
return builder.build();
}

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@ -88,9 +89,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
}
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
totalCompactions.incrementAndGet();
if (major) {

View File

@ -128,13 +128,14 @@ public class TestCompactorMemLeak {
}
/**
* Test hook: before delegating to the parent implementation, records in
* {@code IS_LAST_CELL_ON_HEAP} whether the last cell kept by the underlying HFile writer is a
* {@code KeyOnlyKeyValue}.
* @param writer the store file writer whose wrapped writer is inspected
* @param fd details gathered from the files being compacted
* @param request the compaction request being completed
* @return whatever the parent {@code commitWriter} returns
* @throws IOException if the parent commit fails
*/
@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
// The cast assumes the wrapped writer is an HFileWriterImpl.
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell();
// The cell should be backed by a KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
return super.commitWriter(fd, request);
return super.commitWriter(writer, fd, request);
}
}
}

View File

@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@ -2477,9 +2479,10 @@ public class TestHStore {
/**
* Test hook: increments {@code counter} on every call and then delegates to the parent
* {@code flushSnapshot} with all arguments unchanged.
* @return whatever the parent {@code flushSnapshot} returns
* @throws IOException if the parent flush fails
*/
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
counter.incrementAndGet();
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
writerCreationTracker);
}
@Override

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -211,26 +209,9 @@ public class TestMajorCompaction {
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
assertEquals(compactionThreshold, result.size());
// see if CompactionProgress is in place but null
for (HStore store : r.getStores()) {
assertNull(store.getCompactionProgress());
}
r.flush(true);
r.compact(true);
// see if CompactionProgress has done its thing on at least one store
int storeCount = 0;
for (HStore store : r.getStores()) {
CompactionProgress progress = store.getCompactionProgress();
if (progress != null) {
++storeCount;
assertTrue(progress.currentCompactedKVs > 0);
assertTrue(progress.getTotalCompactingKVs() > 0);
}
assertTrue(storeCount > 0);
}
// look at the second row
// Increment the least significant character so we get to next row.
byte[] secondRowBytes = START_KEY_BYTES.clone();

View File

@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -117,11 +116,9 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@ -620,17 +617,11 @@ public class TestSplitTransactionOnCluster {
assertEquals(1, region.getStores().size());
HStore store = region.getStores().get(0);
while (store.hasReferences()) {
// Wait on any current compaction to complete first.
CompactionProgress progress = store.getCompactionProgress();
if (progress != null && progress.getProgressPct() < 1.0f) {
while (progress.getProgressPct() < 1.0f) {
LOG.info("Waiting, progress={}", progress.getProgressPct());
Threads.sleep(1000);
}
} else {
// Run new compaction. Shouldn't be any others running.
region.compact(true);
while (store.storeEngine.getCompactor().isCompacting()) {
Threads.sleep(100);
}
// Run new compaction. Shouldn't be any others running.
region.compact(true);
store.closeAndArchiveCompactedFiles();
}
}

View File

@ -43,6 +43,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -612,12 +613,13 @@ public abstract class AbstractTestWALReplay {
/**
* Test hook: fails the flush with a simulated {@link IOException} while
* {@code throwExceptionWhenFlushing} is set; otherwise delegates to the parent
* {@code flushSnapshot} with all arguments unchanged.
* @return whatever the parent {@code flushSnapshot} returns
* @throws IOException the simulated failure, or any failure of the parent flush
*/
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
// Checked before any flush work so the failure happens without touching the snapshot.
if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests");
}
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
writerCreationTracker);
}
}