diff --git a/CHANGES.txt b/CHANGES.txt
index bbe2ae8f81b..e80700b76ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -355,6 +355,7 @@ Release 0.92.0 - Unreleased
                file (Mingjie Lai)
    HBASE-4582  Store.java cleanup (failing TestHeapSize and has warnings)
    HBASE-4556  Fix all incorrect uses of InternalScanner.next(...) (Lars H)
+   HBASE-4078  Validate store files after flush/compaction
 
   TESTS
    HBASE-4450  test for number of blocks read: to serve as baseline for expected
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 2605d6b904d..220e7634310 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -520,6 +520,7 @@ public class Store implements HeapSize {
 
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    validateStoreFile(writer.getPath());
     String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
     LOG.info(msg);
     status.setStatus("Flushing " + this + ": " + msg);
@@ -1091,7 +1092,7 @@ public class Store implements HeapSize {
    * nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -1193,6 +1194,30 @@ public class Store implements HeapSize {
     return writer;
   }
 
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should
+   * not be an expensive operation.
+   *
+   * @param path the path to the store file
+   */
+  private void validateStoreFile(Path path)
+      throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, this.conf,
+          this.cacheConf, this.family.getBloomFilterType());
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : " + path
+          + ", keeping it in tmp location", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader();
+      }
+    }
+  }
+
   /*
   * <p>It works by processing a compaction that's been written to disk.
   *
@@ -1212,13 +1237,14 @@ public class Store implements HeapSize {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     // be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
+      validateStoreFile(compactedFile.getPath());
       Path p = null;
       try {
         p = StoreFile.rename(this.fs, compactedFile.getPath(),
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 27ed6bf2490..d02b52ab026 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -24,14 +24,19 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.Assert.fail;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -478,4 +484,41 @@ public class TestCompaction extends HBaseTestCase {
         "bbb").getBytes(), null);
     loader.flushcache();
   }
+
+  public void testCompactionWithCorruptResult() throws Exception {
+    int nfiles = 10;
+    for (int i = 0; i < nfiles; i++) {
+      createStoreFile(r);
+    }
+    Store store = r.getStore(COLUMN_FAMILY);
+
+    List<StoreFile> storeFiles = store.getStorefiles();
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+
+    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+
+    // Now let's corrupt the compacted file.
+    FileSystem fs = cluster.getFileSystem();
+    Path origPath = compactedFile.getPath();
+    Path homedir = store.getHomedir();
+    Path dstPath = new Path(homedir, origPath.getName());
+    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+        (long) 1024,
+        null);
+    stream.writeChars("CORRUPT FILE!!!!");
+    stream.close();
+
+    try {
+      store.completeCompaction(storeFiles, compactedFile);
+    } catch (Exception e) {
+      // The complete compaction should fail and the corrupt file should remain
+      // in the 'tmp' directory.
+      assert (fs.exists(origPath));
+      assert (!fs.exists(dstPath));
+      System.out.println("testCompactionWithCorruptResult Passed");
+      return;
+    }
+    fail("testCompactionWithCorruptResult failed since no exception was " +
+        "thrown while completing a corrupt file");
+  }
 }