diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9b482b6ebfd..2e1c862f9c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1084,27 +1084,44 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
/**
- * @param path The pathname of the tmp file into which the store was flushed
- * @return store file created.
+ * Commit the given {@code files}.
+ *
+ * We will move the file into data directory, and open it.
+ * @param files the files to commit
+ * @param validate whether to validate the store files
+ * @return the committed store files
*/
- private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
- throws IOException {
- // Write-out finished successfully, move into the right spot
- Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
-
- status.setStatus("Flushing " + this + ": reopening flushed file");
- HStoreFile sf = createStoreFileAndReader(dstPath);
-
- StoreFileReader r = sf.getReader();
- this.storeSize.addAndGet(r.length());
- this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Added " + sf + ", entries=" + r.getEntries() +
- ", sequenceid=" + logCacheFlushId +
- ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+ private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+ List<HStoreFile> committedFiles = new ArrayList<>(files.size());
+ HRegionFileSystem hfs = getRegionFileSystem();
+ String familyName = getColumnFamilyName();
+ for (Path file : files) {
+ try {
+ if (validate) {
+ validateStoreFile(file);
+ }
+ Path committedPath = hfs.commitStoreFile(familyName, file);
+ HStoreFile sf = createStoreFileAndReader(committedPath);
+ committedFiles.add(sf);
+ } catch (IOException e) {
+ LOG.error("Failed to commit store file {}", file, e);
+ // Try to delete the files we have committed before.
+ // It is OK to fail when deleting as leaving the file there does not cause any data
+ // corruption problem. It just introduces some duplicated data which may impact read
+ // performance a little when reading before compaction.
+ for (HStoreFile sf : committedFiles) {
+ Path pathToDelete = sf.getPath();
+ try {
+ sf.deleteStoreFile();
+ } catch (IOException deleteEx) {
+ LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
+ deleteEx);
+ }
+ }
+ throw new IOException("Failed to commit the flush", e);
+ }
}
- return sf;
+ return committedFiles;
}
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
@@ -1501,7 +1518,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
- List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
+ List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
+ if (this.getCoprocessorHost() != null) {
+ for (HStoreFile sf : sfs) {
+ getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
+ }
+ }
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
@@ -1542,29 +1564,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
- private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
- List<Path> newFiles, User user) throws IOException {
- List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
- for (Path newFile : newFiles) {
- assert newFile != null;
- HStoreFile sf = moveFileIntoPlace(newFile);
- if (this.getCoprocessorHost() != null) {
- getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
- }
- assert sf != null;
- sfs.add(sf);
- }
- return sfs;
- }
-
- // Package-visible for tests
- HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
- validateStoreFile(newFile);
- // Move the file into the right spot
- Path destPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), newFile);
- return createStoreFileAndReader(destPath);
- }
-
/**
* Writes the compaction WAL record.
* @param filesCompacted Files compacted (input).
@@ -2346,42 +2345,31 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
if (CollectionUtils.isEmpty(this.tempFiles)) {
return false;
}
- List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
- for (Path storeFilePath : tempFiles) {
- try {
- HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
- outputFileSize += sf.getReader().length();
- storeFiles.add(sf);
- } catch (IOException ex) {
- LOG.error("Failed to commit store file {}", storeFilePath, ex);
- // Try to delete the files we have committed before.
- for (HStoreFile sf : storeFiles) {
- Path pathToDelete = sf.getPath();
- try {
- sf.deleteStoreFile();
- } catch (IOException deleteEx) {
- LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
- + "halting {}", pathToDelete, ex);
- Runtime.getRuntime().halt(1);
- }
- }
- throw new IOException("Failed to commit the flush", ex);
- }
- }
-
+ status.setStatus("Flushing " + this + ": reopening flushed file");
+ List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
for (HStoreFile sf : storeFiles) {
- if (HStore.this.getCoprocessorHost() != null) {
- HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+ StoreFileReader r = sf.getReader();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
+ cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
}
+ outputFileSize += r.length();
+ storeSize.addAndGet(r.length());
+ totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
}
- HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
- HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
- HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
-
- // Add new file to store files. Clear snapshot too while we have the Store write lock.
- return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
+ flushedCellsCount.addAndGet(cacheFlushCount);
+ flushedCellsSize.addAndGet(cacheFlushSize);
+ flushedOutputFileSize.addAndGet(outputFileSize);
+ // call coprocessor after we have done all the accounting above
+ for (HStoreFile sf : storeFiles) {
+ if (getCoprocessorHost() != null) {
+ getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+ }
+ }
+ // Add new file to store files. Clear snapshot too while we have the Store write lock.
+ return updateStorefiles(storeFiles, snapshot.getId());
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index ac4af015443..3c3b162097f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -36,6 +37,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@@ -72,6 +74,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
@@ -85,8 +88,6 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Test compaction framework and common functions
@@ -98,8 +99,6 @@ public class TestCompaction {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompaction.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
-
@Rule
public TestName name = new TestName();
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
@@ -346,32 +345,22 @@ public class TestCompaction {
HStore store = r.getStore(COLUMN_FAMILY);
Collection<HStoreFile> storeFiles = store.getStorefiles();
- DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
+ DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
tool.compact(request, NoLimitThroughputController.INSTANCE, null);
// Now lets corrupt the compacted file.
FileSystem fs = store.getFileSystem();
// default compaction policy created one and only one new compacted file
- Path dstPath = store.getRegionFileSystem().createTempName();
- FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
- stream.writeChars("CORRUPT FILE!!!!");
- stream.close();
- Path origPath = store.getRegionFileSystem().commitStoreFile(
- Bytes.toString(COLUMN_FAMILY), dstPath);
-
- try {
- ((HStore)store).moveFileIntoPlace(origPath);
- } catch (Exception e) {
- // The complete compaction should fail and the corrupt file should remain
- // in the 'tmp' directory;
- assertTrue(fs.exists(origPath));
- assertFalse(fs.exists(dstPath));
- LOG.info("testCompactionWithCorruptResult Passed");
- return;
+ Path tmpPath = store.getRegionFileSystem().createTempName();
+ try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
+ stream.writeChars("CORRUPT FILE!!!!");
}
- fail("testCompactionWithCorruptResult failed since no exception was" +
- "thrown while completing a corrupt file");
+ // The complete compaction should fail and the corrupt file should remain
+ // in the 'tmp' directory;
+ assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
+ EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
+ assertTrue(fs.exists(tmpPath));
}
/**