HBASE-2729. Interrupted or failed memstore flushes should not corrupt the region

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@957307 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2010-06-23 19:04:40 +00:00
parent 51de1e3aa9
commit c4654b285a
6 changed files with 184 additions and 74 deletions
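
The gist of the fix: new store files are no longer written straight into the store directory. Flush (and compaction) output goes to a per-region .tmp directory first and is renamed into place only after a clean close, so an interrupted or failed memstore flush cannot leave a half-written file where the region would later load it. Below is a minimal, self-contained sketch of that pattern against a generic Hadoop FileSystem; the class and helper names are illustrative only, and the real implementation lives in the Store.java hunks further down.

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch only; not the HBase code itself. */
public class TmpThenRenameSketch {
  public static Path flushSafely(FileSystem fs, Path regionDir, Path storeDir,
      byte[] payload) throws IOException {
    // 1. Write under the region's .tmp directory. A crash or IOException
    //    here leaves nothing in the store directory itself.
    Path tmpFile = new Path(new Path(regionDir, ".tmp"),
        UUID.randomUUID().toString().replace("-", ""));
    FSDataOutputStream out = fs.create(tmpFile);
    try {
      out.write(payload);
    } finally {
      out.close();
    }
    // 2. Only after a clean close, rename the finished file into place.
    Path dstFile = new Path(storeDir, tmpFile.getName());
    if (!fs.rename(tmpFile, dstFile)) {
      throw new IOException("Rename " + tmpFile + " -> " + dstFile + " failed");
    }
    return dstFile;
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path regionDir = new Path("/tmp/demo-region");
    Path storeDir = new Path(regionDir, "family");
    fs.mkdirs(storeDir);
    System.out.println("Flushed to "
        + flushSafely(fs, regionDir, storeDir, "kv".getBytes()));
  }
}

On reopen, a region only has to delete everything under .tmp (cleanupTmpDir() in the HRegion hunks) to discard whatever a dead or interrupted flush left behind.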

CHANGES.txt

@@ -415,6 +415,8 @@ Release 0.21.0 - Unreleased
HBASE-2767 Fix reflection in tests that was made incompatible by HDFS-1209
HBASE-2617 Load balancer falls into pathological state if one server under
average - slop; endless churn
HBASE-2729 Interrupted or failed memstore flushes should not corrupt the
region
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

HRegion.java

@@ -156,14 +156,17 @@ public class HRegion implements HeapSize { // , Writable{
final AtomicLong memstoreSize = new AtomicLong(0);
// This is the table subdirectory.
final Path basedir;
/**
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
final Path tableDir;
final HLog log;
final FileSystem fs;
final Configuration conf;
final HRegionInfo regionInfo;
final Path regiondir;
private final Path regionCompactionDir;
KeyValue.KVComparator comparator;
/*
@@ -238,14 +241,13 @@ public class HRegion implements HeapSize { // , Writable{
* Should only be used for testing purposes
*/
public HRegion(){
this.basedir = null;
this.tableDir = null;
this.blockingMemStoreSize = 0L;
this.conf = null;
this.flushListener = null;
this.fs = null;
this.memstoreFlushSize = 0L;
this.log = null;
this.regionCompactionDir = null;
this.regiondir = null;
this.regionInfo = null;
this.threadWakeFrequency = 0L;
@@ -257,7 +259,7 @@ public class HRegion implements HeapSize { // , Writable{
* {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
*
*
* @param basedir qualified path of directory where region should be located,
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
@@ -277,9 +279,9 @@ public class HRegion implements HeapSize { // , Writable{
* @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
*/
public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
this.basedir = basedir;
this.tableDir = tableDir;
this.comparator = regionInfo.getComparator();
this.log = log;
this.fs = fs;
@@ -289,13 +291,11 @@ public class HRegion implements HeapSize { // , Writable{
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
10 * 1000);
String encodedNameStr = this.regionInfo.getEncodedName();
this.regiondir = new Path(basedir, encodedNameStr);
this.regiondir = new Path(tableDir, encodedNameStr);
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
LOG.debug("Creating region " + this);
}
this.regionCompactionDir =
new Path(getCompactionDir(basedir), encodedNameStr);
long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
@@ -327,11 +327,14 @@ public class HRegion implements HeapSize { // , Writable{
// Write HRI to a file in case we need to recover .META.
checkRegioninfoOnFilesystem();
// Remove temporary data left over from old regions
cleanupTmpDir();
// Load in all the HStores. Get min and max seqids across all families.
long maxSeqId = -1;
long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.basedir, c);
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
@@ -694,10 +697,10 @@ public class HRegion implements HeapSize { // , Writable{
// Create a region instance and then move the splits into place under
// regionA and regionB.
HRegion regionA =
HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null);
HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
HRegion regionB =
HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
return new HRegion [] {regionA, regionB};
@@ -729,29 +732,26 @@ public class HRegion implements HeapSize { // , Writable{
// nothing
}
/*
* @param dir
* @return compaction directory for the passed in <code>dir</code>
*/
static Path getCompactionDir(final Path dir) {
return new Path(dir, HConstants.HREGION_COMPACTIONDIR_NAME);
}
/*
* Do preparation for pending compaction.
* Clean out any vestiges of previous failed compactions.
* @throws IOException
*/
private void doRegionCompactionPrep() throws IOException {
doRegionCompactionCleanup();
}
/*
* Removes the compaction directory for this Store.
* @throws IOException
* Removes the temporary directory for this Store.
*/
private void doRegionCompactionCleanup() throws IOException {
FSUtils.deleteDirectory(this.fs, this.regionCompactionDir);
private void cleanupTmpDir() throws IOException {
FSUtils.deleteDirectory(this.fs, getTmpDir());
}
/**
* Get the temporary directory for this region. This directory
* will have its contents removed when the region is reopened.
*/
Path getTmpDir() {
return new Path(getRegionDir(), ".tmp");
}
void setForceMajorCompaction(final boolean b) {
@@ -832,7 +832,6 @@ public class HRegion implements HeapSize { // , Writable{
splitRow = ss.getSplitRow();
}
}
doRegionCompactionCleanup();
String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
startTime);
LOG.info("compaction completed on region " + this + " in " + timeTaken);
@@ -2040,9 +2039,9 @@ public class HRegion implements HeapSize { // , Writable{
return true;
}
protected Store instantiateHStore(Path baseDir, HColumnDescriptor c)
protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
throws IOException {
return new Store(baseDir, this, c, this.fs, this.conf);
return new Store(tableDir, this, c, this.fs, this.conf);
}
/**
@@ -2270,8 +2269,8 @@ public class HRegion implements HeapSize { // , Writable{
}
/** @return Path of region base directory */
public Path getBaseDir() {
return this.basedir;
public Path getTableDir() {
return this.tableDir;
}
/**
@@ -2465,7 +2464,7 @@ public class HRegion implements HeapSize { // , Writable{
/**
* A utility method to create new instances of HRegion based on the
* {@link HConstants#REGION_IMPL} configuration property.
* @param basedir qualified path of directory where region should be located,
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
@@ -2483,7 +2482,7 @@ public class HRegion implements HeapSize { // , Writable{
* failed. Can be null.
* @return the new instance
*/
public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
try {
@SuppressWarnings("unchecked")
@@ -2494,7 +2493,7 @@ public class HRegion implements HeapSize { // , Writable{
regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
Configuration.class, HRegionInfo.class, FlushRequester.class);
return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener);
return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -2796,7 +2795,7 @@ public class HRegion implements HeapSize { // , Writable{
Configuration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path basedir = a.getBaseDir();
Path tableDir = a.getTableDir();
// Presume both are of same region type -- i.e. both user or catalog
// table regions. This way can use comparator.
final byte[] startKey =
@@ -2825,7 +2824,7 @@ public class HRegion implements HeapSize { // , Writable{
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
String encodedName = newRegionInfo.getEncodedName();
Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName);
Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " +
newRegionDir);
@@ -2844,7 +2843,7 @@ public class HRegion implements HeapSize { // , Writable{
byFamily = filesByFamily(byFamily, b.close());
for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
byte [] colFamily = es.getKey();
makeColumnFamilyDirs(fs, basedir, newRegionInfo, colFamily);
makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
// Because we compacted the source regions we should have no more than two
// HStoreFiles per family and there will be no reference store
List<StoreFile> srcFiles = es.getValue();
@@ -2860,7 +2859,7 @@ public class HRegion implements HeapSize { // , Writable{
}
for (StoreFile hsf: srcFiles) {
StoreFile.rename(fs, hsf.getPath(),
StoreFile.getUniqueFile(fs, Store.getStoreHomedir(basedir,
StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
newRegionInfo.getEncodedName(), colFamily)));
}
}
@@ -2868,7 +2867,7 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize();
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
@@ -3073,7 +3072,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
(21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
(20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
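
A side effect visible in the HRegion hunks above: the per-region compaction directory (the regionCompactionDir field and the static getCompactionDir() helper) is gone entirely, replaced by a single .tmp directory under the region directory that cleanupTmpDir() wipes during initialize(). Dropping that field is also why FIXED_OVERHEAD shrinks from 21 to 20 references.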

Store.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.ImmutableList;
@@ -103,7 +102,6 @@ public class Store implements HeapSize {
private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final Path regionCompactionDir;
private final Object compactLock = new Object();
private final int compactionThreshold;
private final int blocksize;
@@ -153,8 +151,6 @@ public class Store implements HeapSize {
this.ttl *= 1000;
}
this.memstore = new MemStore(this.comparator);
this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir),
info.getEncodedName());
this.storeNameStr = Bytes.toString(this.family.getName());
// By default, we compact if an HStore has more than
@@ -207,6 +203,14 @@ public class Store implements HeapSize {
return new Path(tabledir, new Path(encodedName,
new Path(Bytes.toString(family))));
}
/**
* Return the directory in which this store stores its
* StoreFiles
*/
public Path getHomedir() {
return homedir;
}
/*
* Creates a series of StoreFile loaded from the given directory.
@@ -322,8 +326,7 @@ public class Store implements HeapSize {
if (!srcFs.equals(fs)) {
LOG.info("File " + srcPath + " on different filesystem than " +
"destination store - moving to this filesystem.");
Path tmpDir = new Path(homedir, "_tmp");
Path tmpPath = StoreFile.getRandomFilename(fs, tmpDir);
Path tmpPath = getTmpPath();
FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
srcPath = tmpPath;
@@ -354,6 +357,16 @@ public class Store implements HeapSize {
+ " into store " + this + " (new location: " + dstPath + ")");
}
/**
* Get a temporary path in this region. These temporary files
* will get cleaned up when the region is re-opened if they are
* still around.
*/
private Path getTmpPath() throws IOException {
return StoreFile.getRandomFilename(
fs, region.getTmpDir());
}
/**
* Close all the readers
*
@@ -424,7 +437,7 @@ public class Store implements HeapSize {
// if we fail.
synchronized (flushLock) {
// A. Write the map out to the disk
writer = createWriter(this.homedir, set.size());
writer = createWriterInTmp(set.size());
int entries = 0;
try {
for (KeyValue kv: set) {
@@ -441,7 +454,13 @@ public class Store implements HeapSize {
writer.close();
}
}
StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache,
// Write-out finished successfully, move into the right spot
Path dstPath = StoreFile.getUniqueFile(fs, homedir);
LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
fs.rename(writer.getPath(), dstPath);
StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
this.conf, this.family.getBloomFilterType(), this.inMemory);
Reader r = sf.createReader();
this.storeSize += r.length();
@@ -456,13 +475,11 @@ public class Store implements HeapSize {
}
/*
* @return Writer for this store.
* @param basedir Directory to put writer in.
* @throws IOException
* @return Writer for a new StoreFile in the tmp dir.
*/
private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount)
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
return StoreFile.createWriter(this.fs, basedir, this.blocksize,
return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
this.compression, this.comparator, this.conf,
this.family.getBloomFilterType(), maxKeyCount);
}
@@ -570,12 +587,6 @@ public class Store implements HeapSize {
return checkSplit(forceSplit);
}
if (!fs.exists(this.regionCompactionDir) &&
!fs.mkdirs(this.regionCompactionDir)) {
LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed");
return checkSplit(forceSplit);
}
// HBASE-745, preparing all store file sizes for incremental compacting
// selection.
int countOfFiles = filesToCompact.size();
@@ -641,7 +652,7 @@ public class Store implements HeapSize {
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
(references? ", hasReferences=true,": " ") + " into " +
FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
region.getTmpDir() + ", seqid=" + maxId);
HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
@@ -783,7 +794,7 @@ public class Store implements HeapSize {
// output to writer:
for (KeyValue kv : kvs) {
if (writer == null) {
writer = createWriter(this.regionCompactionDir, maxKeyCount);
writer = createWriterInTmp(maxKeyCount);
}
writer.append(kv);
}
@@ -798,7 +809,7 @@ public class Store implements HeapSize {
MinorCompactingStoreScanner scanner = null;
try {
scanner = new MinorCompactingStoreScanner(this, scanners);
writer = createWriter(this.regionCompactionDir, maxKeyCount);
writer = createWriterInTmp(maxKeyCount);
while (scanner.next(writer)) {
// Nothing to do
}
@@ -1451,7 +1462,7 @@
}
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
ClassSize.OBJECT + (14 * ClassSize.REFERENCE) +
(4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
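
Store mirrors the HRegion change: flushes and compactions both write through createWriterInTmp() into region.getTmpDir(), and only a fully written, closed file is renamed into the store's home directory. With Store's own regionCompactionDir field removed here too, its FIXED_OVERHEAD likewise drops from 15 to 14 references.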

HBaseTestCase.java

@@ -165,7 +165,7 @@ public abstract class HBaseTestCase extends TestCase {
protected HRegion openClosedRegion(final HRegion closedRegion)
throws IOException {
HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
HRegion r = new HRegion(closedRegion.getTableDir(), closedRegion.getLog(),
closedRegion.getFilesystem(), closedRegion.getConf(),
closedRegion.getRegionInfo(), null);
r.initialize();

TestCompaction.java

@@ -74,9 +74,6 @@ public class TestCompaction extends HBaseTestCase {
super.setUp();
HTableDescriptor htd = createTableDescriptor(getName());
this.r = createNewHRegion(htd, null, null);
this.compactionDir = HRegion.getCompactionDir(this.r.getBaseDir());
this.regionCompactionDir = new Path(this.compactionDir,
this.r.getRegionInfo().getEncodedName());
}
@Override
@@ -150,10 +147,6 @@
// assertEquals(cellValues.length, 3);
r.flushcache();
r.compactStores();
// check compaction dir is exists
assertTrue(this.cluster.getFileSystem().exists(this.compactionDir));
// check Compaction Dir for this Regions is cleaned up
assertTrue(!this.cluster.getFileSystem().exists(this.regionCompactionDir));
// Always 3 versions if that is what max versions is.
byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
// Increment the least significant character so we get to next row.
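
The assertions dropped from TestCompaction above checked for the old compaction directory, which no longer exists now that compaction output goes through the region's .tmp directory.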

TestStore.java

@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -31,8 +32,15 @@ import java.util.concurrent.ConcurrentSkipListSet;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -44,11 +52,17 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import com.google.common.base.Joiner;
/**
* Test class for the Store
*/
public class TestStore extends TestCase {
public static final Log LOG = LogFactory.getLog(TestStore.class);
Store store;
byte [] table = Bytes.toBytes("table");
byte [] family = Bytes.toBytes("family");
@@ -91,12 +105,16 @@
}
private void init(String methodName) throws IOException {
init(methodName, HBaseConfiguration.create());
}
private void init(String methodName, Configuration conf)
throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
@@ -309,6 +327,93 @@
}
public void testHandleErrorsInFlush() throws Exception {
LOG.info("Setting up a faulty file system that cannot write");
Configuration conf = HBaseConfiguration.create();
// Set a different UGI so we don't get the same cached LocalFS instance
conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
"testhandleerrorsinflush,foo");
// Inject our faulty LocalFileSystem
conf.setClass("fs.file.impl", FaultyFileSystem.class,
FileSystem.class);
// Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf);
assertEquals(FaultyFileSystem.class, fs.getClass());
// Initialize region
init(getName(), conf);
LOG.info("Adding some data");
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf3, null));
LOG.info("Before flush, we should have no files");
FileStatus[] files = fs.listStatus(store.getHomedir());
Path[] paths = FileUtil.stat2Paths(files);
System.err.println("Got paths: " + Joiner.on(",").join(paths));
assertEquals(0, paths.length);
//flush
try {
LOG.info("Flushing");
flush(1);
fail("Didn't bubble up IOE!");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Fault injected"));
}
LOG.info("After failed flush, we should still have no files!");
files = fs.listStatus(store.getHomedir());
paths = FileUtil.stat2Paths(files);
System.err.println("Got paths: " + Joiner.on(",").join(paths));
assertEquals(0, paths.length);
}
static class FaultyFileSystem extends FilterFileSystem {
List<SoftReference<FaultyOutputStream>> outStreams =
new ArrayList<SoftReference<FaultyOutputStream>>();
private long faultPos = 200;
public FaultyFileSystem() {
super(new LocalFileSystem());
System.err.println("Creating faulty!");
}
@Override
public FSDataOutputStream create(Path p) throws IOException {
return new FaultyOutputStream(super.create(p), faultPos);
}
}
static class FaultyOutputStream extends FSDataOutputStream {
volatile long faultPos = Long.MAX_VALUE;
public FaultyOutputStream(FSDataOutputStream out,
long faultPos) throws IOException {
super(out, null);
this.faultPos = faultPos;
}
@Override
public void write(byte[] buf, int offset, int length) throws IOException {
System.err.println("faulty stream write at pos " + getPos());
injectFault();
super.write(buf, offset, length);
}
private void injectFault() throws IOException {
if (getPos() >= faultPos) {
throw new IOException("Fault injected");
}
}
}
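// Design note on the harness above: FaultyFileSystem wraps a plain
// LocalFileSystem via FilterFileSystem, and every stream it creates starts
// throwing once the write position passes faultPos. The UGI property set in
// testHandleErrorsInFlush exists only to defeat Hadoop's FileSystem cache,
// so FileSystem.get(conf) returns this wrapper rather than a cached
// LocalFileSystem instance.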
private static void flushStore(Store store, long id) throws IOException {
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();