HBASE-26999 HStore should try write WAL compaction marker before repl… (#4407)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
e8b44d948c
commit
f6e9d3e1dd
|
@ -1232,14 +1232,17 @@ public class HStore
|
|||
allowedOnPath = ".*/(HStore|TestHStore).java")
|
||||
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
|
||||
boolean writeCompactionMarker) throws IOException {
|
||||
storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
|
||||
storeEngine.replaceStoreFiles(compactedFiles, result,
|
||||
() -> {
|
||||
if (writeCompactionMarker) {
|
||||
writeCompactionWalRecord(compactedFiles, result);
|
||||
}
|
||||
},
|
||||
() -> {
|
||||
synchronized (filesCompacting) {
|
||||
filesCompacting.removeAll(compactedFiles);
|
||||
}
|
||||
});
|
||||
if (writeCompactionMarker) {
|
||||
writeCompactionWalRecord(compactedFiles, result);
|
||||
}
|
||||
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
|
||||
// sizes later so it's not super-critical if we miss these.
|
||||
RegionServerServices rsServices = region.getRegionServerServices();
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
|
@ -407,8 +408,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
|
|||
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
|
||||
|
||||
// propogate the file changes to the underlying store file manager
|
||||
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
|
||||
}); // won't throw an exception
|
||||
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {}, () -> {}); // won't throw an exception
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -491,9 +491,11 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
|
|||
}
|
||||
|
||||
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
|
||||
Collection<HStoreFile> newFiles, Runnable actionUnderLock) throws IOException {
|
||||
Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter,
|
||||
Runnable actionUnderLock) throws IOException {
|
||||
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
|
||||
StoreUtils.toStoreFileInfo(newFiles));
|
||||
walMarkerWriter.run();
|
||||
writeLock();
|
||||
try {
|
||||
storeFileManager.addCompactionResults(compactedFiles, newFiles);
|
||||
|
|
|
@ -1017,14 +1017,14 @@ public class TestHStore {
|
|||
// call first time after files changed
|
||||
spiedStoreEngine.refreshStoreFiles();
|
||||
assertEquals(2, this.store.getStorefilesCount());
|
||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());
|
||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
|
||||
|
||||
// call second time
|
||||
spiedStoreEngine.refreshStoreFiles();
|
||||
|
||||
// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
|
||||
// refreshed,
|
||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());
|
||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
|
||||
}
|
||||
|
||||
private long countMemStoreScanner(StoreScanner scanner) {
|
||||
|
|
Loading…
Reference in New Issue