HBASE-2727 Splits writing one file only is untenable; need dir of recovered edits ordered by sequenceid

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@964965 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-07-16 22:28:21 +00:00
parent e4ee10d24f
commit 4158a4e4a3
5 changed files with 335 additions and 175 deletions

CHANGES.txt

@@ -435,6 +435,8 @@ Release 0.21.0 - Unreleased
(Nicolas Spiegelberg via Stack)
HBASE-2781 ZKW.createUnassignedRegion doesn't make sure existing znode is
in the right state (Karthik Ranganathan via JD)
HBASE-2727 Splits writing one file only is untenable; need dir of recovered
edits ordered by sequenceid
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

HRegion.java

@@ -19,6 +19,27 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -64,28 +85,6 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
@@ -126,6 +125,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@@ -330,9 +330,8 @@ public class HRegion implements HeapSize { // , Writable{
// Remove temporary data left over from old regions
cleanupTmpDir();
// Load in all the HStores. Get min and max seqids across all families.
// Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
@@ -340,12 +339,9 @@ public class HRegion implements HeapSize { // , Writable{
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
if (minSeqId > storeSeqId) {
minSeqId = storeSeqId;
}
}
// Recover any edits if available.
long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
@@ -362,7 +358,7 @@ public class HRegion implements HeapSize { // , Writable{
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = Math.max(seqid, maxSeqId) + 1;
long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
return nextSeqid;
}
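
The net effect of the two hunks above: initialize() now threads a single running maximum sequenceid through store loading and edit replay, instead of tracking a separate minimum. A minimal sketch of the resulting flow, simplified from this diff (not the verbatim method body):

// Sketch of region open after this change (simplified from the hunks above).
long maxSeqId = -1;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
  Store store = instantiateHStore(this.tableDir, c);
  maxSeqId = Math.max(maxSeqId, store.getMaxSequenceId());
}
// replayRecoveredEditsIfAny returns its seqid argument unchanged when no
// recovered edit exceeds it, so no separate Math.max is needed afterwards.
maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
long nextSeqid = maxSeqId + 1;
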
@@ -902,7 +898,9 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
* Flushing the cache is a little tricky. We have a lot of updates in the
* Flush the memstore.
*
* Flushing the memstore is a little tricky. We have a lot of updates in the
* memstore, all of which have also been written to the log. We need to
* write those updates in the memstore out to disk, while being able to
* process reads/writes as much as possible during the flush operation. Also,
@@ -934,6 +932,19 @@ public class HRegion implements HeapSize { // , Writable{
* because a Snapshot was not properly persisted.
*/
protected boolean internalFlushcache() throws IOException {
return internalFlushcache(this.log, -1);
}
/**
* @param wal Null if we're NOT to go via hlog/wal.
* @param myseqid The seqid to use, if <code>wal</code> is null, when writing
* out the flush file.
* @return true if the region needs compacting
* @throws IOException
* @see {@link #internalFlushcache()}
*/
protected boolean internalFlushcache(final HLog wal, final long myseqid)
throws IOException {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Clear flush flag.
// Record latest flush time
@@ -945,7 +956,8 @@ public class HRegion implements HeapSize { // , Writable{
if (LOG.isDebugEnabled()) {
LOG.debug("Started memstore flush for region " + this +
". Current region memstore size " +
StringUtils.humanReadableInt(this.memstoreSize.get()));
StringUtils.humanReadableInt(this.memstoreSize.get()) +
((wal != null)? "": "; wal is null, using passed myseqid=" + myseqid));
}
// Stop updates while we snapshot the memstore of all stores. We only have
@@ -958,14 +970,14 @@ public class HRegion implements HeapSize { // , Writable{
long sequenceId = -1L;
long completeSequenceId = -1L;
// we have to take a write lock during snapshot, or else a write could
// We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
this.updatesLock.writeLock().lock();
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
sequenceId = log.startCacheFlush();
sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
@@ -1009,9 +1021,9 @@ public class HRegion implements HeapSize { // , Writable{
}
try {
if (atomicWork != null) {
atomicWork.call();
}
if (atomicWork != null) {
atomicWork.call();
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
@@ -1038,7 +1050,7 @@ public class HRegion implements HeapSize { // , Writable{
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
// all and sundry.
this.log.abortCacheFlush();
if (wal != null) wal.abortCacheFlush();
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
@@ -1052,9 +1064,11 @@ public class HRegion implements HeapSize { // , Writable{
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(getRegionName(),
if (wal != null) {
wal.completeCacheFlush(getRegionName(),
regionInfo.getTableDesc().getName(), completeSequenceId,
this.getRegionInfo().isMetaRegion());
}
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -1067,12 +1081,12 @@ public class HRegion implements HeapSize { // , Writable{
LOG.info("Finished memstore flush of ~" +
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
", compaction requested=" + compactionRequested);
", compaction requested=" + compactionRequested +
((wal == null)? "; wal=null": ""));
}
return compactionRequested;
}
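
The flush hunks above split internalFlushcache into a no-arg entry point and a parameterized worker, where a null wal disables every WAL interaction and substitutes a caller-supplied sequenceid. A minimal sketch of the two call shapes, using only names that appear in this diff:

// Normal operation: flush through the region's own HLog.
internalFlushcache();   // delegates to internalFlushcache(this.log, -1)

// Replay of recovered edits: the region is not yet online, so no WAL is
// involved and the region-local replay seqid stamps the flush files.
internalFlushcache(null, currentEditSeqId);
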
/**
* A hook for subclasses wishing to perform operations prior to the cache
* flush commit stage.
@@ -1853,46 +1867,75 @@ public class HRegion implements HeapSize { // , Writable{
* Read the edits log put under this region by the wal log splitting process. Put
* the recovered edits back up into this region.
*
* We can ignore any log message that has a sequence ID that's equal to or
* <p>We can ignore any log message that has a sequence ID that's equal to or
* lower than minSeqId. (Because we know such log messages are already
* reflected in the HFiles.)
*
* <p>While this is running we are putting pressure on memory yet we are
* outside of our usual accounting because we are not yet an onlined region
* (this stuff is being run as part of Region initialization). This means
* that if we're up against global memory limits, we'll not be flagged to flush
* because we are not online. We can't be flushed by the usual mechanisms anyway;
* we're not yet online so our relative sequenceids are not yet aligned with
* HLog sequenceids -- not till we come up online, post processing of split
* edits.
*
* <p>But to help relieve memory pressure, we at least manage our own heap
* size, flushing if we are in excess of per-region limits. When flushing,
* though, we have to be careful to avoid using the regionserver/hlog
* sequenceid. It runs on a different track from what is going on here in
* this region context, so if we crashed while replaying these edits but in
* the midst had a flush that used the regionserver log with a sequenceid in
* excess of this region's, then we could miss edits the next time we go to
* recover. So we have to flush inline, using seqids that make sense in this
* single-region context only -- until we come online.
*
* @param regiondir
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param minSeqId Any edit found in the split editlogs needs to be in excess
* of this minSeqId to be applied; otherwise it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
* @throws UnsupportedEncodingException
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
if (edits == null || !this.fs.exists(edits)) return -1;
if (isZeroLengthThenDelete(this.fs, edits)) return -1;
long maxSeqIdInLog = -1;
try {
maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
LOG.debug("Deleting recovered edits file: " + edits);
if (!this.fs.delete(edits, false)) {
LOG.error("Failed delete of " + edits);
long seqid = minSeqId;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
if (files == null || files.isEmpty()) return seqid;
for (Path edits: files) {
if (edits == null || !this.fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
continue;
}
} catch (IOException e) {
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
if (skipErrors) {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
System.currentTimeMillis());
LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
" as " + moveAsideName, e);
if (!this.fs.rename(edits, moveAsideName)) {
LOG.error("hbase.skip.errors=true so continuing. Rename failed");
if (isZeroLengthThenDelete(this.fs, edits)) continue;
try {
seqid = replayRecoveredEdits(edits, seqid, reporter);
} catch (IOException e) {
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
if (skipErrors) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
" as " + p, e);
} else {
throw e;
}
} else {
throw e;
}
}
return maxSeqIdInLog;
if (seqid > minSeqId) {
// Then we added some edits to memory. Flush and clean up the split edit files.
internalFlushcache(null, seqid);
for (Path file: files) {
if (!this.fs.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
}
}
}
return seqid;
}
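
Each recovered-edits file is named for the sequenceid of the first entry written into it (see getRegionSplitEditsPath and formatRecoveredEditsFileName in the HLog.java hunks below), so replaying the sorted set oldest-first keeps the running seqid moving only forward. A hypothetical directory layout after two log splits, with illustrative names only:

// <regiondir>/recovered.edits/0000000000000002332                 <- first split
// <regiondir>/recovered.edits/0000000000000004577                 <- second split
// <regiondir>/recovered.edits/0000000000000002332.1279318101843   <- bad file moved
//    aside with a timestamp suffix; the edit-file name pattern skips it
// getSplitEditFilesSorted(fs, regiondir) returns the first two in order and
// replayRecoveredEditsIfAny replays each, skipping edits <= the running seqid.
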
/*
@@ -1901,12 +1944,13 @@ public class HRegion implements HeapSize { // , Writable{
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final Progressable reporter)
throws IOException {
LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1916,26 +1960,22 @@ public class HRegion implements HeapSize { // , Writable{
}
/* @param reader Reader against file of recovered edits.
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param minSeqId Any edit found in the split editlogs needs to be in excess
* of this minSeqId to be applied; otherwise it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final HLog.Reader reader,
final long minSeqId, final Progressable reporter)
throws IOException {
long currentEditSeqId = -1;
long currentEditSeqId = minSeqId;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
// Get map of family name to maximum sequence id. Do it here up front
// because as we progress, the sequence id can change if we happen to flush
// The flush ups the seqid for the Store. The new seqid can cause us skip edits.
Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
@@ -1945,12 +1985,13 @@ public class HRegion implements HeapSize { // , Writable{
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
if (key.getLogSeqNum() <= minSeqId) {
if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
for (KeyValue kv : val.getKeyValues()) {
currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1969,16 +2010,13 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
}
// The edits' id has to be in excess of the original max seqid of the
// targeted store.
long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
if (currentEditSeqId < storeMaxSeqId) {
skippedEdits++;
continue;
}
restoreEdit(kv);
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
flush = restoreEdit(store, kv);
editsCount++;
}
if (flush) internalFlushcache(null, currentEditSeqId);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@@ -1994,34 +2032,14 @@ public class HRegion implements HeapSize { // , Writable{
return currentEditSeqId;
}
/*
* @param stores
* @return Map of family name to maximum sequenceid.
/**
* Used by tests.
* @param s Store to add the edit to.
* @param kv KeyValue to add.
* @return True if we should flush.
*/
private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte [], Store> e: stores.entrySet()) {
map.put(e.getKey(), e.getValue().getMaxSequenceId());
}
return map;
}
/*
* @param kv Apply this value to this region.
* @throws IOException
*/
// This method is protected so can be called from tests.
protected void restoreEdit(final KeyValue kv) throws IOException {
// This is really expensive to do per edit. Loads of object creation.
// TODO: Optimization. Apply edits batched by family.
Map<byte [], List<KeyValue>> map =
new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
map.put(kv.getFamily(), Collections.singletonList(kv));
if (kv.isDelete()) {
delete(map, true);
} else {
put(map, true);
}
protected boolean restoreEdit(final Store s, final KeyValue kv) {
return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
}
/*

HLog.java

@@ -36,7 +36,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -61,6 +62,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -119,9 +121,17 @@ import com.google.common.util.concurrent.NamingThreadFactory;
*/
public class HLog implements Syncable {
static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
/*
* Name of directory that holds recovered edits written by the wal log
* splitting code, one per region
*/
private static final String RECOVERED_EDITS_DIR = "recovered.edits";
private static final Pattern EDITFILES_NAME_PATTERN =
Pattern.compile("-?[0-9]+");
private final FileSystem fs;
private final Path dir;
private final Configuration conf;
@@ -144,11 +154,6 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
/** Name of file that holds recovered edits written by the wal log splitting
* code, one per region
*/
public static final String RECOVERED_EDITS = "recovered.edits";
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@@ -1459,7 +1464,7 @@
NamingThreadFactory f = new NamingThreadFactory(
"SplitWriter-%1$d", Executors.defaultThreadFactory());
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
for (final byte[] region : splitLogsMap.keySet()) {
for (final byte [] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
@@ -1579,17 +1584,19 @@
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry: entries) {
if (wap == null) {
Path logFile = getRegionLogPath(logEntry, rootDir);
if (fs.exists(logFile)) {
LOG.warn("Found existing old hlog file. It could be the result of a previous" +
"failed split attempt. Deleting " + logFile +
", length=" + fs.getFileStatus(logFile).getLen());
fs.delete(logFile, false);
Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the " +
"result of a previous failed split attempt. Deleting " +
regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
if (!fs.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
}
}
Writer w = createWriter(fs, logFile, conf);
wap = new WriterAndPath(logFile, w);
Writer w = createWriter(fs, regionedits, conf);
wap = new WriterAndPath(regionedits, w);
logWriters.put(region, wap);
LOG.debug("Creating writer path=" + logFile +
LOG.debug("Creating writer path=" + regionedits +
" region=" + Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
@@ -1643,14 +1650,101 @@
}
}
private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
Path tableDir =
HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
Path regionDir =
HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
return new Path(regionDir, RECOVERED_EDITS);
/*
* Path to a file under the RECOVERED_EDITS_DIR directory of the region found
* in <code>logEntry</code>, named for the sequenceid in the passed
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
* This method also ensures the existence of RECOVERED_EDITS_DIR under the
* region directory, creating it if necessary.
* @param fs
* @param logEntry
* @param rootDir HBase root dir.
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
private static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir)
throws IOException {
Path tableDir = HTableDescriptor.getTableDir(rootDir,
logEntry.getKey().getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
Path dir = getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
return new Path(dir,
formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
}
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
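
A quick worked example of the helper above, with assumed input values, to show why the zero-padding matters:

// %019d left-pads to 19 digits so that lexicographic order over file names
// equals numeric order over sequenceids -- which the TreeSet built in
// getSplitEditFilesSorted (below) depends on.
formatRecoveredEditsFileName(2332);   // -> "0000000000000002332"
formatRecoveredEditsFileName(10000);  // -> "0000000000000010000" (sorts after)
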
/**
* Returns sorted set of edit files made by wal-log splitter.
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
* @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
final Path regiondir)
throws IOException {
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
@Override
public boolean accept(Path p) {
boolean result = false;
try {
// Return files and only files that match the editfile names pattern.
// There can be other files in this directory other than edit files.
// In particular, on error, we'll move aside the bad edit file giving
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
result = fs.isFile(p) && m.matches();
} catch (IOException e) {
LOG.warn("Failed isFile check on " + p);
}
return result;
}
});
NavigableSet<Path> filesSorted = new TreeSet<Path>();
if (files == null) return filesSorted;
for (FileStatus status: files) {
filesSorted.add(status.getPath());
}
return filesSorted;
}
/**
* Move aside a bad edits file.
* @param fs
* @param edits Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs,
final Path edits)
throws IOException {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
System.currentTimeMillis());
if (!fs.rename(edits, moveAsideName)) {
LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
}
return moveAsideName;
}
/**
* @param regiondir This region's directory in the filesystem.
* @return The directory that holds recovered edits files for the region
* <code>regiondir</code>
*/
public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
return new Path(regiondir, RECOVERED_EDITS_DIR);
}
/**
*
* @param visitor

TestHLogSplit.java

@@ -476,9 +476,6 @@ public class TestHLogSplit {
assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
}
/**
* This thread will keep writing to the file after the split process has started
* It simulates a region server that was considered dead but woke up and wrote
@@ -610,11 +607,14 @@
}
}
private Path getLogForRegion(Path rootdir, byte[] table, String region) {
return new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootdir, table),
HRegionInfo.encodeRegionName(region.getBytes())),
HLog.RECOVERED_EDITS);
private Path getLogForRegion(Path rootdir, byte[] table, String region)
throws IOException {
Path tdir = HTableDescriptor.getTableDir(rootdir, table);
Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
HRegionInfo.encodeRegionName(region.getBytes())));
FileStatus [] files = this.fs.listStatus(editsdir);
assertEquals(1, files.length);
return files[0].getPath();
}
private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -722,8 +722,15 @@
FileStatus[] f2 = fs.listStatus(p2);
for (int i=0; i<f1.length; i++) {
if (!logsAreEqual(new Path(f1[i].getPath(), HLog.RECOVERED_EDITS),
new Path(f2[i].getPath(), HLog.RECOVERED_EDITS))) {
// Regions now have a directory named RECOVERED_EDITS_DIR that holds the
// split edit files. Below we presume there is only one.
Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
FileStatus [] rd1fs = fs.listStatus(rd1);
assertEquals(1, rd1fs.length);
Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
FileStatus [] rd2fs = fs.listStatus(rd2);
assertEquals(1, rd2fs.length);
if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
return -1;
}
}
@@ -745,6 +752,4 @@
}
return true;
}
}

TestWALReplay.java

@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -114,6 +114,57 @@ public class TestWALReplay {
}
}
/**
* Tests for hbase-2727.
* @throws Exception
* @see https://issues.apache.org/jira/browse/HBASE-2727
*/
@Test
public void test2727() throws Exception {
// Test being able to have > 1 set of edits in the recovered.edits directory.
// Ensure edits are replayed properly.
final String tableNameStr = "test2727";
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
Path basedir = new Path(hbaseRootDir, tableNameStr);
deleteDir(basedir);
final byte [] tableName = Bytes.toBytes(tableNameStr);
final byte [] rowName = tableName;
HLog wal1 = createWAL(this.conf);
// Add 1k to each family.
final int countPerFamily = 1000;
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1);
}
wal1.close();
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
// Up the sequenceid so that these edits are after the ones added above.
wal2.setSequenceNumber(wal1.getSequenceNumber());
// Add 1k to each family.
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2);
}
wal2.close();
runWALSplit(this.conf);
HLog wal3 = createWAL(this.conf);
wal3.setSequenceNumber(wal2.getSequenceNumber());
try {
final HRegion region = new HRegion(basedir, wal3, this.fs, this.conf, hri,
null);
long seqid = region.initialize();
assertTrue(seqid > wal3.getSequenceNumber());
// TODO: Scan all.
region.close();
} finally {
wal3.closeAndDelete();
}
}
/**
* Test case of HRegion that is only made out of bulk loaded files. Assert
* that we don't 'crash'.
@@ -210,8 +261,8 @@
HLog wal2 = createWAL(this.conf);
HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
@Override
protected void restoreEdit(KeyValue kv) throws IOException {
super.restoreEdit(kv);
protected boolean restoreEdit(Store s, KeyValue kv) {
super.restoreEdit(s, kv);
throw new RuntimeException("Called when it should not have been!");
}
};
@@ -221,7 +272,7 @@
assertTrue(seqid + result.size() < seqid2);
// Next test. Add more edits, then 'crash' this region by stealing its wal
// out from under it and assert that replay of the log addes the edits back
// out from under it and assert that replay of the log adds the edits back
// correctly when region is opened again.
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
@@ -242,9 +293,10 @@
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
@Override
protected void restoreEdit(KeyValue kv) throws IOException {
super.restoreEdit(kv);
protected boolean restoreEdit(Store s, KeyValue kv) {
boolean b = super.restoreEdit(s, kv);
countOfRestoredEdits.incrementAndGet();
return b;
}
};
long seqid3 = region3.initialize();
@@ -317,14 +369,20 @@
newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
// Make a new wal for new region.
HLog newWal = createWAL(newConf);
final AtomicInteger flushcount = new AtomicInteger(0);
try {
TestFlusher flusher = new TestFlusher();
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
flusher);
flusher.r = region;
null) {
protected boolean internalFlushcache(HLog wal, long myseqid)
throws IOException {
boolean b = super.internalFlushcache(wal, myseqid);
flushcount.incrementAndGet();
return b;
};
};
long seqid = region.initialize();
// Assert we flushed.
assertTrue(flusher.count > 0);
// We flushed during init.
assertTrue(flushcount.get() > 0);
assertTrue(seqid > wal.getSequenceNumber());
Get get = new Get(rowName);
@@ -338,23 +396,6 @@
}
}
// Flusher used in this test. Keep count of how often we are called and
// actually run the flush inside here.
class TestFlusher implements FlushRequester {
private int count = 0;
private HRegion r;
@Override
public void request(HRegion region) {
count++;
try {
r.flushcache();
} catch (IOException e) {
throw new RuntimeException("Exception flushing", e);
}
}
}
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HLog wal)