diff --git a/CHANGES.txt b/CHANGES.txt
index 6e6a35634a3..1d083ec79c7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -435,6 +435,8 @@ Release 0.21.0 - Unreleased
                (Nicolas Spiegelberg via Stack)
    HBASE-2781  ZKW.createUnassignedRegion doesn't make sure existing znode is
                in the right state (Karthik Ranganathan via JD)
+   HBASE-2727  Splits writing one file only is untenable; need dir of recovered
+               edits ordered by sequenceid
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a69ece89c56..3bd20f67ce4 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,27 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,28 +85,6 @@ import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * HRegion stores data for a certain region of a table. It stores all columns
  * for each row. A given table consists of one or more HRegions.
@@ -126,6 +125,7 @@ public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
+  final AtomicBoolean closed = new AtomicBoolean(false);
 
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -330,9 +330,8 @@ public class HRegion implements HeapSize { // , Writable{
     // Remove temporary data left over from old regions
     cleanupTmpDir();
 
-    // Load in all the HStores.  Get min and max seqids across all families.
+    // Load in all the HStores.  Get maximum seqid.
     long maxSeqId = -1;
-    long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
@@ -340,12 +339,9 @@ public class HRegion implements HeapSize { // , Writable{
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-      if (minSeqId > storeSeqId) {
-        minSeqId = storeSeqId;
-      }
     }
     // Recover any edits if available.
-    long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
+    maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
@@ -362,7 +358,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+    long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
     return nextSeqid;
   }
@@ -902,7 +898,9 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Flushing the cache is a little tricky. We have a lot of updates in the
+   * Flush the memstore.
+   *
+   * Flushing the memstore is a little tricky. We have a lot of updates in the
    * memstore, all of which have also been written to the log. We need to
    * write those updates in the memstore out to disk, while being able to
    * process reads/writes as much as possible during the flush operation. Also,
@@ -934,6 +932,19 @@ public class HRegion implements HeapSize { // , Writable{
    * because a Snapshot was not properly persisted.
    */
  protected boolean internalFlushcache() throws IOException {
+    return internalFlushcache(this.log, -1);
+  }
+
+  /**
+   * @param wal Null if we're NOT to go via hlog/wal.
+   * @param myseqid The seqid to use if wal is null when writing out the
+   * flush file.
+   * @return true if the region needs compacting
+   * @throws IOException
+   * @see {@link #internalFlushcache()}
+   */
+  protected boolean internalFlushcache(final HLog wal, final long myseqid)
+  throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -945,7 +956,8 @@
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for region " + this +
         ". Current region memstore size " +
-        StringUtils.humanReadableInt(this.memstoreSize.get()));
+        StringUtils.humanReadableInt(this.memstoreSize.get()) +
+        ((wal != null)? "": "; wal is null, using passed myseqid=" + myseqid));
     }
 
     // Stop updates while we snapshot the memstore of all stores. We only have
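A note on the seqid bookkeeping above: the old code tracked a separate minimum and maximum store seqid and reconciled them after replay; the new code folds everything into one maximum, so the region comes online at maxSeqId + 1. A self-contained sketch of that arithmetic, with an invented stand-in for replayRecoveredEditsIfAny and made-up seqid values:

```java
public class NextSeqidDemo {
  // Invented stand-in: returns the seqid of the last replayed split edit,
  // or the passed-in maximum if no edits were applied.
  static long replayRecoveredEditsIfAny(long maxSeqId) {
    long lastReplayedEdit = 120;  // pretend the split edits ran up to 120
    return Math.max(maxSeqId, lastReplayedEdit);
  }

  public static void main(String[] args) {
    long maxSeqId = -1;
    long[] storeSeqIds = {87, 110, 93};  // per-family store-file maxima
    for (long storeSeqId : storeSeqIds) {
      if (storeSeqId > maxSeqId) maxSeqId = storeSeqId;
    }
    // Replay folds its result into the same maximum ...
    maxSeqId = replayRecoveredEditsIfAny(maxSeqId);
    // ... so the region comes online past every edit it has ever seen.
    long nextSeqid = maxSeqId + 1;
    System.out.println("next sequenceid=" + nextSeqid);  // prints 121
  }
}
```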
@@ -958,14 +970,14 @@
     long sequenceId = -1L;
     long completeSequenceId = -1L;
 
-    // we have to take a write lock during snapshot, or else a write could
+    // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
     this.updatesLock.writeLock().lock();
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
-      sequenceId = log.startCacheFlush();
+      sequenceId = (wal == null)?
+        myseqid: wal.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
       for (Store s : stores.values()) {
@@ -1009,9 +1021,9 @@
     }
 
     try {
-        if (atomicWork != null) {
-          atomicWork.call();
-        }
+      if (atomicWork != null) {
+        atomicWork.call();
+      }
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
@@ -1038,7 +1050,7 @@
       // We used to only catch IOEs but its possible that we'd get other
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
-      this.log.abortCacheFlush();
+      if (wal != null) wal.abortCacheFlush();
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
@@ -1052,9 +1064,11 @@
     //  This tells future readers that the HStores were emitted correctly,
     //  and that all updates to the log for this regionName that have lower
     //  log-sequence-ids can be safely ignored.
-    this.log.completeCacheFlush(getRegionName(),
+    if (wal != null) {
+      wal.completeCacheFlush(getRegionName(),
         regionInfo.getTableDesc().getName(), completeSequenceId,
         this.getRegionInfo().isMetaRegion());
+    }
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1067,12 +1081,12 @@
       LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
-        ", compaction requested=" + compactionRequested);
+        ", compaction requested=" + compactionRequested +
+        ((wal == null)? "; wal=null": ""));
     }
     return compactionRequested;
   }
 
-
   /**
    * A hook for subclasses wishing to perform operations prior to the cache
    * flush commit stage.
@@ -1853,46 +1867,75 @@
    * Read the edits log put under this region by wal log splitting process. Put
    * the recovered edits back up into this region.
    *
-   * We can ignore any log message that has a sequence ID that's equal to or
+   * <p>We can ignore any log message that has a sequence ID that's equal to or
    * lower than minSeqId.  (Because we know such log messages are already
    * reflected in the HFiles.)
+   *
+   * <p>While this is running we are putting pressure on memory yet we are
+   * outside of our usual accounting because we are not yet an onlined region
+   * (this stuff is being run as part of Region initialization). This means
+   * that if we're up against global memory limits, we'll not be flagged to flush
+   * because we are not online. We can't be flushed by usual mechanisms anyway;
+   * we're not yet online so our relative sequenceids are not yet aligned with
+   * HLog sequenceids -- not till we come up online, post-processing of split
+   * edits.
+   *
+   * <p>But to help relieve memory pressure, at least manage our own heap size
+   * flushing if we are in excess of per-region limits. When flushing, though, we
+   * have to be careful to avoid using the regionserver/hlog sequenceid. It's
+   * running on a different line to what's going on here in this region context,
+   * so if we crashed replaying these edits but in the midst had a flush that
+   * used the regionserver log with a sequenceid in excess of what's going on
+   * here in this region and its split editlogs, then we could miss edits the
+   * next time we go to recover. So, we have to flush inline, using seqids that
+   * make sense in this single region's context only -- until we come online.
+   *
    * @param regiondir
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param minSeqId Any edit found in split editlogs needs to be in excess of
+   * this minSeqId to be applied, else it's skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log, or minSeqId if nothing was added from the editlogs.
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
       final long minSeqId, final Progressable reporter)
   throws UnsupportedEncodingException, IOException {
-    Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
-    if (edits == null || !this.fs.exists(edits)) return -1;
-    if (isZeroLengthThenDelete(this.fs, edits)) return -1;
-    long maxSeqIdInLog = -1;
-    try {
-      maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
-      LOG.debug("Deleting recovered edits file: " + edits);
-      if (!this.fs.delete(edits, false)) {
-        LOG.error("Failed delete of " + edits);
+    long seqid = minSeqId;
+    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
+    if (files == null || files.isEmpty()) return seqid;
+    for (Path edits: files) {
+      if (edits == null || !this.fs.exists(edits)) {
+        LOG.warn("Null or non-existent edits file: " + edits);
+        continue;
       }
-    } catch (IOException e) {
-      boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-      if (skipErrors) {
-        Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
-          System.currentTimeMillis());
-        LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
-          " as " + moveAsideName, e);
-        if (!this.fs.rename(edits, moveAsideName)) {
-          LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+      if (isZeroLengthThenDelete(this.fs, edits)) continue;
+      try {
+        seqid = replayRecoveredEdits(edits, seqid, reporter);
+      } catch (IOException e) {
+        boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+        if (skipErrors) {
+          Path p = HLog.moveAsideBadEditsFile(fs, edits);
+          LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
            " as " + p, e);
+        } else {
+          throw e;
         }
-      } else {
-        throw e;
       }
     }
-    return maxSeqIdInLog;
+    if (seqid > minSeqId) {
+      // Then we added some edits to memory. Flush and cleanup split edit files.
+      internalFlushcache(null, seqid);
+      for (Path file: files) {
+        if (!this.fs.delete(file, false)) {
+          LOG.error("Failed delete of " + file);
        } else {
+          LOG.debug("Deleted recovered.edits file=" + file);
        }
+      }
+    }
+    return seqid;
   }
 
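The inline flush described in the javadoc above is what the internalFlushcache(null, seqid) call implements: during replay the region accounts for its own heap growth and flushes with a seqid that is meaningful only in this region's context. A simplified sketch of that accounting loop; the threshold, sizes, and class are invented, not the HBase implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ReplayFlushDemo {
  static final long FLUSH_SIZE = 64 * 1024;          // made-up per-region limit
  static final AtomicLong memstoreSize = new AtomicLong(0);

  // Mirrors the shape of HRegion.restoreEdit: grow the memstore accounting
  // and report whether we have crossed the flush threshold.
  static boolean restoreEdit(long kvHeapSize) {
    return memstoreSize.addAndGet(kvHeapSize) > FLUSH_SIZE;
  }

  public static void main(String[] args) {
    long currentEditSeqId = 0;
    int flushes = 0;
    for (int i = 0; i < 1000; i++) {
      boolean flush = restoreEdit(512);    // pretend each edit is 512 bytes
      currentEditSeqId++;
      if (flush) {                         // flush only between WALEdits
        memstoreSize.set(0);               // stand-in for internalFlushcache(null, seqid)
        flushes++;
      }
    }
    System.out.println("replayed=" + currentEditSeqId + ", inline flushes=" + flushes);
  }
}
```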
   /*
@@ -1901,12 +1944,13 @@
    * must be larger than this to be replayed.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log, or minSeqId if nothing was added from the editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
     final long minSeqId, final Progressable reporter)
   throws IOException {
+    LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
       return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1916,26 +1960,22 @@
   }
 
   /* @param reader Reader against file of recovered edits.
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param minSeqId Any edit found in split editlogs needs to be in excess of
+   * this minSeqId to be applied, else it's skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log, or minSeqId if nothing was added from the editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final HLog.Reader reader,
     final long minSeqId, final Progressable reporter)
   throws IOException {
-    long currentEditSeqId = -1;
+    long currentEditSeqId = minSeqId;
     long firstSeqIdInLog = -1;
     long skippedEdits = 0;
     long editsCount = 0;
     HLog.Entry entry;
     Store store = null;
-    // Get map of family name to maximum sequence id.  Do it here up front
-    // because as we progress, the sequence id can change if we happen to flush
-    // The flush ups the seqid for the Store.  The new seqid can cause us skip edits.
-    Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
     // How many edits to apply before we send a progress report.
     int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
     while ((entry = reader.next()) != null) {
@@ -1945,12 +1985,13 @@
         firstSeqIdInLog = key.getLogSeqNum();
       }
       // Now, figure if we should skip this edit.
-      currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
-      if (key.getLogSeqNum() <= minSeqId) {
+      if (key.getLogSeqNum() <= currentEditSeqId) {
         skippedEdits++;
         continue;
       }
-      for (KeyValue kv : val.getKeyValues()) {
+      currentEditSeqId = key.getLogSeqNum();
+      boolean flush = false;
+      for (KeyValue kv: val.getKeyValues()) {
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1969,16 +2010,13 @@
           skippedEdits++;
           continue;
         }
-        // The edits' id has to be in excess of the original max seqid of the
-        // targeted store.
-        long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
-        if (currentEditSeqId < storeMaxSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        restoreEdit(kv);
+        // Once we are over the limit, restoreEdit will keep returning true to
+        // flush -- but don't flush until we've played all the kvs that make up
+        // the WALEdit.
+        flush = restoreEdit(store, kv);
         editsCount++;
       }
+      if (flush) internalFlushcache(null, currentEditSeqId);
 
       // Every 'interval' edits, tell the reporter we're making progress.
       // Have seen 60k edits taking 3 minutes to complete.
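The rewritten loop above replaces the per-store maximum-seqid map with one rule: skip any entry whose log seqnum is at or below currentEditSeqId, and advance currentEditSeqId only when an edit is actually applied. A minimal illustration with invented seqnums (entries at or below an already-covered seqid, e.g. from an older split file, are dropped):

```java
public class ReplaySkipDemo {
  public static void main(String[] args) {
    long currentEditSeqId = 100;                  // e.g. max seqid already in store files
    long[] logSeqNums = {98, 100, 101, 105, 103}; // invented recovered-edits entries
    long skippedEdits = 0, editsCount = 0;
    for (long seqNum : logSeqNums) {
      if (seqNum <= currentEditSeqId) {           // already reflected: skip it
        skippedEdits++;
        continue;
      }
      currentEditSeqId = seqNum;                  // only applied edits advance the seqid
      editsCount++;
    }
    // Prints applied=2 skipped=3 lastSeqId=105.
    System.out.println("applied=" + editsCount + " skipped=" + skippedEdits
        + " lastSeqId=" + currentEditSeqId);
  }
}
```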
@@ -1994,34 +2032,14 @@
     return currentEditSeqId;
   }
 
-  /*
-   * @param stores
-   * @return Map of family name to maximum sequenceid.
+  /**
+   * Used by tests.
+   * @param s Store to add edit to.
+   * @param kv KeyValue to add.
+   * @return True if we should flush.
    */
-  private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
-    Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte [], Store> e: stores.entrySet()) {
-      map.put(e.getKey(), e.getValue().getMaxSequenceId());
-    }
-    return map;
-  }
-
-  /*
-   * @param kv Apply this value to this region.
-   * @throws IOException
-   */
-  // This method is protected so can be called from tests.
-  protected void restoreEdit(final KeyValue kv) throws IOException {
-    // This is really expensive to do per edit.  Loads of object creation.
-    // TODO: Optimization.  Apply edits batched by family.
-    Map<byte [], List<KeyValue>> map =
-      new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
-    map.put(kv.getFamily(), Collections.singletonList(kv));
-    if (kv.isDelete()) {
-      delete(map, true);
-    } else {
-      put(map, true);
-    }
+  protected boolean restoreEdit(final Store s, final KeyValue kv) {
+    return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
   }
 
   /*
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 704489101c2..e1b40779670 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -36,7 +36,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -61,6 +62,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -119,9 +121,17 @@ import com.google.common.util.concurrent.NamingThreadFactory;
  */
 public class HLog implements Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
-  private static final String HLOG_DATFILE = "hlog.dat.";
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
+
+  /*
+   * Name of directory that holds recovered edits written by the wal log
+   * splitting code, one per region
+   */
+  private static final String RECOVERED_EDITS_DIR = "recovered.edits";
+  private static final Pattern EDITFILES_NAME_PATTERN =
+    Pattern.compile("-?[0-9]+");
+
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
@@ -144,11 +154,6 @@ public class HLog implements Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /** Name of file that holds recovered edits written by the wal log splitting
-   * code, one per region
-   */
-  public static final String RECOVERED_EDITS = "recovered.edits";
-
   // used to indirectly tell syncFs to force the sync
   private boolean forceSync = false;
@@ -1459,7 +1464,7 @@
     NamingThreadFactory f = new NamingThreadFactory(
         "SplitWriter-%1$d", Executors.defaultThreadFactory());
     ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
-    for (final byte[] region : splitLogsMap.keySet()) {
+    for (final byte [] region : splitLogsMap.keySet()) {
       Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
       writeFutureResult.put(region, threadPool.submit(splitter));
     }
@@ -1579,17 +1584,19 @@
       WriterAndPath wap = logWriters.get(region);
       for (Entry logEntry: entries) {
         if (wap == null) {
-          Path logFile = getRegionLogPath(logEntry, rootDir);
-          if (fs.exists(logFile)) {
-            LOG.warn("Found existing old hlog file. It could be the result of a previous" +
-              "failed split attempt. Deleting " + logFile +
-              ", length=" + fs.getFileStatus(logFile).getLen());
-            fs.delete(logFile, false);
+          Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+          if (fs.exists(regionedits)) {
+            LOG.warn("Found existing old edits file. It could be the " +
+              "result of a previous failed split attempt. Deleting " +
+              regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
+            if (!fs.delete(regionedits, false)) {
+              LOG.warn("Failed delete of old " + regionedits);
+            }
           }
-          Writer w = createWriter(fs, logFile, conf);
-          wap = new WriterAndPath(logFile, w);
+          Writer w = createWriter(fs, regionedits, conf);
+          wap = new WriterAndPath(regionedits, w);
           logWriters.put(region, wap);
-          LOG.debug("Creating writer path=" + logFile +
+          LOG.debug("Creating writer path=" + regionedits +
             " region=" + Bytes.toStringBinary(region));
         }
         wap.w.append(logEntry);
@@ -1643,14 +1650,101 @@
     }
   }
 
-  private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
-    Path tableDir =
-      HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
-    Path regionDir =
-      HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
-    return new Path(regionDir, RECOVERED_EDITS);
+  /*
+   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+   * logEntry named for the sequenceid in the passed
+   * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+   * directory, creating it if necessary.
+   * @param fs
+   * @param logEntry
+   * @param rootDir HBase root dir.
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  private static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir)
+  throws IOException {
+    Path tableDir = HTableDescriptor.getTableDir(rootDir,
+      logEntry.getKey().getTablename());
+    Path regiondir = HRegion.getRegionDir(tableDir,
+      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+    Path dir = getRegionDirRecoveredEditsDir(regiondir);
+    if (!fs.exists(dir)) {
+      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    }
+    return new Path(dir,
+      formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
   }
 
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid);
+  }
+
+
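Why the %019d: nineteen digits is the decimal width of Long.MAX_VALUE, so zero-padded names compare the same way lexicographically as the seqids do numerically, and a plain sorted set of file names replays in seqid order. A runnable demonstration (the class and the sample seqids are invented; the format string is the one above):

```java
import java.util.TreeSet;

public class RecoveredEditsNameDemo {
  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);  // same format as the patch
  }

  public static void main(String[] args) {
    TreeSet<String> names = new TreeSet<String>();  // sorts lexicographically
    for (long seqid : new long[] {3, 222, 11, 9999999999L}) {
      names.add(formatRecoveredEditsFileName(seqid));
    }
    // Iterates in numeric seqid order: 3, 11, 222, 9999999999.
    for (String n : names) {
      System.out.println(n + " -> " + Long.parseLong(n));
    }
  }
}
```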
+  /**
+   * Returns sorted set of edit files made by wal-log splitter.
+   * @param fs
+   * @param regiondir
+   * @return Files in passed regiondir as a sorted set.
+   * @throws IOException
+   */
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+      final Path regiondir)
+  throws IOException {
+    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix.  See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+          result = fs.isFile(p) && m.matches();
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on " + p);
+        }
+        return result;
+      }
+    });
+    NavigableSet<Path> filesSorted = new TreeSet<Path>();
+    if (files == null) return filesSorted;
+    for (FileStatus status: files) {
+      filesSorted.add(status.getPath());
+    }
+    return filesSorted;
+  }
+
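The filter above is what keeps non-edit files out of a replay: only purely numeric names match, so a bad file that moveAsideBadEditsFile (below) has renamed with a timestamp suffix is ignored on the next open. A small demonstration of the same regex; the file names here are invented:

```java
import java.util.regex.Pattern;

public class EditFileFilterDemo {
  // Same pattern as EDITFILES_NAME_PATTERN above.
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");

  public static void main(String[] args) {
    String editFile = "0000000000000002332";
    String movedAside = "0000000000000002332." + System.currentTimeMillis();
    System.out.println(editFile + " -> "
        + EDITFILES_NAME_PATTERN.matcher(editFile).matches());    // true
    System.out.println(movedAside + " -> "
        + EDITFILES_NAME_PATTERN.matcher(movedAside).matches());  // false
  }
}
```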
+  /**
+   * Move aside a bad edits file.
+   * @param fs
+   * @param edits Edits file to move aside.
+   * @return The name of the moved aside file.
+   * @throws IOException
+   */
+  public static Path moveAsideBadEditsFile(final FileSystem fs,
+      final Path edits)
+  throws IOException {
+    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+      System.currentTimeMillis());
+    if (!fs.rename(edits, moveAsideName)) {
+      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+    }
+    return moveAsideName;
+  }
+
+  /**
+   * @param regiondir This region's directory in the filesystem.
+   * @return The directory that holds recovered edits files for the region
+   * regiondir
+   */
+  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+    return new Path(regiondir, RECOVERED_EDITS_DIR);
+  }
+
   /**
    *
    * @param visitor
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 3fff2fad4e8..908633e6eb4 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -476,9 +476,6 @@ public class TestHLogSplit {
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
 
-
-
-
   /**
    * This thread will keep writing to the file after the split process has started
    * It simulates a region server that was considered dead but woke up and wrote
@@ -610,11 +607,14 @@
     }
   }
 
-  private Path getLogForRegion(Path rootdir, byte[] table, String region) {
-    return new Path(HRegion.getRegionDir(HTableDescriptor
-        .getTableDir(rootdir, table),
-        HRegionInfo.encodeRegionName(region.getBytes())),
-        HLog.RECOVERED_EDITS);
+  private Path getLogForRegion(Path rootdir, byte[] table, String region)
+  throws IOException {
+    Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+    Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+      HRegionInfo.encodeRegionName(region.getBytes())));
+    FileStatus [] files = this.fs.listStatus(editsdir);
+    assertEquals(1, files.length);
+    return files[0].getPath();
   }
 
   private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -722,8 +722,15 @@
     FileStatus[] f2 = fs.listStatus(p2);
 
     for (int i=0; i<f1.length; i++) {
[...]
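The test additions below split a WAL twice into the same region, so two files end up under recovered.edits. The wal2.setSequenceNumber(wal1.getSequenceNumber()) call is the interesting part: a freshly created WAL would restart its sequence numbers, and the second batch of edits would then be skipped as already-applied during replay. A toy model of that handoff; FakeWal is invented for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

public class SeqidHandoffDemo {
  static class FakeWal {  // invented stand-in for the HLog-internal counter
    private final AtomicLong logSeqNum = new AtomicLong(0);
    long getSequenceNumber() { return logSeqNum.get(); }
    void setSequenceNumber(long n) { logSeqNum.set(n); }
    long append() { return logSeqNum.incrementAndGet(); }
  }

  public static void main(String[] args) {
    FakeWal wal1 = new FakeWal();
    for (int i = 0; i < 3000; i++) wal1.append();     // first batch: seqids 1..3000

    FakeWal wal2 = new FakeWal();
    wal2.setSequenceNumber(wal1.getSequenceNumber()); // without this, seqids repeat
    long firstOfSecondBatch = wal2.append();
    System.out.println("second batch starts at " + firstOfSecondBatch); // 3001
  }
}
```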
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
[...]
+    // ... > 1 set of edits in the recovered.edits directory.
+    // Ensure edits are replayed properly.
+    final String tableNameStr = "test2727";
+    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    Path basedir = new Path(hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+
+    final byte [] tableName = Bytes.toBytes(tableNameStr);
+    final byte [] rowName = tableName;
+
+    HLog wal1 = createWAL(this.conf);
+    // Add 1k to each family.
+    final int countPerFamily = 1000;
+    for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1);
+    }
+    wal1.close();
+    runWALSplit(this.conf);
+
+    HLog wal2 = createWAL(this.conf);
+    // Up the sequenceid so that these edits are after the ones added above.
+    wal2.setSequenceNumber(wal1.getSequenceNumber());
+    // Add 1k to each family.
+    for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2);
+    }
+    wal2.close();
+    runWALSplit(this.conf);
+
+    HLog wal3 = createWAL(this.conf);
+    wal3.setSequenceNumber(wal2.getSequenceNumber());
+    try {
+      final HRegion region = new HRegion(basedir, wal3, this.fs, this.conf, hri,
+          null);
+      long seqid = region.initialize();
+      assertTrue(seqid > wal3.getSequenceNumber());
+
+      // TODO: Scan all.
+      region.close();
+    } finally {
+      wal3.closeAndDelete();
+    }
+  }
+
   /**
    * Test case of HRegion that is only made out of bulk loaded files. Assert
    * that we don't 'crash'.
@@ -210,8 +261,8 @@
     HLog wal2 = createWAL(this.conf);
     HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
       @Override
-      protected void restoreEdit(KeyValue kv) throws IOException {
-        super.restoreEdit(kv);
+      protected boolean restoreEdit(Store s, KeyValue kv) {
+        super.restoreEdit(s, kv);
         throw new RuntimeException("Called when it should not have been!");
       }
     };
@@ -221,7 +272,7 @@
     assertTrue(seqid + result.size() < seqid2);
 
     // Next test.  Add more edits, then 'crash' this region by stealing its wal
-    // out from under it and assert that replay of the log addes the edits back
+    // out from under it and assert that replay of the log adds the edits back
     // correctly when region is opened again.
     for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
@@ -242,9 +293,10 @@
     final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
     HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
       @Override
-      protected void restoreEdit(KeyValue kv) throws IOException {
-        super.restoreEdit(kv);
+      protected boolean restoreEdit(Store s, KeyValue kv) {
+        boolean b = super.restoreEdit(s, kv);
         countOfRestoredEdits.incrementAndGet();
+        return b;
       }
     };
     long seqid3 = region3.initialize();
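The hunks above and below all use the same testing device: subclass HRegion anonymously, instrument a protected hook, and delegate to super. The pattern in miniature, with invented stand-in classes:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class HookCountDemo {
  static class Region {  // invented stand-in for HRegion
    protected boolean restoreEdit() { return false; }
    void replay(int edits) {
      for (int i = 0; i < edits; i++) restoreEdit();
    }
  }

  public static void main(String[] args) {
    final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
    Region region = new Region() {
      @Override protected boolean restoreEdit() {
        boolean b = super.restoreEdit();         // do the real work first
        countOfRestoredEdits.incrementAndGet();  // then record the call
        return b;
      }
    };
    region.replay(42);
    System.out.println("restored edits seen: " + countOfRestoredEdits.get()); // 42
  }
}
```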
@@ -317,14 +369,20 @@
     newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
     // Make a new wal for new region.
     HLog newWal = createWAL(newConf);
+    final AtomicInteger flushcount = new AtomicInteger(0);
     try {
-      TestFlusher flusher = new TestFlusher();
       final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
-        flusher);
-      flusher.r = region;
+          null) {
+        protected boolean internalFlushcache(HLog wal, long myseqid)
+        throws IOException {
+          boolean b = super.internalFlushcache(wal, myseqid);
+          flushcount.incrementAndGet();
+          return b;
+        };
+      };
       long seqid = region.initialize();
-      // Assert we flushed.
-      assertTrue(flusher.count > 0);
+      // We flushed during init.
+      assertTrue(flushcount.get() > 0);
       assertTrue(seqid > wal.getSequenceNumber());
 
       Get get = new Get(rowName);
@@ -338,23 +396,6 @@
     }
   }
 
-  // Flusher used in this test.  Keep count of how often we are called and
-  // actually run the flush inside here.
-  class TestFlusher implements FlushRequester {
-    private int count = 0;
-    private HRegion r;
-
-    @Override
-    public void request(HRegion region) {
-      count++;
-      try {
-        r.flushcache();
-      } catch (IOException e) {
-        throw new RuntimeException("Exception flushing", e);
-      }
-    }
-  }
-
   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
       final byte [] rowName, final byte [] family, final int count,
       EnvironmentEdge ee, final HLog wal)