HBASE-26999 HStore should try write WAL compaction marker before repl… (#4407)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
ad2180b75d
commit
7b7f57619b
|
@ -1231,14 +1231,17 @@ public class HStore
|
||||||
allowedOnPath = ".*/(HStore|TestHStore).java")
|
allowedOnPath = ".*/(HStore|TestHStore).java")
|
||||||
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
|
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
|
||||||
boolean writeCompactionMarker) throws IOException {
|
boolean writeCompactionMarker) throws IOException {
|
||||||
storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
|
storeEngine.replaceStoreFiles(compactedFiles, result,
|
||||||
|
() -> {
|
||||||
|
if (writeCompactionMarker) {
|
||||||
|
writeCompactionWalRecord(compactedFiles, result);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
() -> {
|
||||||
synchronized (filesCompacting) {
|
synchronized (filesCompacting) {
|
||||||
filesCompacting.removeAll(compactedFiles);
|
filesCompacting.removeAll(compactedFiles);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (writeCompactionMarker) {
|
|
||||||
writeCompactionWalRecord(compactedFiles, result);
|
|
||||||
}
|
|
||||||
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
|
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
|
||||||
// sizes later so it's not super-critical if we miss these.
|
// sizes later so it's not super-critical if we miss these.
|
||||||
RegionServerServices rsServices = region.getRegionServerServices();
|
RegionServerServices rsServices = region.getRegionServerServices();
|
||||||
|
|
|
@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.CellComparator;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
|
@ -407,8 +408,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
|
||||||
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
|
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
|
||||||
|
|
||||||
// propogate the file changes to the underlying store file manager
|
// propogate the file changes to the underlying store file manager
|
||||||
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
|
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {}, () -> {}); // won't throw an exception
|
||||||
}); // won't throw an exception
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -491,9 +491,11 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
|
||||||
}
|
}
|
||||||
|
|
||||||
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
|
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
|
||||||
Collection<HStoreFile> newFiles, Runnable actionUnderLock) throws IOException {
|
Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter,
|
||||||
|
Runnable actionUnderLock) throws IOException {
|
||||||
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
|
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
|
||||||
StoreUtils.toStoreFileInfo(newFiles));
|
StoreUtils.toStoreFileInfo(newFiles));
|
||||||
|
walMarkerWriter.run();
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
storeFileManager.addCompactionResults(compactedFiles, newFiles);
|
storeFileManager.addCompactionResults(compactedFiles, newFiles);
|
||||||
|
|
|
@ -1018,14 +1018,14 @@ public class TestHStore {
|
||||||
// call first time after files changed
|
// call first time after files changed
|
||||||
spiedStoreEngine.refreshStoreFiles();
|
spiedStoreEngine.refreshStoreFiles();
|
||||||
assertEquals(2, this.store.getStorefilesCount());
|
assertEquals(2, this.store.getStorefilesCount());
|
||||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());
|
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
|
||||||
|
|
||||||
// call second time
|
// call second time
|
||||||
spiedStoreEngine.refreshStoreFiles();
|
spiedStoreEngine.refreshStoreFiles();
|
||||||
|
|
||||||
// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
|
// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
|
||||||
// refreshed,
|
// refreshed,
|
||||||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());
|
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
private long countMemStoreScanner(StoreScanner scanner) {
|
private long countMemStoreScanner(StoreScanner scanner) {
|
||||||
|
|
Loading…
Reference in New Issue