From 48c4a4626e54724beb4ed46becbedbe292841dc4 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Sun, 17 Apr 2022 21:58:12 +0800
Subject: [PATCH] HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)

Introduce a StoreFileWriterCreationTracker to track the store files being written

Signed-off-by: Josh Elser
Signed-off-by: Andrew Purtell
---
 .../hbase/mob/DefaultMobStoreCompactor.java   |  22 ++-
 .../hbase/mob/DefaultMobStoreFlusher.java     |   5 +-
 .../regionserver/AbstractMultiFileWriter.java |   8 +-
 .../regionserver/BrokenStoreFileCleaner.java  |   2 +-
 .../CreateStoreFileWriterParams.java          |  14 +-
 .../DateTieredMultiFileWriter.java            |   2 +-
 .../regionserver/DefaultStoreFlusher.java     |   7 +-
 .../hadoop/hbase/regionserver/HStore.java     | 180 +++++++++++++-----
 .../hbase/regionserver/StoreEngine.java       |  17 +-
 .../hbase/regionserver/StoreFileWriter.java   |  32 +++-
 .../hbase/regionserver/StoreFlusher.java      |  22 ++-
 .../regionserver/StripeMultiFileWriter.java   |   2 +-
 .../regionserver/StripeStoreFlusher.java      |  21 +-
 .../AbstractMultiOutputCompactor.java         |  19 +-
 .../compactions/CompactionProgress.java       |   2 +-
 .../compactions/CompactionRequestImpl.java    |  12 +-
 .../regionserver/compactions/Compactor.java   | 128 +++++++------
 .../compactions/DateTieredCompactor.java      |  21 +-
 .../compactions/DefaultCompactor.java         |  19 +-
 .../compactions/StripeCompactor.java          |  38 ++--
 .../StoreFileTrackerBase.java                 |  12 +-
 .../hbase/mob/FaultyMobStoreCompactor.java    |   7 +-
 .../regionserver/TestCompactorMemLeak.java    |   5 +-
 .../hadoop/hbase/regionserver/TestHStore.java |   7 +-
 .../regionserver/TestMajorCompaction.java     |  19 --
 .../TestSplitTransactionOnCluster.java        |  17 +-
 .../wal/AbstractTestWALReplay.java            |   8 +-
 27 files changed, 399 insertions(+), 249 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 15f0a73a9df..ae3246fabed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -146,10 +149,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
 
       @Override
       public
StoreFileWriter createWriter(InternalScanner scanner, org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { + boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) + throws IOException { // make this writer with tags always because of possible new cells with tags. - return store.getStoreEngine().createWriter( - createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true)); + return store.getStoreEngine() + .createWriter( + createParams(fd, shouldDropBehind, major, writerCreationTracker) + .includeMVCCReadpoint(true) + .includesTag(true)); } }; @@ -285,17 +292,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { * * @param fd File details * @param scanner Where to read from. + * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param throughputController The compaction throughput controller. * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; // Clear old mob references @@ -661,9 +670,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } } - @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 4a1dc7b33a5..cff7474324b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -25,6 +25,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -112,7 +113,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException { + FlushLifeCycleTracker tracker, Consumer writerCreationTracker) throws IOException { ArrayList result = new ArrayList<>(); long cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries @@ -126,7 +127,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { synchronized (flushLock) { 
status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = createWriter(snapshot, true); + writer = createWriter(snapshot, true, writerCreationTracker); IOException e = null; try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index 82c3867c103..a824b501c82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -67,7 +67,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen * comments in HBASE-15400 for more details. */ public List commitWriters(long maxSeqId, boolean majorCompaction) throws IOException { - return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET); + return commitWriters(maxSeqId, majorCompaction, Collections.emptyList()); } public List commitWriters(long maxSeqId, boolean majorCompaction, @@ -110,11 +110,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen return paths; } - /** - * Returns all writers. This is used to prevent deleting currently writen storefiles - * during cleanup. - */ - public abstract Collection writers(); + protected abstract Collection writers(); /** * Subclasses override this method to be called at the end of a successful sequence of append; all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index 0c4807d8bad..042acb0bbc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -152,7 +152,7 @@ public class BrokenStoreFileCleaner extends ScheduledChore { } private boolean isCompactionResultFile(FileStatus file, HStore store) { - return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); + return store.getStoreFilesBeingWritten().contains(file.getPath()); } // Compacted files can still have readers and are cleaned by a separate chore, so they have to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java index 10cd9f009e4..1d45e1c51c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.function.Consumer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.yetus.audience.InterfaceAudience; @@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams { private String fileStoragePolicy = HConstants.EMPTY_STRING; + private Consumer writerCreationTracker; + private CreateStoreFileWriterParams() { } @@ -127,8 +131,16 @@ public final class CreateStoreFileWriterParams { return this; } + public Consumer writerCreationTracker() 
{ + return writerCreationTracker; + } + + public CreateStoreFileWriterParams writerCreationTracker(Consumer writerCreationTracker) { + this.writerCreationTracker = writerCreationTracker; + return this; + } + public static CreateStoreFileWriterParams create() { return new CreateStoreFileWriterParams(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index 1e10eb2db23..8201cb152c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { } @Override - public Collection writers() { + protected Collection writers() { return lowerBoundary2Writer.values(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 306760d7ce6..0f3daa4c177 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -44,8 +45,8 @@ public class DefaultStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException { + MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, + Consumer writerCreationTracker) throws IOException { ArrayList result = new ArrayList<>(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries @@ -59,7 +60,7 @@ public class DefaultStoreFlusher extends StoreFlusher { synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = createWriter(snapshot, false); + writer = createWriter(snapshot, false, writerCreationTracker); IOException e = null; try { performFlush(scanner, writer, throughputController); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 73018286630..b9b57bc4a85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -156,8 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, // rows that has cells 
from both memstore and files (or only files)
   private LongAdder mixedRowReadsCount = new LongAdder();
 
-  private boolean cacheOnWriteLogged;
 
   /**
    * Lock specific to archiving compacted store files. This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
@@ -215,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
   private final StoreContext storeContext;
 
+  // Used to track the store files which are currently being written. For compaction, if we want to
+  // compact store files [a, b, c] to [d], then here we will record 'd'. We will also use it to
+  // track the store files being written when flushing.
+  // Notice that the creation is in the background compaction or flush thread and we will get the
+  // files in other threads, so it needs to be thread safe.
+  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
+
+    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    @Override
+    public void accept(Path t) {
+      files.add(t);
+    }
+
+    public Set<Path> get() {
+      return Collections.unmodifiableSet(files);
+    }
+  }
+
+  // We may have multiple compactions running at the same time, and flushes can also happen at the
+  // same time, so here we need to use a collection, and the collection needs to be thread safe.
+  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
+  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
+  // IdentityHashMap if later we want to implement hashCode or equals.
+  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
+    Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  // For the SFT implementations where we write a tmp store file first, we do not need to clean up
+  // the broken store files under the data directory, which means we do not need to track the store
+  // file writer creation. So here we abstract a factory to return different trackers for different
+  // SFT implementations.
+  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
+
   /**
    * Constructor
    * @param family HColumnDescriptor for this column
-   * @param confParam configuration object failed. Can be null.
+   * @param confParam configuration object failed. Can be null.
    */
   protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       final Configuration confParam, boolean warmup) throws IOException {
-
     this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
 
     this.region = region;
@@ -267,6 +299,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
     storeEngine.initialize(warmup);
+    // If we require writing to a tmp dir first, then we just return null, which indicates that we
+    // do not need to track the creation of store file writers; otherwise we return a new
+    // StoreFileWriterCreationTracker.
+    this.storeFileWriterCreationTrackerFactory =
+      storeEngine.requireWritingToTmpDirFirst() ?
() -> null + : () -> new StoreFileWriterCreationTracker(); refreshStoreSizeAndTotalBytes(); flushRetriesNumber = conf.getInt( @@ -290,7 +328,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType()); - cacheOnWriteLogged = false; } private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException { @@ -795,8 +832,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, * @throws IOException if exception occurs during process */ protected List flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, - MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException { + MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, + Consumer writerCreationTracker) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. @@ -806,8 +843,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, IOException lastException = null; for (int i = 0; i < flushRetriesNumber; i++) { try { - List pathNames = - flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker); + List pathNames = flusher.flushSnapshot( + snapshot, + logCacheFlushId, + status, + throughputController, + tracker, + writerCreationTracker); Path lastPathName = null; try { for (Path pathName : pathNames) { @@ -1118,6 +1160,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, ThroughputController throughputController, User user) throws IOException { assert compaction != null; CompactionRequestImpl cr = compaction.getRequest(); + StoreFileWriterCreationTracker writerCreationTracker = + storeFileWriterCreationTrackerFactory.get(); + if (writerCreationTracker != null) { + cr.setWriterCreationTracker(writerCreationTracker); + storeFileWriterCreationTrackers.add(writerCreationTracker); + } try { // Do all sanity checking in here if we have a valid CompactionRequestImpl // because we need to clean up after it on the way out in a finally @@ -1157,18 +1205,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } replaceStoreFiles(filesToCompact, sfs, true); - // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the - // CleanerChore know that compaction is done and the file can be cleaned up if compaction - // have failed. - storeEngine.resetCompactionWriter(); - - if (cr.isMajor()) { - majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); - majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); - } else { - compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); - compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); - } long outputBytes = getTotalSize(sfs); // At this point the store will use new files for all new scanners. 
@@ -1577,6 +1613,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, synchronized (filesCompacting) { filesCompacting.removeAll(cr.getFiles()); } + // The tracker could be null, for example, we do not need to track the creation of store file + // writer due to different implementation of SFT, or the compaction is canceled. + if (cr.getWriterCreationTracker() != null) { + storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker()); + } } /** @@ -1900,6 +1941,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, private final class StoreFlusherImpl implements StoreFlushContext { private final FlushLifeCycleTracker tracker; + private final StoreFileWriterCreationTracker writerCreationTracker; private final long cacheFlushSeqNum; private MemStoreSnapshot snapshot; private List tempFiles; @@ -1911,6 +1953,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { this.cacheFlushSeqNum = cacheFlushSeqNum; this.tracker = tracker; + this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get(); } /** @@ -1931,41 +1974,61 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, public void flushCache(MonitoredTask status) throws IOException { RegionServerServices rsService = region.getRegionServerServices(); ThroughputController throughputController = - rsService == null ? null : rsService.getFlushThroughputController(); - tempFiles = - HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); + rsService == null ? null : rsService.getFlushThroughputController(); + // it could be null if we do not need to track the creation of store file writer due to + // different SFT implementation. 
+ if (writerCreationTracker != null) { + HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); + } + tempFiles = HStore.this.flushCache( + cacheFlushSeqNum, + snapshot, + status, + throughputController, + tracker, + writerCreationTracker); } @Override public boolean commit(MonitoredTask status) throws IOException { - if (CollectionUtils.isEmpty(this.tempFiles)) { - return false; - } - status.setStatus("Flushing " + this + ": reopening flushed file"); - List storeFiles = storeEngine.commitStoreFiles(tempFiles, false); - for (HStoreFile sf : storeFiles) { - StoreFileReader r = sf.getReader(); - if (LOG.isInfoEnabled()) { - LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), - cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); + try { + if (CollectionUtils.isEmpty(this.tempFiles)) { + return false; + } + status.setStatus("Flushing " + this + ": reopening flushed file"); + List storeFiles = storeEngine.commitStoreFiles(tempFiles, false); + for (HStoreFile sf : storeFiles) { + StoreFileReader r = sf.getReader(); + if (LOG.isInfoEnabled()) { + LOG.info( + "Added {}, entries={}, sequenceid={}, filesize={}", + sf, + r.getEntries(), + cacheFlushSeqNum, + TraditionalBinaryPrefix.long2String(r.length(), "", 1)); + } + outputFileSize += r.length(); + storeSize.addAndGet(r.length()); + totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); + committedFiles.add(sf.getPath()); } - outputFileSize += r.length(); - storeSize.addAndGet(r.length()); - totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); - committedFiles.add(sf.getPath()); - } - flushedCellsCount.addAndGet(cacheFlushCount); - flushedCellsSize.addAndGet(cacheFlushSize); - flushedOutputFileSize.addAndGet(outputFileSize); - // call coprocessor after we have done all the accounting above - for (HStoreFile sf : storeFiles) { - if (getCoprocessorHost() != null) { - getCoprocessorHost().postFlush(HStore.this, sf, tracker); + flushedCellsCount.addAndGet(cacheFlushCount); + flushedCellsSize.addAndGet(cacheFlushSize); + flushedOutputFileSize.addAndGet(outputFileSize); + // call coprocessor after we have done all the accounting above + for (HStoreFile sf : storeFiles) { + if (getCoprocessorHost() != null) { + getCoprocessorHost().postFlush(HStore.this, sf, tracker); + } + } + // Add new file to store files. Clear snapshot too while we have the Store write lock. + return completeFlush(storeFiles, snapshot.getId()); + } finally { + if (writerCreationTracker != null) { + HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker); } } - // Add new file to store files. Clear snapshot too while we have the Store write lock. - return completeFlush(storeFiles, snapshot.getId()); } @Override @@ -2111,6 +2174,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, return majorCompactedCellsSize.get(); } + public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) { + if (isMajor) { + majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); + majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); + } else { + compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); + compactedCellsSize.addAndGet(progress.totalCompactedSize); + } + } + /** * Returns the StoreEngine that is backing this concrete implementation of Store. * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
@@ -2406,4 +2479,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, mixedRowReadsCount.increment(); } } + + /** + * Return the storefiles which are currently being written to. Mainly used by + * {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in + * SFT yet. + */ + Set getStoreFilesBeingWritten() { + return storeFileWriterCreationTrackers.stream() + .flatMap(t -> t.get().stream()) + .collect(Collectors.toSet()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index d85553ac808..847187074df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -94,7 +92,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti */ @InterfaceAudience.Private public abstract class StoreEngine { + C extends Compactor, SFM extends StoreFileManager> { private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class); @@ -157,7 +155,7 @@ public abstract class StoreEngine getCompactor() { return this.compactor; } @@ -544,17 +542,6 @@ public abstract class StoreEngine> compactedFilesSupplier = () -> Collections.emptySet(); private String fileStoragePolicy; + // this is used to track the creation of the StoreFileWriter, mainly used for the SFT + // implementation where we will write store files directly to the final place, instead of + // writing a tmp file first. Under this scenario, we will have a background task to purge the + // store files which are not recorded in the SFT, but for the newly created store file writer, + // they are not tracked in SFT, so here we need to record them and treat them specially. + private Consumer writerCreationTracker; - public Builder(Configuration conf, CacheConfig cacheConf, - FileSystem fs) { + public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) { this.conf = conf; this.cacheConf = cacheConf; this.fs = fs; @@ -525,6 +531,11 @@ public class StoreFileWriter implements CellSink, ShipperListener { return this; } + public Builder withWriterCreationTracker(Consumer writerCreationTracker) { + this.writerCreationTracker = writerCreationTracker; + return this; + } + /** * Create a store file writer. Client is responsible for closing file when * done. 
If metadata, add BEFORE closing using @@ -573,8 +584,21 @@ public class StoreFileWriter implements CellSink, ShipperListener { bloomType = BloomType.NONE; } } - - return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount, + // make sure we call this before actually create the writer + // in fact, it is not a big deal to even add an inexistent file to the track, as we will never + // try to delete it and finally we will clean the tracker up after compaction. But if the file + // cleaner find the file but we haven't recorded it yet, it may accidentally delete the file + // and cause problem. + if (writerCreationTracker != null) { + writerCreationTracker.accept(filePath); + } + return new StoreFileWriter( + fs, + filePath, + conf, + cacheConf, + bloomType, + maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 58031288f75..c783100ed67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -56,8 +56,8 @@ abstract class StoreFlusher { * @return List of files written. Can be empty; must not be null. */ public abstract List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException; + MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, + Consumer writerCreationTracker) throws IOException; protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status) throws IOException { @@ -70,13 +70,17 @@ abstract class StoreFlusher { writer.close(); } - protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag) - throws IOException { + protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag, + Consumer writerCreationTracker) throws IOException { return store.getStoreEngine() - .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount()) - .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false) - .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) - .shouldDropBehind(false)); + .createWriter( + CreateStoreFileWriterParams.create() + .maxKeyCount(snapshot.getCellsCount()) + .compression(store.getColumnFamilyDescriptor().getCompressionType()) + .isCompaction(false) + .includeMVCCReadpoint(true) + .includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) + .shouldDropBehind(false).writerCreationTracker(writerCreationTracker)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index a4e943ac8b0..fc0598d89ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { } @Override - public Collection writers() { + protected Collection writers() { return existingWriters; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index f8183b7645a..fb9115e01ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K import java.io.IOException; import java.util.ArrayList; import java.util.List; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; @@ -54,11 +54,14 @@ public class StripeStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException { + MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, + Consumer writerCreationTracker) throws IOException { List result = new ArrayList<>(); int cellsCount = snapshot.getCellsCount(); - if (cellsCount == 0) return result; // don't flush if there are no entries + if (cellsCount == 0) { + // don't flush if there are no entries + return result; + } InternalScanner scanner = createScanner(snapshot.getScanners(), tracker); @@ -70,8 +73,9 @@ public class StripeStoreFlusher extends StoreFlusher { StripeMultiFileWriter mw = null; try { mw = req.createWriter(); // Writer according to the policy. - StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot); - StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; + StripeMultiFileWriter.WriterFactory factory = + createWriterFactory(snapshot, writerCreationTracker); + StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null; mw.init(storeScanner, factory); synchronized (flushLock) { @@ -98,12 +102,13 @@ public class StripeStoreFlusher extends StoreFlusher { return result; } - private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) { + private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot, + Consumer writerCreationTracker) { return new StripeMultiFileWriter.WriterFactory() { @Override public StoreFileWriter createWriter() throws IOException { // XXX: it used to always pass true for includesTag, re-consider? 
- return StripeStoreFlusher.this.createWriter(snapshot, true); + return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker); } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index 19b7a98627e..23d16934b65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,19 +46,21 @@ public abstract class AbstractMultiOutputCompactor writerCreationTracker) { WriterFactory writerFactory = new WriterFactory() { @Override public StoreFileWriter createWriter() throws IOException { - return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major); + return AbstractMultiOutputCompactor.this + .createWriter(fd, shouldDropBehind, major, writerCreationTracker); } @Override public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) throws IOException { - return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, - fileStoragePolicy, major); + return AbstractMultiOutputCompactor.this + .createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker); } }; // Prepare multi-writer, and perform the compaction using scanner and writer. @@ -68,7 +70,7 @@ public abstract class AbstractMultiOutputCompactor writerCreationTracker; public CompactionRequestImpl(Collection files) { this.selectionTime = EnvironmentEdgeManager.currentTime(); @@ -137,6 +139,14 @@ public class CompactionRequestImpl implements CompactionRequest { return tracker; } + public Consumer getWriterCreationTracker() { + return writerCreationTracker; + } + + public void setWriterCreationTracker(Consumer writerCreationTracker) { + this.writerCreationTracker = writerCreationTracker; + } + public boolean isAfterSplit() { return isAfterSplit; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 93f0555b7f4..80cc14be74f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -25,12 +25,13 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Set; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -40,7 +41,6 @@ import org.apache.hadoop.hbase.PrivateConstants; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import 
org.apache.hadoop.hbase.io.hfile.HFileInfo; -import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.HStore; @@ -71,15 +71,18 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables; /** * A compactor is a compaction algorithm associated a given policy. Base class also contains * reusable parts for implementing compactors (what is common and what isn't is evolving). + *
<p>
+ * Compactions might be concurrent against a given store and the Compactor is shared among + * them. Do not put mutable state into class fields. All Compactor class fields should be + * final or effectively final. + * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it. */ @InterfaceAudience.Private public abstract class Compactor { private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; - protected volatile CompactionProgress progress; protected final Configuration conf; protected final HStore store; - protected final int compactionKVMax; protected final Compression.Algorithm majorCompactionCompression; protected final Compression.Algorithm minorCompactionCompression; @@ -93,15 +96,15 @@ public abstract class Compactor { protected static final String MINOR_COMPACTION_DROP_CACHE = "hbase.regionserver.minorcompaction.pagecache.drop"; - private final boolean dropCacheMajor; - private final boolean dropCacheMinor; + protected final boolean dropCacheMajor; + protected final boolean dropCacheMinor; - // In compaction process only a single thread will access and write to this field, and - // getCompactionTargets is the only place we will access it other than the compaction thread, so - // make it volatile. - protected volatile T writer = null; + // We track progress per request using the CompactionRequestImpl identity as key. + // completeCompaction() cleans up this state. + private final Set progressSet = + Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); - //TODO: depending on Store is not good but, realistically, all compactors currently do. + // TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(Configuration conf, HStore store) { this.conf = conf; this.store = store; @@ -117,15 +120,9 @@ public abstract class Compactor { this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true); } - - protected interface CellSinkFactory { - S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major) - throws IOException; - } - - public CompactionProgress getProgress() { - return this.progress; + S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major, + Consumer writerCreationTracker) throws IOException; } /** The sole reason this class exists is that java has no ref/out/pointer parameters. */ @@ -272,12 +269,13 @@ public abstract class Compactor { }; protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind, - boolean major) { + boolean major, Consumer writerCreationTracker) { return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount) .compression(major ? majorCompactionCompression : minorCompactionCompression) .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0) .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind) - .totalCompactedFilesSize(fd.totalCompactedFilesSize); + .totalCompactedFilesSize(fd.totalCompactedFilesSize) + .writerCreationTracker(writerCreationTracker); } /** @@ -287,16 +285,20 @@ public abstract class Compactor { * @throws IOException if creation failed */ protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, - boolean major) throws IOException { + boolean major, Consumer writerCreationTracker) throws IOException { // When all MVCC readpoints are 0, don't write them. 
// See HBASE-8166, HBASE-12600, and HBASE-13389. - return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major)); + return store.getStoreEngine() + .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)); } protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, - String fileStoragePolicy, boolean major) throws IOException { + String fileStoragePolicy, boolean major, Consumer writerCreationTracker) + throws IOException { return store.getStoreEngine() - .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy)); + .createWriter( + createParams(fd, shouldDropBehind, major, writerCreationTracker) + .fileStoragePolicy(fileStoragePolicy)); } private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, @@ -328,7 +330,6 @@ public abstract class Compactor { InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor()); - this.progress = new CompactionProgress(fd.maxKeyCount); // Find the smallest read point across all the Scanners. long smallestReadPoint = getSmallestReadPoint(); @@ -344,6 +345,9 @@ public abstract class Compactor { boolean finished = false; List scanners = createFileScanners(request.getFiles(), smallestReadPoint, dropCache); + T writer = null; + CompactionProgress progress = new CompactionProgress(fd.maxKeyCount); + progressSet.add(progress); try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); @@ -356,14 +360,14 @@ public abstract class Compactor { smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } - if (writer != null){ - LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() - .map(n -> n.toString()) - .collect(Collectors.joining(", ", "{ ", " }"))); - } - writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); - finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size()); + writer = sinkFactory.createWriter( + scanner, + fd, + dropCache, + request.isMajor(), + request.getWriterCreationTracker()); + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isAllFiles(), request.getFiles().size(), progress); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -381,34 +385,41 @@ public abstract class Compactor { } else { Closeables.close(scanner, true); } - if (!finished && writer != null) { - abortWriter(); + if (!finished) { + if (writer != null) { + abortWriter(writer); + } + } else { + store.updateCompactedMetrics(request.isMajor(), progress); } + progressSet.remove(progress); } assert finished : "We should have exited the method on all error paths"; assert writer != null : "Writer should be non-null if no error"; - return commitWriter(fd, request); + return commitWriter(writer, fd, request); } - protected abstract List commitWriter(FileDetails fd, + protected abstract List commitWriter(T writer, FileDetails fd, CompactionRequestImpl request) throws IOException; - protected abstract void abortWriter() throws IOException; + protected 
abstract void abortWriter(T writer) throws IOException; /** * Performs the compaction. * @param fd FileDetails of cell sink writer * @param scanner Where to read from. + * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for some reason. */ - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -550,22 +561,27 @@ public abstract class Compactor { dropDeletesFromRow, dropDeletesToRow); } - public List getCompactionTargets() { - T writer = this.writer; - if (writer == null) { - return Collections.emptyList(); + /** + * Return the aggregate progress for all currently active compactions. + */ + public CompactionProgress getProgress() { + synchronized (progressSet) { + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; + long totalCompactedSize = 0; + for (CompactionProgress progress: progressSet) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + totalCompactedSize += progress.totalCompactedSize; + } + CompactionProgress result = new CompactionProgress(totalCompactingKVs); + result.currentCompactedKVs = currentCompactedKVs; + result.totalCompactedSize = totalCompactedSize; + return result; } - if (writer instanceof StoreFileWriter) { - return Arrays.asList(((StoreFileWriter) writer).getPath()); - } - return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath()) - .collect(Collectors.toList()); } - /** - * Reset the Writer when the new storefiles were successfully added - */ - public void resetWriter(){ - writer = null; + public boolean isCompacting() { + return !progressSet.isEmpty(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 43e037c5e70..c8c10e16ff1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.OptionalLong; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; @@ -68,21 +68,26 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor writerCreationTracker) + throws IOException { + DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter( + lowerBoundaries, + lowerBoundariesPolicies, + needEmptyFile(request)); + initMultiWriter(writer, scanner, fd, shouldDropBehind, major, 
writerCreationTracker);
          return writer;
        }
-    }, throughputController, user);
+    },
+      throughputController,
+      user);
   }
 
   @Override
-  protected List commitWriter(FileDetails fd,
+  protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
     return pathList;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 03e3a1b5f39..0e91d8870b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -47,10 +48,11 @@ public class DefaultCompactor extends Compactor {
   private final CellSinkFactory writerFactory = new CellSinkFactory() {
 
     @Override
-    public StoreFileWriter createWriter(InternalScanner scanner,
-      org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-      boolean shouldDropBehind, boolean major) throws IOException {
-      return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+    public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+      boolean shouldDropBehind, boolean major, Consumer writerCreationTracker)
+      throws IOException {
+      return DefaultCompactor.this
+        .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
     }
   };
@@ -63,7 +65,7 @@ public class DefaultCompactor extends Compactor {
   }
 
   @Override
-  protected List commitWriter(FileDetails fd,
+  protected List commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +74,6 @@ public class DefaultCompactor extends Compactor {
   }
 
   @Override
-  protected void abortWriter() throws IOException {
-    abortWriter(writer);
-    // this step signals that the target file is no longer written and can be cleaned up
-    writer = null;
-  }
-
   protected final void abortWriter(StoreFileWriter writer) throws IOException {
     Path leftoverFile = writer.getPath();
     try {
@@ -92,4 +88,5 @@ public class DefaultCompactor extends Compactor {
         leftoverFile, e);
     }
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 060a11b41fe..6413a304d55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import java.io.IOException;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -88,18 +88,26 @@ public class StripeCompactor extends AbstractMultiOutputCompactor
       new CellSinkFactory() {
         @Override
         public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-          boolean shouldDropBehind, boolean major) throws IOException {
+          boolean shouldDropBehind, boolean major, Consumer writerCreationTracker)
+          throws IOException {
           StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
-            store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
-          initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+            store.getComparator(),
+            targetBoundaries,
+            majorRangeFromRow,
+            majorRangeToRow);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
-      }, throughputController, user);
+      },
+      throughputController,
+      user);
   }
 
   public List compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
@@ -115,20 +123,28 @@ public class StripeCompactor extends AbstractMultiOutputCompactor
         @Override
         public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-          boolean shouldDropBehind, boolean major) throws IOException {
+          boolean shouldDropBehind, boolean major, Consumer writerCreationTracker)
+          throws IOException {
           StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
-            store.getComparator(), targetCount, targetSize, left, right);
-          initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+            store.getComparator(),
+            targetCount,
+            targetSize,
+            left,
+            right);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
-      }, throughputController, user);
+      },
+      throughputController,
+      user);
   }
 
   @Override
-  protected List commitWriter(FileDetails fd,
+  protected List commitWriter(StripeMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 1bf354f00a0..f3e62670796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -177,11 +177,15 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
     }
     StoreFileWriter.Builder builder =
       new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
-        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
-        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
-        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withOutputDir(outputDir)
+        .withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount())
+        .withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext)
+        .withShouldDropCacheBehind(params.shouldDropBehind())
         .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
-        .withFileStoragePolicy(params.fileStoragePolicy());
+        .withFileStoragePolicy(params.fileStoragePolicy())
+        .withWriterCreationTracker(params.writerCreationTracker());
     return builder.build();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 813c288c61d..8377a0c7e5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -88,9 +89,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
   }
 
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     totalCompactions.incrementAndGet();
     if (major) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 6a0a8baa9de..7863cd4e316 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,14 @@ public class TestCompactorMemLeak {
     }
 
     @Override
-    protected List commitWriter(FileDetails fd,
+    protected List commitWriter(StoreFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(fd, request);
+      return super.commitWriter(writer, fd, request);
     }
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 5543c03b650..ab58eb8b497 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.IntBinaryOperator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@@ -2477,9 +2479,10 @@ public class TestHStore {
     @Override
     public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
       MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+      FlushLifeCycleTracker tracker, Consumer writerCreationTracker) throws IOException {
       counter.incrementAndGet();
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 85fdf0871f7..9552142881c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -211,26 +209,9 @@ public class TestMajorCompaction {
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
     assertEquals(compactionThreshold, result.size());
 
-    // see if CompactionProgress is in place but null
-    for (HStore store : r.getStores()) {
-      assertNull(store.getCompactionProgress());
-    }
-
     r.flush(true);
     r.compact(true);
 
-    // see if CompactionProgress has done its thing on at least one store
-    int storeCount = 0;
-    for (HStore store : r.getStores()) {
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null) {
-        ++storeCount;
-        assertTrue(progress.currentCompactedKVs > 0);
-        assertTrue(progress.getTotalCompactingKVs() > 0);
-      }
-      assertTrue(storeCount > 0);
-    }
-
     // look at the second row
     // Increment the least significant character so we get to next row.
     byte[] secondRowBytes = START_KEY_BYTES.clone();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 652c019ff04..e20f6871b64 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -117,11 +116,9 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -620,17 +617,11 @@ public class TestSplitTransactionOnCluster {
     assertEquals(1, region.getStores().size());
     HStore store = region.getStores().get(0);
     while (store.hasReferences()) {
-      // Wait on any current compaction to complete first.
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null && progress.getProgressPct() < 1.0f) {
-        while (progress.getProgressPct() < 1.0f) {
-          LOG.info("Waiting, progress={}", progress.getProgressPct());
-          Threads.sleep(1000);
-        }
-      } else {
-        // Run new compaction. Shoudn't be any others running.
-        region.compact(true);
+      while (store.storeEngine.getCompactor().isCompacting()) {
+        Threads.sleep(100);
       }
+      // Run new compaction. Shoudn't be any others running.
+      region.compact(true);
       store.closeAndArchiveCompactedFiles();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 4c454d96e89..d07c6b6bcc6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -43,6 +43,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -612,12 +613,13 @@ public abstract class AbstractTestWALReplay {
     @Override
     public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker, Consumer writerCreationTracker) throws IOException {
       if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
       }
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
   }
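
Note on the pattern introduced above: flushes and compactions now receive a Consumer that they invoke with the path of every store file writer they open (the writerCreationTracker parameter threaded through createWriter, flushSnapshot and StoreFileWriter.Builder.withWriterCreationTracker), so cleanup code such as the BrokenStoreFileCleaner touched by this patch can tell files that are still being written from leftover files it may safely remove. The following is a self-contained sketch of that tracking idea only, not HBase code: it uses java.nio.file.Path rather than org.apache.hadoop.fs.Path, and the class and method names (WriterCreationTrackerSketch, tracker, untrack, snapshot) are invented for illustration.

  import java.nio.file.Path;
  import java.nio.file.Paths;
  import java.util.Collections;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.function.Consumer;

  // Illustrative only: mirrors the idea behind the StoreFileWriterCreationTracker
  // introduced by this change, outside of HBase.
  public final class WriterCreationTrackerSketch {

    // Files currently being written by in-flight flushes/compactions.
    private final Set<Path> filesUnderWrite = ConcurrentHashMap.newKeySet();

    // Handed to the code that creates store file writers; called once per new writer.
    public Consumer<Path> tracker() {
      return filesUnderWrite::add;
    }

    // Called when a flush/compaction finishes (commit or abort) to release its paths.
    public void untrack(Path path) {
      filesUnderWrite.remove(path);
    }

    // A cleaner consults this snapshot and must not delete anything listed in it.
    public Set<Path> snapshot() {
      return Collections.unmodifiableSet(filesUnderWrite);
    }

    public static void main(String[] args) {
      WriterCreationTrackerSketch tracker = new WriterCreationTrackerSketch();
      Consumer<Path> writerCreationTracker = tracker.tracker();

      // A "flush" starts writing a new store file and reports it through the consumer.
      Path newStoreFile = Paths.get("/data/cf/.tmp/abc123");
      writerCreationTracker.accept(newStoreFile);

      // While the write is in flight, a cleaner skips this path.
      System.out.println("in flight: " + tracker.snapshot());

      // Once committed (or aborted), the path is released and becomes eligible for cleanup.
      tracker.untrack(newStoreFile);
      System.out.println("in flight: " + tracker.snapshot());
    }
  }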