HBASE-1025 Reconstruction log playback has no bounds on memory used

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@956706 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-06-21 21:33:17 +00:00
parent d59d054fc9
commit 52ae2e469b
14 changed files with 824 additions and 500 deletions
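
At a high level, this change moves recovered-edit replay from Store (which used to read the whole oldlogfile.log into one memstore) up into HRegion, where edits flow through the normal put/delete path and flushes can bound memory; HRegion.initialize() now returns the next sequence (edit) id instead of stashing it in a field. A minimal sketch of the resulting open-region flow, assuming only the HRegion/HLog APIs visible in the diff below (the wrapper class and names are illustrative, not part of the patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

// Illustrative helper, not part of this patch.
class OpenRegionSketch {
  static HRegion openRegion(Path basedir, HLog wal, FileSystem fs,
      Configuration conf, HRegionInfo hri) throws IOException {
    HRegion region = new HRegion(basedir, wal, fs, conf, hri, null);
    // initialize() replays any recovered.edits found under the region
    // directory and returns what the next sequence (edit) id should be.
    long seqid = region.initialize();
    // Callers such as HRegionServer push that id into the WAL so new edits
    // are numbered beyond anything already persisted in the stores.
    if (wal != null && seqid > wal.getSequenceNumber()) {
      wal.setSequenceNumber(seqid);
    }
    return region;
  }
}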

View File: CHANGES.txt

@ -402,6 +402,7 @@ Release 0.21.0 - Unreleased
regions
HBASE-2760 Fix MetaScanner TableNotFoundException when scanning starting at
the first row in a table.
HBASE-1025 Reconstruction log playback has no bounds on memory used
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File: HConstants.java

@ -139,9 +139,6 @@ public final class HConstants {
/** Like the previous, but for old logs that are about to be deleted */
public static final String HREGION_OLDLOGDIR_NAME = ".oldlogs";
/** Name of old log file for reconstruction */
public static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";
/** Used to construct the name of the compaction directory during compaction */
public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir";

View File: HMerge.java

@ -153,12 +153,12 @@ class HMerge {
if (currentRegion == null) {
currentRegion =
HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
currentRegion.initialize(null, null);
currentRegion.initialize();
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
nextRegion.initialize(null, null);
nextRegion.initialize();
nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
@ -330,7 +330,7 @@ class HMerge {
root = HRegion.newHRegion(rootTableDir, hlog, fs, conf,
HRegionInfo.ROOT_REGIONINFO, null);
root.initialize(null, null);
root.initialize();
Scan scan = new Scan();
scan.addColumn(HConstants.CATALOG_FAMILY,

View File: HRegion.java

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -70,6 +71,7 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -222,7 +224,6 @@ public class HRegion implements HeapSize { // , Writable{
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private final Object splitLock = new Object();
private long minSequenceId;
private boolean splitRequest;
private final ReadWriteConsistencyControl rwcc =
@ -306,51 +307,42 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
* Initialize this region and get it ready to roll.
* Called after construction.
*
* @param initialFiles path
* @param reporter progressable
* Initialize this region.
* @return What the next sequence (edit) id should be.
* @throws IOException e
*/
public void initialize(Path initialFiles, final Progressable reporter)
public long initialize() throws IOException {
return initialize(null);
}
/**
* Initialize this region.
*
* @param reporter Tickle every so often if initialize is taking a while.
* @return What the next sequence (edit) id should be.
* @throws IOException e
*/
public long initialize(final Progressable reporter)
throws IOException {
Path oldLogFile = new Path(regiondir, HConstants.HREGION_OLDLOGFILE_NAME);
moveInitialFilesIntoPlace(this.fs, initialFiles, this.regiondir);
// Write HRI to a file in case we need to recover .META.
checkRegioninfoOnFilesystem();
// Load in all the HStores.
// Load in all the HStores. Get min and max seqids across all families.
long maxSeqId = -1;
long minSeqIdToRecover = Integer.MAX_VALUE;
long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
Store store = instantiateHStore(this.basedir, c);
this.stores.put(c.getName(), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
minSeqIdToRecover = storeSeqIdBeforeRecovery;
if (minSeqId > storeSeqId) {
minSeqId = storeSeqId;
}
}
// Play log if one. Delete when done.
doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
if (fs.exists(oldLogFile)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting old log file: " + oldLogFile);
}
fs.delete(oldLogFile, false);
}
// Add one to the current maximum sequence id so new edits are beyond.
this.minSequenceId = maxSeqId + 1;
// Recover any edits if available.
long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
@ -363,11 +355,13 @@ public class HRegion implements HeapSize { // , Writable{
this.writestate.setReadOnly(true);
}
// HRegion is ready to go!
this.writestate.compacting = false;
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("region " + this +
" available; sequence id is " + this.minSequenceId);
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = Math.max(seqid, maxSeqId) + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
return nextSeqid;
}
/*
@ -423,14 +417,6 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* @return Updates to this region need to have a sequence id that is >= to
* the this number.
*/
long getMinSequenceId() {
return this.minSequenceId;
}
/** @return a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
return this.regionInfo;
@ -613,7 +599,6 @@ public class HRegion implements HeapSize { // , Writable{
}
/** @return the last time the region was flushed */
@SuppressWarnings({"UnusedDeclaration"})
public long getLastFlushTime() {
return this.lastFlushTime;
}
@ -1865,19 +1850,199 @@ public class HRegion implements HeapSize { // , Writable{
return size > this.memstoreFlushSize;
}
// Do any reconstruction needed from the log
protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
Progressable reporter)
/**
* Read the edits log put under this region by the WAL log-splitting process
* and replay the recovered edits into this region.
*
* We can ignore any log message that has a sequence ID that's equal to or
* lower than minSeqId. (Because we know such log messages are already
* reflected in the HFiles.)
* @param regiondir
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* @throws UnsupportedEncodingException
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
// Nothing to do (Replaying is done in HStores)
// Used by subclasses; e.g. THBase.
Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
if (edits == null || !this.fs.exists(edits)) return -1;
if (isZeroLengthThenDelete(this.fs, edits)) return -1;
long maxSeqIdInLog = -1;
try {
maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
LOG.debug("Deleting recovered edits file: " + edits);
if (!this.fs.delete(edits, false)) {
LOG.error("Failed delete of " + edits);
}
} catch (IOException e) {
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
if (skipErrors) {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
System.currentTimeMillis());
LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
" as " + moveAsideName, e);
if (!this.fs.rename(edits, moveAsideName)) {
LOG.error("hbase.skip.errors=true so continuing. Rename failed");
}
} else {
throw e;
}
}
return maxSeqIdInLog;
}
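The hbase.skip.errors flag used above is an ordinary Configuration boolean. A small hedged sketch of setting it; only the flag name comes from the patch, the wrapper class is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative only: enable the move-aside behaviour for corrupt
// recovered-edits files instead of failing the region open.
final class SkipErrorsConfSketch {
  static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // With this set, a recovered.edits file that throws during replay is
    // renamed aside (recovered.edits.<timestamp>) and the open continues.
    conf.setBoolean("hbase.skip.errors", true);
    return conf;
  }
}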
protected Store instantiateHStore(Path baseDir,
HColumnDescriptor c, Path oldLogFile, Progressable reporter)
/*
* @param edits File of recovered edits.
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final Progressable reporter)
throws IOException {
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
return replayRecoveredEdits(reader, minSeqId, reporter);
} finally {
reader.close();
}
}
/* @param reader Reader against file of recovered edits.
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log, or -1 if no log recovered
* @throws IOException
*/
private long replayRecoveredEdits(final HLog.Reader reader,
final long minSeqId, final Progressable reporter)
throws IOException {
return new Store(baseDir, this, c, this.fs, oldLogFile,
this.conf, reporter);
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
// Get map of family name to maximum sequence id. Do it here up front
// because as we progress, the sequence id can change if we happen to flush
// The flush ups the seqid for the Store. The new seqid can cause us to skip edits.
Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
if (key.getLogSeqNum() <= minSeqId) {
skippedEdits++;
continue;
}
for (KeyValue kv : val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) {
skippedEdits++;
continue;
}
// Figure which store the edit is meant for.
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
// crash and redeploy?
LOG.warn("No family for " + kv);
skippedEdits++;
continue;
}
// The edits' id has to be in excess of the original max seqid of the
// targeted store.
long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
if (currentEditSeqId < storeMaxSeqId) {
skippedEdits++;
continue;
}
restoreEdit(kv);
editsCount++;
}
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits take 3 minutes to complete.
if (reporter != null && (editsCount % interval) == 0) {
reporter.progress();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
", firstSeqIdInLog=" + firstSeqIdInLog +
", maxSeqIdInLog=" + currentEditSeqId);
}
return currentEditSeqId;
}
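Boiled down, the loop above applies an edit only when its sequence id clears two thresholds (the METAFAMILY and wrong-region checks are separate). A hypothetical condensation of that rule, not part of the patch:

// Hypothetical helper mirroring the seqid checks in replayRecoveredEdits:
// an edit is replayed only if it is beyond the minimum store seqid that
// gates the whole file, and not below the original max seqid of the store
// it targets.
final class ReplayFilterSketch {
  static boolean shouldReplay(long editSeqId, long minSeqId,
      long targetStoreOriginalMaxSeqId) {
    return editSeqId > minSeqId && editSeqId >= targetStoreOriginalMaxSeqId;
  }
}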
/*
* @param stores
* @return Map of family name to maximum sequenceid.
*/
private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte [], Store> e: stores.entrySet()) {
map.put(e.getKey(), e.getValue().getMaxSequenceId());
}
return map;
}
/*
* @param kv Apply this value to this region.
* @throws IOException
*/
// This method is protected so it can be called from tests.
protected void restoreEdit(final KeyValue kv) throws IOException {
// This is really expensive to do per edit. Loads of object creation.
// TODO: Optimization. Apply edits batched by family.
Map<byte [], List<KeyValue>> map =
new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
map.put(kv.getFamily(), Collections.singletonList(kv));
if (kv.isDelete()) {
delete(map, true);
} else {
put(map, true);
}
}
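A rough sketch of the batching the TODO above suggests, written as a hypothetical HRegion member method (it relies on HRegion's existing imports): callers would group consecutive KeyValues that share one family and one type (put vs delete), then reuse the internal put/delete(Map, boolean) calls already used by restoreEdit:

// Hypothetical batched variant of restoreEdit; would live inside HRegion.
protected void restoreEditsBatch(final byte [] family,
    final List<KeyValue> kvs, final boolean isDelete)
throws IOException {
  // One map and one internal call per batch instead of per KeyValue.
  Map<byte [], List<KeyValue>> map =
    new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
  map.put(family, kvs);
  if (isDelete) {
    delete(map, true);
  } else {
    put(map, true);
  }
}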
/*
* @param fs
* @param p File to check.
* @return True if file was zero-length (and if so, we'll delete it in here).
* @throws IOException
*/
private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
throws IOException {
FileStatus stat = fs.getFileStatus(p);
if (stat.getLen() > 0) return false;
LOG.warn("File " + p + " is zero-length, deleting.");
fs.delete(p, false);
return true;
}
protected Store instantiateHStore(Path baseDir, HColumnDescriptor c)
throws IOException {
return new Store(baseDir, this, c, this.fs, this.conf);
}
/**
@ -2361,7 +2526,7 @@ public class HRegion implements HeapSize { // , Writable{
new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf, null),
fs, conf, info, null);
region.initialize(null, null);
region.initialize();
return region;
}
@ -2390,9 +2555,9 @@ public class HRegion implements HeapSize { // , Writable{
HRegion r = HRegion.newHRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
log, FileSystem.get(conf), conf, info, null);
r.initialize(null, null);
long seqid = r.initialize();
if (log != null) {
log.setSequenceNumber(r.getMinSequenceId());
log.setSequenceNumber(seqid);
}
return r;
}
@ -2704,7 +2869,7 @@ public class HRegion implements HeapSize { // , Writable{
listPaths(fs, newRegionDir);
}
HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize(null, null);
dstRegion.initialize();
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
@ -2974,7 +3139,7 @@ public class HRegion implements HeapSize { // , Writable{
throw new IOException("Not a known catalog table: " + p.toString());
}
try {
region.initialize(null, null);
region.initialize();
if (majorCompact) {
region.compactStores(true);
} else {

View File: HRegionServer.java

@ -1241,26 +1241,6 @@ public class HRegionServer implements HRegionInterface,
return result;
}
private HServerInfo createServerInfoWithNewStartCode(final HServerInfo hsi) {
return new HServerInfo(hsi.getServerAddress(), hsi.getInfoPort(),
hsi.getHostname());
}
/* Add to the outbound message buffer */
private void reportOpen(HRegionInfo region) {
this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
}
/* Add to the outbound message buffer */
private void reportClose(HRegionInfo region) {
reportClose(region, null);
}
/* Add to the outbound message buffer */
private void reportClose(final HRegionInfo region, final byte[] message) {
this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
}
/**
* Add to the outbound message buffer
*
@ -1431,7 +1411,7 @@ public class HRegionServer implements HRegionInterface,
if (region == null) {
try {
zkUpdater.startRegionOpenEvent(null, true);
region = instantiateRegion(regionInfo);
region = instantiateRegion(regionInfo, this.hlog);
// Startup a compaction early if one is needed, if region has references
// or if a store has too many store files
if (region.hasReferences() || region.hasTooManyStoreFiles()) {
@ -1458,7 +1438,6 @@ public class HRegionServer implements HRegionInterface,
}
this.lock.writeLock().lock();
try {
this.hlog.setSequenceNumber(region.getMinSequenceId());
this.onlineRegions.put(mapKey, region);
} finally {
this.lock.writeLock().unlock();
@ -1472,16 +1451,28 @@ public class HRegionServer implements HRegionInterface,
}
}
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
.getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
r.initialize(null, new Progressable() {
/*
* @param regionInfo RegionInfo for the Region we're to instantiate and
* initialize.
* @param wal WAL to update with this region's sequence id.
* @return The instantiated, initialized HRegion.
* @throws IOException
*/
protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog wal)
throws IOException {
Path dir =
HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName());
HRegion r = HRegion.newHRegion(dir, this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
long seqid = r.initialize(new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
// If there is a wal and its seqid is < that of the new region, use the new region's seqid.
if (wal != null) {
if (seqid > wal.getSequenceNumber()) wal.setSequenceNumber(seqid);
}
return r;
}

View File: Store.java

@ -19,6 +19,18 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -39,32 +51,14 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.EOFException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
* or more StoreFiles, which stretch backwards over time.
@ -74,28 +68,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* services to manage sets of StoreFiles. One of the most important of those
* services is compaction services where files are aggregated once they pass
* a configurable threshold.
*
* <p>The only thing having to do with logs that Store needs to deal with is
* the reconstructionLog. This is a segment of an HRegion's log that might
* NOT be present upon startup. If the param is NULL, there's nothing to do.
* If the param is non-NULL, we need to process the log to reconstruct
* a TreeMap that might not have been written to disk before the process
* died.
*
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the Store).
*
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
public class Store implements HeapSize {
static final Log LOG = LogFactory.getLog(Store.class);
/**
* Comparator that looks at columns and compares their family portions.
* Presumes columns have already been checked for presence of delimiter.
* If no delimiter present, presume the buffer holds a store name so no need
* of a delimiter.
*/
protected final MemStore memstore;
// This stores directory in the filesystem.
private final Path homedir;
@ -111,7 +89,6 @@ public class Store implements HeapSize {
private volatile long storeSize = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final byte [] storeName;
private final String storeNameStr;
private final boolean inMemory;
@ -126,14 +103,6 @@ public class Store implements HeapSize {
private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
new CopyOnWriteArraySet<ChangedReadersObserver>();
// The most-recent log-seq-ID. The most-recent such ID means we can ignore
// all log messages up to and including that ID (because they're already
// reflected in the TreeMaps).
private volatile long maxSeqId = -1;
// The most-recent log-seq-id before we recovered from the LOG.
private long maxSeqIdBeforeLogRecovery = -1;
private final Path regionCompactionDir;
private final Object compactLock = new Object();
private final int compactionThreshold;
@ -151,21 +120,16 @@ public class Store implements HeapSize {
* @param region
* @param family HColumnDescriptor for this column
* @param fs file system object
* @param reconstructionLog existing log file to apply if any
* @param conf configuration object
* @param reporter Call on a period so hosting server can report we're
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
* @throws IOException
*/
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Path reconstructionLog, Configuration conf,
final Progressable reporter)
FileSystem fs, Configuration conf)
throws IOException {
HRegionInfo info = region.regionInfo;
this.fs = fs;
this.homedir = getStoreHomedir(basedir, info.getEncodedName(),
family.getName());
this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
if (!this.fs.exists(this.homedir)) {
if (!this.fs.mkdirs(this.homedir))
throw new IOException("Failed create of: " + this.homedir.toString());
@ -191,8 +155,7 @@ public class Store implements HeapSize {
this.memstore = new MemStore(this.comparator);
this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir),
info.getEncodedName());
this.storeName = this.family.getName();
this.storeNameStr = Bytes.toString(this.storeName);
this.storeNameStr = Bytes.toString(this.family.getName());
// By default, we compact if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
@ -219,29 +182,18 @@ public class Store implements HeapSize {
}
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
// loadStoreFiles calculates this.maxSeqId. as side-effect.
this.storefiles = ImmutableList.copyOf(loadStoreFiles());
this.maxSeqIdBeforeLogRecovery = this.maxSeqId;
// Do reconstruction log.
long newId = runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
if (newId != -1) {
this.maxSeqId = newId; // start with the log id we just recovered.
}
}
HColumnDescriptor getFamily() {
return this.family;
}
/**
* @return The maximum sequence id in all store files.
*/
long getMaxSequenceId() {
return this.maxSeqId;
}
long getMaxSeqIdBeforeLogRecovery() {
return maxSeqIdBeforeLogRecovery;
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
}
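With the cached maxSeqId field gone, the maximum sequence id is now recomputed from the current store files on every call. A one-line usage sketch of the same helper from outside Store (the wrapper class is illustrative):

import java.util.List;
import org.apache.hadoop.hbase.regionserver.StoreFile;

final class MaxSeqIdSketch {
  // Returns 0 when the list is empty or holds only bulk-loaded files,
  // per the getMaxSequenceIdInList contract described in StoreFile.
  static long maxStoreFileSeqId(final List<StoreFile> storefiles) {
    return StoreFile.getMaxSequenceIdInList(storefiles);
  }
}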
/**
@ -256,140 +208,6 @@ public class Store implements HeapSize {
new Path(Bytes.toString(family))));
}
/*
* Run reconstruction log
* @param reconstructionLog
* @param msid
* @param reporter
* @return the new max sequence id as per the log
* @throws IOException
*/
private long runReconstructionLog(final Path reconstructionLog,
final long msid, final Progressable reporter)
throws IOException {
try {
return doReconstructionLog(reconstructionLog, msid, reporter);
} catch (EOFException e) {
// Presume we got here because of lack of HADOOP-1700; for now keep going
// but this is probably not what we want long term. If we got here there
// has been data-loss
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
" opening " + Bytes.toString(this.storeName) +
" -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
} catch (IOException e) {
// Presume we got here because of some HDFS issue. Don't just keep going.
// Fail to open the HStore. Probably means we'll fail over and over
// again until human intervention but alternative has us skipping logs
// and losing edits: HBASE-642.
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
" opening " + Bytes.toString(this.storeName), e);
throw e;
}
return -1;
}
/*
* Read the reconstructionLog and put into memstore.
*
* We can ignore any log message that has a sequence ID that's equal to or
* lower than maxSeqID. (Because we know such log messages are already
* reflected in the HFiles.)
*
* @return the new max sequence id as per the log, or -1 if no log recovered
*/
private long doReconstructionLog(final Path reconstructionLog,
final long maxSeqID, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
if (reconstructionLog == null || !this.fs.exists(reconstructionLog)) {
// Nothing to do.
return -1;
}
FileStatus stat = this.fs.getFileStatus(reconstructionLog);
if (stat.getLen() <= 0) {
LOG.warn("Passed reconstruction log " + reconstructionLog +
" is zero-length. Deleting existing file");
fs.delete(reconstructionLog, false);
return -1;
}
// TODO: This could grow large and blow heap out. Need to get it into
// general memory usage accounting.
long maxSeqIdInLog = -1;
long firstSeqIdInLog = -1;
HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf);
try {
long skippedEdits = 0;
long editsCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval =
this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
HLog.Entry entry;
// TBD: Need to add an exception handler around logReader.next.
//
// A transaction now appears as a single edit. If logReader.next()
// returns an exception, then it must be a incomplete/partial
// transaction at the end of the file. Rather than bubble up
// the exception, we should catch it and simply ignore the
// partial transaction during this recovery phase.
//
while ((entry = logReader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
if (key.getLogSeqNum() <= maxSeqID) {
skippedEdits++;
continue;
}
for (KeyValue kv : val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
!kv.matchingFamily(family.getName())) {
continue;
}
if (kv.isDelete()) {
this.memstore.delete(kv);
} else {
this.memstore.add(kv);
}
editsCount++;
}
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
if (reporter != null && (editsCount % reportInterval) == 0) {
reporter.progress();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
"; store maxSeqID=" + maxSeqID +
", firstSeqIdInLog=" + firstSeqIdInLog +
", maxSeqIdInLog=" + maxSeqIdInLog);
}
} finally {
logReader.close();
}
if (maxSeqIdInLog > -1) {
// We read some edits, so we should flush the memstore
StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog);
flusher.prepare();
flusher.flushCache();
boolean needCompaction = flusher.commit();
if (needCompaction) {
this.compact(false);
}
}
return maxSeqIdInLog;
}
/*
* Creates a series of StoreFile loaded from the given directory.
* @throws IOException
@ -428,7 +246,6 @@ public class Store implements HeapSize {
}
results.add(curfile);
}
maxSeqId = StoreFile.getMaxSequenceIdInList(results);
Collections.sort(results, StoreFile.Comparators.FLUSH_TIME);
return results;
}
@ -584,7 +401,6 @@ public class Store implements HeapSize {
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
return internalFlushCache(snapshot, logCacheFlushId);
}
/*

View File: StoreFile.java

@ -270,7 +270,8 @@ public class StoreFile {
* the given list. Store files that were created by a mapreduce
* bulk load are ignored, as they do not correspond to any edit
* log items.
* @return 0 if no non-bulk-load files are provided
* @return 0 if no non-bulk-load files are provided, or if this is a Store
* that does not yet have any store files.
*/
public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
long max = 0;

View File: HLog.java

@ -35,7 +35,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@ -142,6 +141,11 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
/** Name of file that holds recovered edits written by the wal log splitting
* code, one per region
*/
public static final String RECOVERED_EDITS = "recovered.edits";
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@ -1257,8 +1261,7 @@ public class HLog implements Syncable {
// Number of logs in a read batch
// More means faster but bigger mem consumption
//TODO make a note on the conf rename and update hbase-site.xml if needed
int logFilesPerStep =
conf.getInt("hbase.hlog.split.batch.size", 3);
int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
@ -1626,7 +1629,7 @@ public class HLog implements Syncable {
HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
Path regionDir =
HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
return new Path(regionDir, HConstants.HREGION_OLDLOGFILE_NAME);
return new Path(regionDir, RECOVERED_EDITS);
}
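Log splitting now writes each region's edits to a single recovered.edits file under that region's directory, which HRegion.initialize() later replays. A hedged sketch of driving the splitter and locating that file, mirroring runWALSplit in the tests below (the wrapper class and variable names are illustrative):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

final class SplitSketch {
  static Path splitAndLocate(Path rootDir, Path logDir, Path oldLogDir,
      Path regionDir, Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    // splitLog returns the list of per-region recovered-edits files written;
    // here we only need the one for regionDir, which has a fixed name.
    List<Path> splits = HLog.splitLog(rootDir, logDir, oldLogDir, fs, conf);
    assert !splits.isEmpty();
    return new Path(regionDir, HLog.RECOVERED_EDITS);
  }
}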

View File: HBaseTestCase.java

@ -168,7 +168,7 @@ public abstract class HBaseTestCase extends TestCase {
HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
closedRegion.getFilesystem(), closedRegion.getConf(),
closedRegion.getRegionInfo(), null);
r.initialize(null, null);
r.initialize();
return r;
}

View File: HBaseTestingUtility.java

@ -19,8 +19,12 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
@ -51,7 +55,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.ZooKeeper;
import static org.junit.Assert.*;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import com.google.common.base.Preconditions;
/**
* Facility for testing HBase. Added as tool to abet junit4 testing. Replaces
@ -829,4 +839,82 @@ public class HBaseTestingUtility {
}
}
/**
* This method clones the passed <code>c</code> configuration, setting a new
* user into the clone. Use it when getting new instances of FileSystem. Only
* works for DistributedFileSystem.
* @param c Initial configuration
* @param differentiatingSuffix Suffix to differentiate this user from others.
* @return A new configuration instance with a different user set into it.
* @throws IOException
*/
public static Configuration setDifferentUser(final Configuration c,
final String differentiatingSuffix)
throws IOException {
FileSystem currentfs = FileSystem.get(c);
Preconditions.checkArgument(currentfs instanceof DistributedFileSystem);
// Else distributed filesystem. Make a new instance per daemon. Below
// code is taken from the AppendTestUtil over in hdfs.
Configuration c2 = new Configuration(c);
String username = UserGroupInformation.getCurrentUGI().getUserName() +
differentiatingSuffix;
UnixUserGroupInformation.saveToConf(c2,
UnixUserGroupInformation.UGI_PROPERTY_NAME,
new UnixUserGroupInformation(username, new String[]{"supergroup"}));
return c2;
}
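The WAL-replay tests below use this to 'steal' a log out from under a live region by talking to HDFS as a different user. A minimal usage sketch grounded in TestWALReplay; the suffix and wrapper class are illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;

final class DifferentUserSketch {
  static FileSystem fsAsAnotherUser(final Configuration conf)
  throws IOException {
    // Clone the conf with a differentiating suffix on the user name so
    // FileSystem.get returns a separate DFSClient from the caller's.
    Configuration c2 = HBaseTestingUtility.setDifferentUser(conf, ".splitter");
    return FileSystem.get(c2);
  }
}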
/**
* Set soft and hard limits in the namenode.
* You'll get an NPE if you call this before you've started a minidfscluster.
* @param soft Soft limit
* @param hard Hard limit
* @throws NoSuchFieldException
* @throws SecurityException
* @throws IllegalAccessException
* @throws IllegalArgumentException
*/
public void setNameNodeNameSystemLeasePeriod(final int soft, final int hard)
throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
// TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another.
// Not available in 0.20 hdfs. Use reflection to make it happen.
// private NameNode nameNode;
Field field = this.dfsCluster.getClass().getDeclaredField("nameNode");
field.setAccessible(true);
NameNode nn = (NameNode)field.get(this.dfsCluster);
nn.namesystem.leaseManager.setLeasePeriod(soft, hard);
}
/**
* Set maxRecoveryErrorCount in DFSClient. Currently it's hard-coded to 5 and
* makes tests linger. Here is the exception you'll see:
* <pre>
* 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
* </pre>
* @param stream A DFSClient.DFSOutputStream.
* @param max
* @throws NoSuchFieldException
* @throws SecurityException
* @throws IllegalAccessException
* @throws IllegalArgumentException
*/
public static void setMaxRecoveryErrorCount(final OutputStream stream,
final int max)
throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
for (Class<?> clazz: clazzes) {
String className = clazz.getSimpleName();
if (className.equals("DFSOutputStream")) {
if (clazz.isInstance(stream)) {
Field maxRecoveryErrorCountField =
stream.getClass().getDeclaredField("maxRecoveryErrorCount");
maxRecoveryErrorCountField.setAccessible(true);
maxRecoveryErrorCountField.setInt(stream, max);
break;
}
}
}
}
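Because this method works via reflection it declares several checked exceptions; createWAL in TestWALReplay below wraps the call and rethrows as a RuntimeException. A minimal sketch of that pattern (the wrapper class is illustrative):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

final class RecoveryCountSketch {
  static void capRecoveryRetries(final HLog wal) {
    try {
      // Drop DFSClient's hard-coded retry count so tests don't linger
      // retrying a pipeline that is long gone.
      HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    } catch (Exception e) {
      // Reflection failures here should never happen; surface them.
      throw new RuntimeException(e);
    }
  }
}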
}

View File: TestStore.java

@ -20,22 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -45,6 +29,22 @@ import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Test class for the Store
*/
@ -96,10 +96,8 @@ public class TestStore extends TestCase {
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
HBaseConfiguration conf = new HBaseConfiguration();
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
Path reconstructionLog = null;
Progressable reporter = null;
fs.delete(logdir, true);
@ -109,8 +107,7 @@ public class TestStore extends TestCase {
HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null);
HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
store = new Store(basedir, region, hcd, fs, reconstructionLog, conf,
reporter);
store = new Store(basedir, region, hcd, fs, conf);
}
@ -133,7 +130,7 @@ public class TestStore extends TestCase {
StoreFile f = this.store.getStorefiles().get(0);
Path storedir = f.getPath().getParent();
long seqid = f.getMaxSequenceId();
HBaseConfiguration c = new HBaseConfiguration();
Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c);
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
StoreFile.DEFAULT_BLOCKSIZE_SMALL);
@ -143,7 +140,7 @@ public class TestStore extends TestCase {
// Reopen it... should pick up two files
this.store = new Store(storedir.getParent().getParent(),
this.store.getHRegion(),
this.store.getFamily(), fs, null, c, null);
this.store.getFamily(), fs, c);
System.out.println(this.store.getHRegionInfo().getEncodedName());
assertEquals(2, this.store.getStorefilesCount());
this.store.get(get, qualifiers, result);
@ -318,4 +315,4 @@ public class TestStore extends TestCase {
storeFlusher.flushCache();
storeFlusher.commit();
}
}
}

View File: TestStoreReconstruction.java (deleted)

@ -1,157 +0,0 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
public class TestStoreReconstruction {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Path dir;
private MiniDFSCluster cluster;
private static final String TABLE = "testtable";
private static final int TOTAL_EDITS = 10000;
private Configuration conf;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception { }
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
conf = TEST_UTIL.getConfiguration();
cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
// Set the hbase.rootdir to be the home directory in mini dfs.
TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR,
this.cluster.getFileSystem().getHomeDirectory().toString());
this.dir = new Path("/hbase", TABLE);
conf.setInt("hbase.regionserver.flushlogentries", 1);
if (cluster.getFileSystem().exists(dir)) {
cluster.getFileSystem().delete(dir, true);
}
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {}
/**
* Create a Store with the result of a HLog split and test we only
* see the good edits
* @throws Exception
*/
@Test
public void runReconstructionLog() throws Exception {
byte[] family = Bytes.toBytes("column");
HColumnDescriptor hcd = new HColumnDescriptor(family);
HTableDescriptor htd = new HTableDescriptor(TABLE);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logDir = new Path(this.dir, HConstants.HREGION_LOGDIR_NAME);
HLog log = new HLog(cluster.getFileSystem(), logDir, oldLogDir, conf, null);
HRegion region = new HRegion(dir, log,
cluster.getFileSystem(),conf, info, null);
List<KeyValue> result = new ArrayList<KeyValue>();
// Empty set to get all columns
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
final byte[] tableName = Bytes.toBytes(TABLE);
final byte[] rowName = tableName;
final byte[] regionName = info.getRegionName();
// Add 10 000 edits to HLog on the good family
for (int j = 0; j < TOTAL_EDITS; j++) {
byte[] qualifier = Bytes.toBytes(Integer.toString(j));
byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
log.append(info, tableName, edit,
System.currentTimeMillis());
}
// Add a cache flush, shouldn't have any effect
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
System.currentTimeMillis(), rowName));
log.append(info, tableName, edit,
System.currentTimeMillis());
log.sync();
// TODO dont close the file here.
log.close();
List<Path> splits = HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
logDir, oldLogDir, cluster.getFileSystem(), conf);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
// Make sure the file exists
assertTrue(cluster.getFileSystem().exists(splits.get(0)));
// This will run the log reconstruction
Store store = new Store(dir, region, hcd, cluster.getFileSystem(),
splits.get(0), conf, null);
Get get = new Get(rowName);
store.get(get, qualifiers, result);
// Make sure we only see the good edits
assertEquals(TOTAL_EDITS, result.size());
}
}

View File: TestHLogSplit.java

@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -39,15 +38,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
@ -125,17 +121,7 @@ public class TestHLogSplit {
Collections.addAll(regions, "bbb", "ccc");
InstrumentedSequenceFileLogWriter.activateFailure = false;
// Set the soft lease for hdfs to be down from default of 5 minutes or so.
// TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another.
// Not available in 0.20 hdfs
// TEST_UTIL.getDFSCluster().getNamesystem().leaseManager.
// setLeasePeriod(100, 50000);
// Use reflection to get at the 0.20 version of above.
MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster();
// private NameNode nameNode;
Field field = dfsCluster.getClass().getDeclaredField("nameNode");
field.setAccessible(true);
NameNode nn = (NameNode)field.get(dfsCluster);
nn.namesystem.leaseManager.setLeasePeriod(100, 50000);
TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 50000);
}
@After
@ -628,7 +614,7 @@ public class TestHLogSplit {
return new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootdir, table),
HRegionInfo.encodeRegionName(region.getBytes())),
HConstants.HREGION_OLDLOGFILE_NAME);
HLog.RECOVERED_EDITS);
}
private void corruptHLog(Path path, Corruptions corruption, boolean close,
@ -736,8 +722,8 @@ public class TestHLogSplit {
FileStatus[] f2 = fs.listStatus(p2);
for (int i=0; i<f1.length; i++) {
if (!logsAreEqual(new Path(f1[i].getPath(), HConstants.HREGION_OLDLOGFILE_NAME),
new Path(f2[i].getPath(), HConstants.HREGION_OLDLOGFILE_NAME))) {
if (!logsAreEqual(new Path(f1[i].getPath(), HLog.RECOVERED_EDITS),
new Path(f2[i].getPath(), HLog.RECOVERED_EDITS))) {
return -1;
}
}

View File: TestWALReplay.java (new)

@ -0,0 +1,436 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mortbay.log.Log;
/**
* Test replay of edits out of a WAL split.
*/
public class TestWALReplay {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
private Path hbaseRootDir = null;
private Path oldLogDir;
private Path logDir;
private FileSystem fs;
private Configuration conf;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("dfs.support.append", true);
// The below config not supported until
conf.setInt("dfs.client.block.recovery.retries", 2);
conf.setInt("hbase.regionserver.flushlogentries", 1);
TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
Path hbaseRootDir =
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
Log.info("hbase.rootdir=" + hbaseRootDir);
conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniDFSCluster();
}
@Before
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
}
@After
public void tearDown() throws Exception {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
/*
* @param p Directory to cleanup
*/
private void deleteDir(final Path p) throws IOException {
if (this.fs.exists(p)) {
if (!this.fs.delete(p, true)) {
throw new IOException("Failed remove of " + p);
}
}
}
/**
* Test case of HRegion that is only made out of bulk loaded files. Assert
* that we don't 'crash'.
* @throws IOException
* @throws IllegalAccessException
* @throws NoSuchFieldException
* @throws IllegalArgumentException
* @throws SecurityException
*/
@Test
public void testRegionMadeOfBulkLoadedFilesOnly()
throws IOException, SecurityException, IllegalArgumentException,
NoSuchFieldException, IllegalAccessException {
final String tableNameStr = "testReplayEditsWrittenViaHRegion";
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
Path basedir = new Path(this.hbaseRootDir, tableNameStr);
deleteDir(basedir);
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(hri, basedir, wal, this.conf);
Path f = new Path(basedir, "hfile");
HFile.Writer writer = new HFile.Writer(this.fs, f);
byte [] family = hri.getTableDesc().getFamilies().iterator().next().getName();
byte [] row = Bytes.toBytes(tableNameStr);
writer.append(new KeyValue(row, family, family, row));
writer.close();
region.bulkLoadHFile(f.toString(), family);
// Add an edit so something in the WAL
region.put((new Put(row)).add(family, family, family));
wal.sync();
// Now 'crash' the region by stealing its wal
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
tableNameStr);
runWALSplit(newConf);
HLog wal2 = createWAL(newConf);
HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
newConf, hri, null);
long seqid2 = region2.initialize();
assertTrue(seqid2 > -1);
// I can't close wal1. It's been appropriated when we split.
region2.close();
wal2.closeAndDelete();
}
/**
* Test writing edits into an HRegion, closing it, splitting logs, opening
* Region again. Verify seqids.
* @throws IOException
* @throws IllegalAccessException
* @throws NoSuchFieldException
* @throws IllegalArgumentException
* @throws SecurityException
*/
@Test
public void testReplayEditsWrittenViaHRegion()
throws IOException, SecurityException, IllegalArgumentException,
NoSuchFieldException, IllegalAccessException {
final String tableNameStr = "testReplayEditsWrittenViaHRegion";
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
Path basedir = new Path(this.hbaseRootDir, tableNameStr);
deleteDir(basedir);
final byte[] rowName = Bytes.toBytes(tableNameStr);
final int countPerFamily = 10;
// Write countPerFamily edits into the three families. Do a flush on one
// of the families during the load of edits so its seqid is not the same as
// the others', to test we do the right thing when seqids differ.
HLog wal = createWAL(this.conf);
HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, null);
long seqid = region.initialize();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal.setSequenceNumber(seqid);
boolean first = true;
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
if (first) {
// If first, so we have at least one family w/ different seqid to rest.
region.flushcache();
first = false;
}
}
// Now assert edits made it in.
Get g = new Get(rowName);
Result result = region.get(g, null);
assertEquals(countPerFamily * hri.getTableDesc().getFamilies().size(),
result.size());
// Now close the region, split the log, reopen the region and assert that
// replay of log has no effect, that our seqids are calculated correctly so
// all edits in logs are seen as 'stale'/old.
region.close();
wal.close();
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
@Override
protected void restoreEdit(KeyValue kv) throws IOException {
super.restoreEdit(kv);
throw new RuntimeException("Called when it should not have been!");
}
};
long seqid2 = region2.initialize();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal2.setSequenceNumber(seqid2);
assertTrue(seqid + result.size() < seqid2);
// Next test. Add more edits, then 'crash' this region by stealing its wal
// out from under it and assert that replay of the log adds the edits back
// correctly when region is opened again.
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
}
// Get count of edits.
Result result2 = region2.get(g, null);
assertEquals(2 * result.size(), result2.size());
wal2.sync();
// Set down maximum recovery so the dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal2.getOutputStream(), 1);
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
tableNameStr);
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
// Make a new wal for new region open.
HLog wal3 = createWAL(newConf);
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
@Override
protected void restoreEdit(KeyValue kv) throws IOException {
super.restoreEdit(kv);
countOfRestoredEdits.incrementAndGet();
}
};
long seqid3 = region3.initialize();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal3.setSequenceNumber(seqid3);
Result result3 = region3.get(g, null);
// Assert that count of cells is same as before crash.
assertEquals(result2.size(), result3.size());
assertEquals(hri.getTableDesc().getFamilies().size() * countPerFamily,
countOfRestoredEdits.get());
// I can't close wal1. It's been appropriated when we split.
region3.close();
wal3.closeAndDelete();
}
/**
* Create an HRegion with the result of a HLog split and test we only see the
* good edits
* @throws Exception
*/
@Test
public void testReplayEditsWrittenIntoWAL() throws Exception {
final String tableNameStr = "testReplayEditsWrittenIntoWAL";
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
Path basedir = new Path(hbaseRootDir, tableNameStr);
deleteDir(basedir);
HLog wal = createWAL(this.conf);
final byte[] tableName = Bytes.toBytes(tableNameStr);
final byte[] rowName = tableName;
final byte[] regionName = hri.getRegionName();
// Add 1k to each family.
final int countPerFamily = 1000;
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal);
}
// Add a cache flush, shouldn't have any effect
long logSeqId = wal.startCacheFlush();
wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
long now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
wal.append(hri, tableName, edit, now);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
KeyValue.Type.DeleteFamily));
wal.append(hri, tableName, edit, now);
// Sync.
wal.sync();
// Set down maximum recovery so the dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
// Make a new conf and a new fs for the splitter to run on so we can take
// over old wal.
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
".replay.wal.secondtime");
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
// 100k seems to make for about 4 flushes during HRegion#initialize.
newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
// Make a new wal for new region.
HLog newWal = createWAL(newConf);
try {
TestFlusher flusher = new TestFlusher();
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
flusher);
flusher.r = region;
long seqid = region.initialize();
// Assert we flushed.
assertTrue(flusher.count > 0);
assertTrue(seqid > wal.getSequenceNumber());
Get get = new Get(rowName);
Result result = region.get(get, -1);
// Make sure we only see the good edits
assertEquals(countPerFamily * (hri.getTableDesc().getFamilies().size() - 1),
result.size());
region.close();
} finally {
newWal.closeAndDelete();
}
}
// Flusher used in this test. Keep count of how often we are called and
// actually run the flush inside here.
class TestFlusher implements FlushRequester {
private int count = 0;
private HRegion r;
@Override
public void request(HRegion region) {
count++;
try {
r.flushcache();
} catch (IOException e) {
throw new RuntimeException("Exception flushing", e);
}
}
}
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HLog wal)
throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes,
ee.currentTimeMillis(), columnBytes));
wal.append(hri, tableName, edit, ee.currentTimeMillis());
}
}
private void addRegionEdits (final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HRegion r,
final String qualifierPrefix)
throws IOException {
for (int j = 0; j < count; j++) {
byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
Put p = new Put(rowName);
p.add(family, qualifier, ee.currentTimeMillis(), rowName);
r.put(p);
}
}
/*
* Creates an HRI around an HTD that has <code>tableName</code> and three
* column families named 'a','b', and 'c'.
* @param tableName Name of table to use when we create HTableDescriptor.
*/
private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
htd.addFamily(a);
HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
htd.addFamily(b);
HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
htd.addFamily(c);
return new HRegionInfo(htd, null, null, false);
}
/*
* Run the split. Verify only single split file made.
* @param c
* @return The single split file made
* @throws IOException
*/
private Path runWALSplit(final Configuration c) throws IOException {
FileSystem fs = FileSystem.get(c);
List<Path> splits = HLog.splitLog(this.hbaseRootDir, this.logDir,
this.oldLogDir, fs, c);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
// Make sure the file exists
assertTrue(fs.exists(splits.get(0)));
Log.info("Split file=" + splits.get(0));
return splits.get(0);
}
/*
* @param c
* @return WAL with retries set down from 5 to 1 only.
* @throws IOException
*/
private HLog createWAL(final Configuration c) throws IOException {
HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c, null);
// Set down maximum recovery so the dfsclient doesn't linger retrying something
// long gone.
try {
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
} catch (Exception e) {
// These exceptions should never happen... make RuntimeException of them
// if they do.
throw new RuntimeException(e);
}
return wal;
}
}