HBASE-26118 The HStore.commitFile and HStore.moveFileIntoPlace almost have the same logic (#3525)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
4a3c7d73b0
commit
02d263e7dd
|
@ -1084,27 +1084,44 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param path The pathname of the tmp file into which the store was flushed
|
* Commit the given {@code files}.
|
||||||
* @return store file created.
|
* <p/>
|
||||||
|
* We will move the file into data directory, and open it.
|
||||||
|
* @param files the files want to commit
|
||||||
|
* @param validate whether to validate the store files
|
||||||
|
* @return the committed store files
|
||||||
*/
|
*/
|
||||||
private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
|
private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
|
||||||
throws IOException {
|
List<HStoreFile> committedFiles = new ArrayList<>(files.size());
|
||||||
// Write-out finished successfully, move into the right spot
|
HRegionFileSystem hfs = getRegionFileSystem();
|
||||||
Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
|
String familyName = getColumnFamilyName();
|
||||||
|
for (Path file : files) {
|
||||||
status.setStatus("Flushing " + this + ": reopening flushed file");
|
try {
|
||||||
HStoreFile sf = createStoreFileAndReader(dstPath);
|
if (validate) {
|
||||||
|
validateStoreFile(file);
|
||||||
StoreFileReader r = sf.getReader();
|
}
|
||||||
this.storeSize.addAndGet(r.length());
|
Path committedPath = hfs.commitStoreFile(familyName, file);
|
||||||
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
|
HStoreFile sf = createStoreFileAndReader(committedPath);
|
||||||
|
committedFiles.add(sf);
|
||||||
if (LOG.isInfoEnabled()) {
|
} catch (IOException e) {
|
||||||
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
|
LOG.error("Failed to commit store file {}", file, e);
|
||||||
", sequenceid=" + logCacheFlushId +
|
// Try to delete the files we have committed before.
|
||||||
", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
|
// It is OK to fail when deleting as leaving the file there does not cause any data
|
||||||
|
// corruption problem. It just introduces some duplicated data which may impact read
|
||||||
|
// performance a little when reading before compaction.
|
||||||
|
for (HStoreFile sf : committedFiles) {
|
||||||
|
Path pathToDelete = sf.getPath();
|
||||||
|
try {
|
||||||
|
sf.deleteStoreFile();
|
||||||
|
} catch (IOException deleteEx) {
|
||||||
|
LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
|
||||||
|
deleteEx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IOException("Failed to commit the flush", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return sf;
|
return committedFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||||
|
@ -1501,7 +1518,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
List<Path> newFiles) throws IOException {
|
List<Path> newFiles) throws IOException {
|
||||||
// Do the steps necessary to complete the compaction.
|
// Do the steps necessary to complete the compaction.
|
||||||
setStoragePolicyFromFileName(newFiles);
|
setStoragePolicyFromFileName(newFiles);
|
||||||
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
|
List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
|
||||||
|
if (this.getCoprocessorHost() != null) {
|
||||||
|
for (HStoreFile sf : sfs) {
|
||||||
|
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
|
||||||
|
}
|
||||||
|
}
|
||||||
writeCompactionWalRecord(filesToCompact, sfs);
|
writeCompactionWalRecord(filesToCompact, sfs);
|
||||||
replaceStoreFiles(filesToCompact, sfs);
|
replaceStoreFiles(filesToCompact, sfs);
|
||||||
if (cr.isMajor()) {
|
if (cr.isMajor()) {
|
||||||
|
@ -1542,29 +1564,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
|
|
||||||
List<Path> newFiles, User user) throws IOException {
|
|
||||||
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
|
|
||||||
for (Path newFile : newFiles) {
|
|
||||||
assert newFile != null;
|
|
||||||
HStoreFile sf = moveFileIntoPlace(newFile);
|
|
||||||
if (this.getCoprocessorHost() != null) {
|
|
||||||
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
|
|
||||||
}
|
|
||||||
assert sf != null;
|
|
||||||
sfs.add(sf);
|
|
||||||
}
|
|
||||||
return sfs;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Package-visible for tests
|
|
||||||
HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
|
|
||||||
validateStoreFile(newFile);
|
|
||||||
// Move the file into the right spot
|
|
||||||
Path destPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), newFile);
|
|
||||||
return createStoreFileAndReader(destPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes the compaction WAL record.
|
* Writes the compaction WAL record.
|
||||||
* @param filesCompacted Files compacted (input).
|
* @param filesCompacted Files compacted (input).
|
||||||
|
@ -2346,42 +2345,31 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
if (CollectionUtils.isEmpty(this.tempFiles)) {
|
if (CollectionUtils.isEmpty(this.tempFiles)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
|
status.setStatus("Flushing " + this + ": reopening flushed file");
|
||||||
for (Path storeFilePath : tempFiles) {
|
List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
|
||||||
try {
|
|
||||||
HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
|
|
||||||
outputFileSize += sf.getReader().length();
|
|
||||||
storeFiles.add(sf);
|
|
||||||
} catch (IOException ex) {
|
|
||||||
LOG.error("Failed to commit store file {}", storeFilePath, ex);
|
|
||||||
// Try to delete the files we have committed before.
|
|
||||||
for (HStoreFile sf : storeFiles) {
|
|
||||||
Path pathToDelete = sf.getPath();
|
|
||||||
try {
|
|
||||||
sf.deleteStoreFile();
|
|
||||||
} catch (IOException deleteEx) {
|
|
||||||
LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
|
|
||||||
+ "halting {}", pathToDelete, ex);
|
|
||||||
Runtime.getRuntime().halt(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new IOException("Failed to commit the flush", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (HStoreFile sf : storeFiles) {
|
for (HStoreFile sf : storeFiles) {
|
||||||
if (HStore.this.getCoprocessorHost() != null) {
|
StoreFileReader r = sf.getReader();
|
||||||
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
|
||||||
|
cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
|
||||||
}
|
}
|
||||||
|
outputFileSize += r.length();
|
||||||
|
storeSize.addAndGet(r.length());
|
||||||
|
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
|
||||||
committedFiles.add(sf.getPath());
|
committedFiles.add(sf.getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
|
flushedCellsCount.addAndGet(cacheFlushCount);
|
||||||
HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
|
flushedCellsSize.addAndGet(cacheFlushSize);
|
||||||
HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
|
flushedOutputFileSize.addAndGet(outputFileSize);
|
||||||
|
// call coprocessor after we have done all the accounting above
|
||||||
// Add new file to store files. Clear snapshot too while we have the Store write lock.
|
for (HStoreFile sf : storeFiles) {
|
||||||
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
|
if (getCoprocessorHost() != null) {
|
||||||
|
getCoprocessorHost().postFlush(HStore.this, sf, tracker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Add new file to store files. Clear snapshot too while we have the Store write lock.
|
||||||
|
return updateStorefiles(storeFiles, snapshot.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE
|
||||||
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
|
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
@ -36,6 +37,7 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -72,6 +74,7 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -85,8 +88,6 @@ import org.junit.rules.TestName;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test compaction framework and common functions
|
* Test compaction framework and common functions
|
||||||
|
@ -98,8 +99,6 @@ public class TestCompaction {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestCompaction.class);
|
HBaseClassTestRule.forClass(TestCompaction.class);
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
||||||
|
@ -346,32 +345,22 @@ public class TestCompaction {
|
||||||
HStore store = r.getStore(COLUMN_FAMILY);
|
HStore store = r.getStore(COLUMN_FAMILY);
|
||||||
|
|
||||||
Collection<HStoreFile> storeFiles = store.getStorefiles();
|
Collection<HStoreFile> storeFiles = store.getStorefiles();
|
||||||
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
|
DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
|
||||||
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
|
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
|
||||||
tool.compact(request, NoLimitThroughputController.INSTANCE, null);
|
tool.compact(request, NoLimitThroughputController.INSTANCE, null);
|
||||||
|
|
||||||
// Now lets corrupt the compacted file.
|
// Now lets corrupt the compacted file.
|
||||||
FileSystem fs = store.getFileSystem();
|
FileSystem fs = store.getFileSystem();
|
||||||
// default compaction policy created one and only one new compacted file
|
// default compaction policy created one and only one new compacted file
|
||||||
Path dstPath = store.getRegionFileSystem().createTempName();
|
Path tmpPath = store.getRegionFileSystem().createTempName();
|
||||||
FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
|
try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
|
||||||
stream.writeChars("CORRUPT FILE!!!!");
|
stream.writeChars("CORRUPT FILE!!!!");
|
||||||
stream.close();
|
|
||||||
Path origPath = store.getRegionFileSystem().commitStoreFile(
|
|
||||||
Bytes.toString(COLUMN_FAMILY), dstPath);
|
|
||||||
|
|
||||||
try {
|
|
||||||
((HStore)store).moveFileIntoPlace(origPath);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// The complete compaction should fail and the corrupt file should remain
|
|
||||||
// in the 'tmp' directory;
|
|
||||||
assertTrue(fs.exists(origPath));
|
|
||||||
assertFalse(fs.exists(dstPath));
|
|
||||||
LOG.info("testCompactionWithCorruptResult Passed");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
fail("testCompactionWithCorruptResult failed since no exception was" +
|
// The complete compaction should fail and the corrupt file should remain
|
||||||
"thrown while completing a corrupt file");
|
// in the 'tmp' directory;
|
||||||
|
assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
|
||||||
|
EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
|
||||||
|
assertTrue(fs.exists(tmpPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue