From 02d263e7dde146956fda0ec245aeae85926ab12f Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 26 Jul 2021 20:58:16 +0800 Subject: [PATCH] HBASE-26118 The HStore.commitFile and HStore.moveFileIntoPlace almost have the same logic (#3525) Signed-off-by: Yulin Niu --- .../hadoop/hbase/regionserver/HStore.java | 136 ++++++++---------- .../hbase/regionserver/TestCompaction.java | 35 ++--- 2 files changed, 74 insertions(+), 97 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9b482b6ebfd..2e1c862f9c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1084,27 +1084,44 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } /** - * @param path The pathname of the tmp file into which the store was flushed - * @return store file created. + * Commit the given {@code files}. + *

+ * We will move the file into data directory, and open it. + * @param files the files want to commit + * @param validate whether to validate the store files + * @return the committed store files */ - private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status) - throws IOException { - // Write-out finished successfully, move into the right spot - Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path); - - status.setStatus("Flushing " + this + ": reopening flushed file"); - HStoreFile sf = createStoreFileAndReader(dstPath); - - StoreFileReader r = sf.getReader(); - this.storeSize.addAndGet(r.length()); - this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); - - if (LOG.isInfoEnabled()) { - LOG.info("Added " + sf + ", entries=" + r.getEntries() + - ", sequenceid=" + logCacheFlushId + - ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1)); + private List commitStoreFiles(List files, boolean validate) throws IOException { + List committedFiles = new ArrayList<>(files.size()); + HRegionFileSystem hfs = getRegionFileSystem(); + String familyName = getColumnFamilyName(); + for (Path file : files) { + try { + if (validate) { + validateStoreFile(file); + } + Path committedPath = hfs.commitStoreFile(familyName, file); + HStoreFile sf = createStoreFileAndReader(committedPath); + committedFiles.add(sf); + } catch (IOException e) { + LOG.error("Failed to commit store file {}", file, e); + // Try to delete the files we have committed before. + // It is OK to fail when deleting as leaving the file there does not cause any data + // corruption problem. It just introduces some duplicated data which may impact read + // performance a little when reading before compaction. + for (HStoreFile sf : committedFiles) { + Path pathToDelete = sf.getPath(); + try { + sf.deleteStoreFile(); + } catch (IOException deleteEx) { + LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete, + deleteEx); + } + } + throw new IOException("Failed to commit the flush", e); + } } - return sf; + return committedFiles; } public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, @@ -1501,7 +1518,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, List newFiles) throws IOException { // Do the steps necessary to complete the compaction. setStoragePolicyFromFileName(newFiles); - List sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); + List sfs = commitStoreFiles(newFiles, true); + if (this.getCoprocessorHost() != null) { + for (HStoreFile sf : sfs) { + getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); + } + } writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); if (cr.isMajor()) { @@ -1542,29 +1564,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } } - private List moveCompactedFilesIntoPlace(CompactionRequestImpl cr, - List newFiles, User user) throws IOException { - List sfs = new ArrayList<>(newFiles.size()); - for (Path newFile : newFiles) { - assert newFile != null; - HStoreFile sf = moveFileIntoPlace(newFile); - if (this.getCoprocessorHost() != null) { - getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); - } - assert sf != null; - sfs.add(sf); - } - return sfs; - } - - // Package-visible for tests - HStoreFile moveFileIntoPlace(Path newFile) throws IOException { - validateStoreFile(newFile); - // Move the file into the right spot - Path destPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), newFile); - return createStoreFileAndReader(destPath); - } - /** * Writes the compaction WAL record. * @param filesCompacted Files compacted (input). @@ -2346,42 +2345,31 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, if (CollectionUtils.isEmpty(this.tempFiles)) { return false; } - List storeFiles = new ArrayList<>(this.tempFiles.size()); - for (Path storeFilePath : tempFiles) { - try { - HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); - outputFileSize += sf.getReader().length(); - storeFiles.add(sf); - } catch (IOException ex) { - LOG.error("Failed to commit store file {}", storeFilePath, ex); - // Try to delete the files we have committed before. - for (HStoreFile sf : storeFiles) { - Path pathToDelete = sf.getPath(); - try { - sf.deleteStoreFile(); - } catch (IOException deleteEx) { - LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " - + "halting {}", pathToDelete, ex); - Runtime.getRuntime().halt(1); - } - } - throw new IOException("Failed to commit the flush", ex); - } - } - + status.setStatus("Flushing " + this + ": reopening flushed file"); + List storeFiles = commitStoreFiles(tempFiles, false); for (HStoreFile sf : storeFiles) { - if (HStore.this.getCoprocessorHost() != null) { - HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); + StoreFileReader r = sf.getReader(); + if (LOG.isInfoEnabled()) { + LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), + cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); } + outputFileSize += r.length(); + storeSize.addAndGet(r.length()); + totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); committedFiles.add(sf.getPath()); } - HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); - HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); - HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); - - // Add new file to store files. Clear snapshot too while we have the Store write lock. - return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); + flushedCellsCount.addAndGet(cacheFlushCount); + flushedCellsSize.addAndGet(cacheFlushSize); + flushedOutputFileSize.addAndGet(outputFileSize); + // call coprocessor after we have done all the accounting above + for (HStoreFile sf : storeFiles) { + if (getCoprocessorHost() != null) { + getCoprocessorHost().postFlush(HStore.this, sf, tracker); + } + } + // Add new file to store files. Clear snapshot too while we have the Store write lock. + return updateStorefiles(storeFiles, snapshot.getId()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index ac4af015443..3c3b162097f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -36,6 +37,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -72,6 +74,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; @@ -85,8 +88,6 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Test compaction framework and common functions @@ -98,8 +99,6 @@ public class TestCompaction { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCompaction.class); - private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class); - @Rule public TestName name = new TestName(); private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); @@ -346,32 +345,22 @@ public class TestCompaction { HStore store = r.getStore(COLUMN_FAMILY); Collection storeFiles = store.getStorefiles(); - DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor(); + DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); tool.compact(request, NoLimitThroughputController.INSTANCE, null); // Now lets corrupt the compacted file. FileSystem fs = store.getFileSystem(); // default compaction policy created one and only one new compacted file - Path dstPath = store.getRegionFileSystem().createTempName(); - FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null); - stream.writeChars("CORRUPT FILE!!!!"); - stream.close(); - Path origPath = store.getRegionFileSystem().commitStoreFile( - Bytes.toString(COLUMN_FAMILY), dstPath); - - try { - ((HStore)store).moveFileIntoPlace(origPath); - } catch (Exception e) { - // The complete compaction should fail and the corrupt file should remain - // in the 'tmp' directory; - assertTrue(fs.exists(origPath)); - assertFalse(fs.exists(dstPath)); - LOG.info("testCompactionWithCorruptResult Passed"); - return; + Path tmpPath = store.getRegionFileSystem().createTempName(); + try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { + stream.writeChars("CORRUPT FILE!!!!"); } - fail("testCompactionWithCorruptResult failed since no exception was" + - "thrown while completing a corrupt file"); + // The complete compaction should fail and the corrupt file should remain + // in the 'tmp' directory; + assertThrows(IOException.class, () -> store.doCompaction(null, null, null, + EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath))); + assertTrue(fs.exists(tmpPath)); } /**