HBASE-4078 Validate store files after flush/compaction

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1183071 13f79535-47bb-0310-9956-ffa450edef68
Nicolas Spiegelberg 2011-10-13 20:25:03 +00:00
parent 9814ffbaf0
commit df9b82c082
3 changed files with 72 additions and 2 deletions
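
In short: both the flush path and the compaction path now write their output into the store's tmp location, open the freshly written file once to prove it is readable, and only then rename it into the store directory, so a corrupt result never becomes visible. Below is a minimal, self-contained sketch of that validate-then-rename idea, for illustration only; the names are plain java.nio.file stand-ins, while the actual patch below validates with StoreFile.createReader() against the HDFS FileSystem API.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustration of the validate-then-rename pattern this commit adds.
// Files.readAllBytes() stands in for StoreFile.createReader(); the real code
// validates the new HFile by opening a reader on it and closing it again.
public class ValidateThenRename {

  // Throws if the freshly written file cannot be opened and read; the caller
  // then leaves it in the tmp location instead of exposing it.
  static void validate(Path tmpFile) throws IOException {
    byte[] contents = Files.readAllBytes(tmpFile);
    if (contents.length == 0) {
      throw new IOException("Empty or unreadable output: " + tmpFile);
    }
  }

  // Only a file that passed validation is moved into its final location.
  static Path commit(Path tmpFile, Path dstFile) throws IOException {
    validate(tmpFile);
    return Files.move(tmpFile, dstFile, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("flush-", ".out");
    Files.write(tmp, "some flushed data".getBytes("UTF-8"));
    Path dst = tmp.resolveSibling("store-" + tmp.getFileName());
    System.out.println("Committed to " + commit(tmp, dst));
  }
}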

CHANGES.txt

@@ -355,6 +355,7 @@ Release 0.92.0 - Unreleased
file (Mingjie Lai)
HBASE-4582 Store.java cleanup (failing TestHeapSize and has warnings)
HBASE-4556 Fix all incorrect uses of InternalScanner.next(...) (Lars H)
HBASE-4078 Validate store files after flush/compaction
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected

Store.java

@@ -520,6 +520,7 @@ public class Store implements HeapSize {
    // Write-out finished successfully, move into the right spot
    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
    validateStoreFile(writer.getPath());
    String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
    LOG.info(msg);
    status.setStatus("Flushing " + this + ": " + msg);
@@ -1091,7 +1092,7 @@ public class Store implements HeapSize {
   * nothing made it through the compaction.
   * @throws IOException
   */
  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
      final boolean majorCompaction, final long maxId)
      throws IOException {
    // calculate maximum key count after compaction (for blooms)
@@ -1193,6 +1194,30 @@ public class Store implements HeapSize {
    return writer;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should
   * not be an expensive operation.
   *
   * @param path the path to the store file
   */
  private void validateStoreFile(Path path)
      throws IOException {
    StoreFile storeFile = null;
    try {
      storeFile = new StoreFile(this.fs, path, this.conf,
          this.cacheConf, this.family.getBloomFilterType());
      storeFile.createReader();
    } catch (IOException e) {
      LOG.error("Failed to open store file : " + path
          + ", keeping it in tmp location", e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeReader();
      }
    }
  }

  /*
   * <p>It works by processing a compaction that's been written to disk.
   *
@@ -1212,13 +1237,14 @@
   * @return StoreFile created. May be null.
   * @throws IOException
   */
  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
      final StoreFile.Writer compactedFile)
      throws IOException {
    // 1. Moving the new files into place -- if there is a new file (may not
    // be if all cells were expired or deleted).
    StoreFile result = null;
    if (compactedFile != null) {
      validateStoreFile(compactedFile.getPath());
      Path p = null;
      try {
        p = StoreFile.rename(this.fs, compactedFile.getPath(),

TestCompaction.java

@@ -24,14 +24,19 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -478,4 +484,41 @@ public class TestCompaction extends HBaseTestCase {
        "bbb").getBytes(), null);
    loader.flushcache();
  }

  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    Store store = r.getStore(COLUMN_FAMILY);

    List<StoreFile> storeFiles = store.getStorefiles();
    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);

    // Now let's corrupt the compacted file.
    FileSystem fs = cluster.getFileSystem();
    Path origPath = compactedFile.getPath();
    Path homedir = store.getHomedir();
    Path dstPath = new Path(homedir, origPath.getName());
    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
        (long) 1024, null);
    stream.writeChars("CORRUPT FILE!!!!");
    stream.close();

    try {
      store.completeCompaction(storeFiles, compactedFile);
    } catch (Exception e) {
      // The complete compaction should fail and the corrupt file should remain
      // in the 'tmp' directory.
      assert (fs.exists(origPath));
      assert (!fs.exists(dstPath));
      System.out.println("testCompactionWithCorruptResult Passed");
      return;
    }
    fail("testCompactionWithCorruptResult failed since no exception was " +
        "thrown while completing a corrupt file");
  }
}
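
For reference, TestCompaction still extends HBaseTestCase, so the new case runs as a JUnit 3-style test. A hypothetical driver, not part of this commit, could exercise it from the command line, assuming the HBase 0.92-era test classes and their dependencies are on the classpath and that the test lives in org.apache.hadoop.hbase.regionserver, as its package-private calls into Store imply:

import junit.framework.TestSuite;
import junit.textui.TestRunner;

// Hypothetical helper, not part of this commit: runs the TestCompaction suite
// (including testCompactionWithCorruptResult) under the JUnit 3 text runner.
// Assumes the HBase test classes and their dependencies are on the classpath.
public class RunTestCompaction {
  public static void main(String[] args) {
    TestRunner.run(new TestSuite(
        org.apache.hadoop.hbase.regionserver.TestCompaction.class));
  }
}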