diff --git a/CHANGES.txt b/CHANGES.txt index 60a22179a69..aa1c986f79a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -415,6 +415,8 @@ Release 0.21.0 - Unreleased HBASE-2767 Fix reflection in tests that was made incompatible by HDFS-1209 HBASE-2617 Load balancer falls into pathological state if one server under average - slop; endless churn + HBASE-2729 Interrupted or failed memstore flushes should not corrupt the + region IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1794df859a5..40205c400a0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -156,14 +156,17 @@ public class HRegion implements HeapSize { // , Writable{ final AtomicLong memstoreSize = new AtomicLong(0); - // This is the table subdirectory. - final Path basedir; + /** + * The directory for the table this region is part of. + * This directory contains the directory for this region. + */ + final Path tableDir; + final HLog log; final FileSystem fs; final Configuration conf; final HRegionInfo regionInfo; final Path regiondir; - private final Path regionCompactionDir; KeyValue.KVComparator comparator; /* @@ -238,14 +241,13 @@ public class HRegion implements HeapSize { // , Writable{ * Should only be used for testing purposes */ public HRegion(){ - this.basedir = null; + this.tableDir = null; this.blockingMemStoreSize = 0L; this.conf = null; this.flushListener = null; this.fs = null; this.memstoreFlushSize = 0L; this.log = null; - this.regionCompactionDir = null; this.regiondir = null; this.regionInfo = null; this.threadWakeFrequency = 0L; @@ -257,7 +259,7 @@ public class HRegion implements HeapSize { // , Writable{ * {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method. * * - * @param basedir qualified path of directory where region should be located, + * @param tableDir qualified path of directory where region should be located, * usually the table directory. * @param log The HLog is the outbound log for any updates to the HRegion * (There's a single HLog for all the HRegions on a single HRegionServer.) @@ -277,9 +279,9 @@ public class HRegion implements HeapSize { // , Writable{ * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester) */ - public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf, + public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, HRegionInfo regionInfo, FlushRequester flushListener) { - this.basedir = basedir; + this.tableDir = tableDir; this.comparator = regionInfo.getComparator(); this.log = log; this.fs = fs; @@ -289,13 +291,11 @@ public class HRegion implements HeapSize { // , Writable{ this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); String encodedNameStr = this.regionInfo.getEncodedName(); - this.regiondir = new Path(basedir, encodedNameStr); + this.regiondir = new Path(tableDir, encodedNameStr); if (LOG.isDebugEnabled()) { // Write out region name as string and its encoded name. LOG.debug("Creating region " + this); } - this.regionCompactionDir = - new Path(getCompactionDir(basedir), encodedNameStr); long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize(); if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) { flushSize = conf.getLong("hbase.hregion.memstore.flush.size", @@ -327,11 +327,14 @@ public class HRegion implements HeapSize { // , Writable{ // Write HRI to a file in case we need to recover .META. checkRegioninfoOnFilesystem(); + // Remove temporary data left over from old regions + cleanupTmpDir(); + // Load in all the HStores. Get min and max seqids across all families. long maxSeqId = -1; long minSeqId = Integer.MAX_VALUE; for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { - Store store = instantiateHStore(this.basedir, c); + Store store = instantiateHStore(this.tableDir, c); this.stores.put(c.getName(), store); long storeSeqId = store.getMaxSequenceId(); if (storeSeqId > maxSeqId) { @@ -694,10 +697,10 @@ public class HRegion implements HeapSize { // , Writable{ // Create a region instance and then move the splits into place under // regionA and regionB. HRegion regionA = - HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null); + HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null); moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir()); HRegion regionB = - HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null); + HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null); moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir()); return new HRegion [] {regionA, regionB}; @@ -729,29 +732,26 @@ public class HRegion implements HeapSize { // , Writable{ // nothing } - /* - * @param dir - * @return compaction directory for the passed in dir - */ - static Path getCompactionDir(final Path dir) { - return new Path(dir, HConstants.HREGION_COMPACTIONDIR_NAME); - } - /* * Do preparation for pending compaction. - * Clean out any vestiges of previous failed compactions. * @throws IOException */ private void doRegionCompactionPrep() throws IOException { - doRegionCompactionCleanup(); } /* - * Removes the compaction directory for this Store. - * @throws IOException + * Removes the temporary directory for this Store. */ - private void doRegionCompactionCleanup() throws IOException { - FSUtils.deleteDirectory(this.fs, this.regionCompactionDir); + private void cleanupTmpDir() throws IOException { + FSUtils.deleteDirectory(this.fs, getTmpDir()); + } + + /** + * Get the temporary diretory for this region. This directory + * will have its contents removed when the region is reopened. + */ + Path getTmpDir() { + return new Path(getRegionDir(), ".tmp"); } void setForceMajorCompaction(final boolean b) { @@ -832,7 +832,6 @@ public class HRegion implements HeapSize { // , Writable{ splitRow = ss.getSplitRow(); } } - doRegionCompactionCleanup(); String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), startTime); LOG.info("compaction completed on region " + this + " in " + timeTaken); @@ -2040,9 +2039,9 @@ public class HRegion implements HeapSize { // , Writable{ return true; } - protected Store instantiateHStore(Path baseDir, HColumnDescriptor c) + protected Store instantiateHStore(Path tableDir, HColumnDescriptor c) throws IOException { - return new Store(baseDir, this, c, this.fs, this.conf); + return new Store(tableDir, this, c, this.fs, this.conf); } /** @@ -2270,8 +2269,8 @@ public class HRegion implements HeapSize { // , Writable{ } /** @return Path of region base directory */ - public Path getBaseDir() { - return this.basedir; + public Path getTableDir() { + return this.tableDir; } /** @@ -2465,7 +2464,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * A utility method to create new instances of HRegion based on the * {@link HConstants#REGION_IMPL} configuration property. - * @param basedir qualified path of directory where region should be located, + * @param tableDir qualified path of directory where region should be located, * usually the table directory. * @param log The HLog is the outbound log for any updates to the HRegion * (There's a single HLog for all the HRegions on a single HRegionServer.) @@ -2483,7 +2482,7 @@ public class HRegion implements HeapSize { // , Writable{ * failed. Can be null. * @return the new instance */ - public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf, + public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, HRegionInfo regionInfo, FlushRequester flushListener) { try { @SuppressWarnings("unchecked") @@ -2494,7 +2493,7 @@ public class HRegion implements HeapSize { // , Writable{ regionClass.getConstructor(Path.class, HLog.class, FileSystem.class, Configuration.class, HRegionInfo.class, FlushRequester.class); - return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener); + return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener); } catch (Throwable e) { // todo: what should I throw here? throw new IllegalStateException("Could not instantiate a region instance.", e); @@ -2796,7 +2795,7 @@ public class HRegion implements HeapSize { // , Writable{ Configuration conf = a.getConf(); HTableDescriptor tabledesc = a.getTableDesc(); HLog log = a.getLog(); - Path basedir = a.getBaseDir(); + Path tableDir = a.getTableDir(); // Presume both are of same region type -- i.e. both user or catalog // table regions. This way can use comparator. final byte[] startKey = @@ -2825,7 +2824,7 @@ public class HRegion implements HeapSize { // , Writable{ HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey); LOG.info("Creating new region " + newRegionInfo.toString()); String encodedName = newRegionInfo.getEncodedName(); - Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName); + Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName); if(fs.exists(newRegionDir)) { throw new IOException("Cannot merge; target file collision at " + newRegionDir); @@ -2844,7 +2843,7 @@ public class HRegion implements HeapSize { // , Writable{ byFamily = filesByFamily(byFamily, b.close()); for (Map.Entry> es : byFamily.entrySet()) { byte [] colFamily = es.getKey(); - makeColumnFamilyDirs(fs, basedir, newRegionInfo, colFamily); + makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily); // Because we compacted the source regions we should have no more than two // HStoreFiles per family and there will be no reference store List srcFiles = es.getValue(); @@ -2860,7 +2859,7 @@ public class HRegion implements HeapSize { // , Writable{ } for (StoreFile hsf: srcFiles) { StoreFile.rename(fs, hsf.getPath(), - StoreFile.getUniqueFile(fs, Store.getStoreHomedir(basedir, + StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir, newRegionInfo.getEncodedName(), colFamily))); } } @@ -2868,7 +2867,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for new region"); listPaths(fs, newRegionDir); } - HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null); + HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, null); dstRegion.initialize(); dstRegion.compactStores(); if (LOG.isDebugEnabled()) { @@ -3073,7 +3072,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); + (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 04b7522b5f4..5a2c1536083 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.collect.ImmutableList; @@ -103,7 +102,6 @@ public class Store implements HeapSize { private final CopyOnWriteArraySet changedReaderObservers = new CopyOnWriteArraySet(); - private final Path regionCompactionDir; private final Object compactLock = new Object(); private final int compactionThreshold; private final int blocksize; @@ -153,8 +151,6 @@ public class Store implements HeapSize { this.ttl *= 1000; } this.memstore = new MemStore(this.comparator); - this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir), - info.getEncodedName()); this.storeNameStr = Bytes.toString(this.family.getName()); // By default, we compact if an HStore has more than @@ -207,6 +203,14 @@ public class Store implements HeapSize { return new Path(tabledir, new Path(encodedName, new Path(Bytes.toString(family)))); } + + /** + * Return the directory in which this store stores its + * StoreFiles + */ + public Path getHomedir() { + return homedir; + } /* * Creates a series of StoreFile loaded from the given directory. @@ -322,8 +326,7 @@ public class Store implements HeapSize { if (!srcFs.equals(fs)) { LOG.info("File " + srcPath + " on different filesystem than " + "destination store - moving to this filesystem."); - Path tmpDir = new Path(homedir, "_tmp"); - Path tmpPath = StoreFile.getRandomFilename(fs, tmpDir); + Path tmpPath = getTmpPath(); FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); LOG.info("Copied to temporary path on dst filesystem: " + tmpPath); srcPath = tmpPath; @@ -354,6 +357,16 @@ public class Store implements HeapSize { + " into store " + this + " (new location: " + dstPath + ")"); } + /** + * Get a temporary path in this region. These temporary files + * will get cleaned up when the region is re-opened if they are + * still around. + */ + private Path getTmpPath() throws IOException { + return StoreFile.getRandomFilename( + fs, region.getTmpDir()); + } + /** * Close all the readers * @@ -424,7 +437,7 @@ public class Store implements HeapSize { // if we fail. synchronized (flushLock) { // A. Write the map out to the disk - writer = createWriter(this.homedir, set.size()); + writer = createWriterInTmp(set.size()); int entries = 0; try { for (KeyValue kv: set) { @@ -441,7 +454,13 @@ public class Store implements HeapSize { writer.close(); } } - StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache, + + // Write-out finished successfully, move into the right spot + Path dstPath = StoreFile.getUniqueFile(fs, homedir); + LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath); + fs.rename(writer.getPath(), dstPath); + + StoreFile sf = new StoreFile(this.fs, dstPath, blockcache, this.conf, this.family.getBloomFilterType(), this.inMemory); Reader r = sf.createReader(); this.storeSize += r.length(); @@ -456,13 +475,11 @@ public class Store implements HeapSize { } /* - * @return Writer for this store. - * @param basedir Directory to put writer in. - * @throws IOException + * @return Writer for a new StoreFile in the tmp dir. */ - private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount) + private StoreFile.Writer createWriterInTmp(int maxKeyCount) throws IOException { - return StoreFile.createWriter(this.fs, basedir, this.blocksize, + return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize, this.compression, this.comparator, this.conf, this.family.getBloomFilterType(), maxKeyCount); } @@ -570,12 +587,6 @@ public class Store implements HeapSize { return checkSplit(forceSplit); } - if (!fs.exists(this.regionCompactionDir) && - !fs.mkdirs(this.regionCompactionDir)) { - LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed"); - return checkSplit(forceSplit); - } - // HBASE-745, preparing all store file sizes for incremental compacting // selection. int countOfFiles = filesToCompact.size(); @@ -641,7 +652,7 @@ public class Store implements HeapSize { LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() + (references? ", hasReferences=true,": " ") + " into " + - FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId); + region.getTmpDir() + ", seqid=" + maxId); HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, writer); @@ -783,7 +794,7 @@ public class Store implements HeapSize { // output to writer: for (KeyValue kv : kvs) { if (writer == null) { - writer = createWriter(this.regionCompactionDir, maxKeyCount); + writer = createWriterInTmp(maxKeyCount); } writer.append(kv); } @@ -798,7 +809,7 @@ public class Store implements HeapSize { MinorCompactingStoreScanner scanner = null; try { scanner = new MinorCompactingStoreScanner(this, scanners); - writer = createWriter(this.regionCompactionDir, maxKeyCount); + writer = createWriterInTmp(maxKeyCount); while (scanner.next(writer)) { // Nothing to do } @@ -1451,7 +1462,7 @@ public class Store implements HeapSize { } public static final long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + + ClassSize.OBJECT + (14 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index dc38b3bcfaa..d1dbb17cdb2 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -165,7 +165,7 @@ public abstract class HBaseTestCase extends TestCase { protected HRegion openClosedRegion(final HRegion closedRegion) throws IOException { - HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(), + HRegion r = new HRegion(closedRegion.getTableDir(), closedRegion.getLog(), closedRegion.getFilesystem(), closedRegion.getConf(), closedRegion.getRegionInfo(), null); r.initialize(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 9e5ca46b69a..34b80440d52 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -74,9 +74,6 @@ public class TestCompaction extends HBaseTestCase { super.setUp(); HTableDescriptor htd = createTableDescriptor(getName()); this.r = createNewHRegion(htd, null, null); - this.compactionDir = HRegion.getCompactionDir(this.r.getBaseDir()); - this.regionCompactionDir = new Path(this.compactionDir, - this.r.getRegionInfo().getEncodedName()); } @Override @@ -150,10 +147,6 @@ public class TestCompaction extends HBaseTestCase { // assertEquals(cellValues.length, 3); r.flushcache(); r.compactStores(); - // check compaction dir is exists - assertTrue(this.cluster.getFileSystem().exists(this.compactionDir)); - // check Compaction Dir for this Regions is cleaned up - assertTrue(!this.cluster.getFileSystem().exists(this.regionCompactionDir)); // Always 3 versions if that is what max versions is. byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); // Increment the least significant character so we get to next row. diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index a65e9473044..b15ae53af57 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -31,8 +32,15 @@ import java.util.concurrent.ConcurrentSkipListSet; import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -44,11 +52,17 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.security.UnixUserGroupInformation; + +import com.google.common.base.Joiner; /** * Test class fosr the Store */ public class TestStore extends TestCase { + public static final Log LOG = LogFactory.getLog(TestStore.class); + Store store; byte [] table = Bytes.toBytes("table"); byte [] family = Bytes.toBytes("family"); @@ -91,12 +105,16 @@ public class TestStore extends TestCase { } private void init(String methodName) throws IOException { + init(methodName, HBaseConfiguration.create()); + } + + private void init(String methodName, Configuration conf) + throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path logdir = new Path(DIR+methodName+"/logs"); Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME); HColumnDescriptor hcd = new HColumnDescriptor(family); - Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); fs.delete(logdir, true); @@ -309,6 +327,93 @@ public class TestStore extends TestCase { } + public void testHandleErrorsInFlush() throws Exception { + LOG.info("Setting up a faulty file system that cannot write"); + + Configuration conf = HBaseConfiguration.create(); + // Set a different UGI so we don't get the same cached LocalFS instance + conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, + "testhandleerrorsinflush,foo"); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, + FileSystem.class); + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + assertEquals(FaultyFileSystem.class, fs.getClass()); + + // Initialize region + init(getName(), conf); + + LOG.info("Adding some data"); + this.store.add(new KeyValue(row, family, qf1, null)); + this.store.add(new KeyValue(row, family, qf2, null)); + this.store.add(new KeyValue(row, family, qf3, null)); + + LOG.info("Before flush, we should have no files"); + FileStatus[] files = fs.listStatus(store.getHomedir()); + Path[] paths = FileUtil.stat2Paths(files); + System.err.println("Got paths: " + Joiner.on(",").join(paths)); + assertEquals(0, paths.length); + + //flush + try { + LOG.info("Flushing"); + flush(1); + fail("Didn't bubble up IOE!"); + } catch (IOException ioe) { + assertTrue(ioe.getMessage().contains("Fault injected")); + } + + LOG.info("After failed flush, we should still have no files!"); + files = fs.listStatus(store.getHomedir()); + paths = FileUtil.stat2Paths(files); + System.err.println("Got paths: " + Joiner.on(",").join(paths)); + assertEquals(0, paths.length); + } + + + static class FaultyFileSystem extends FilterFileSystem { + List> outStreams = + new ArrayList>(); + private long faultPos = 200; + + public FaultyFileSystem() { + super(new LocalFileSystem()); + System.err.println("Creating faulty!"); + } + + @Override + public FSDataOutputStream create(Path p) throws IOException { + return new FaultyOutputStream(super.create(p), faultPos); + } + + } + + static class FaultyOutputStream extends FSDataOutputStream { + volatile long faultPos = Long.MAX_VALUE; + + public FaultyOutputStream(FSDataOutputStream out, + long faultPos) throws IOException { + super(out, null); + this.faultPos = faultPos; + } + + @Override + public void write(byte[] buf, int offset, int length) throws IOException { + System.err.println("faulty stream write at pos " + getPos()); + injectFault(); + super.write(buf, offset, length); + } + + private void injectFault() throws IOException { + if (getPos() >= faultPos) { + throw new IOException("Fault injected"); + } + } + } + + + private static void flushStore(Store store, long id) throws IOException { StoreFlusher storeFlusher = store.getStoreFlusher(id); storeFlusher.prepare();