HBASE-586 HRegion runs HStore memcache snapshotting -- fix it so only HStore
knows about workings of memcache

This patch moves the running of memcache snapshots out of the control of
HRegion and hides the memcache details in HStore. It also does a bunch of work
on the merge tool, fixing a bug in MetaUtils along the way. The merge tool test
was failing: we were not setting the maximum sequence id into the HLog after
opening a region -- as HRegionServer does -- so we were losing edits. While
there, refactored the merge tool test to remove duplicated code. Finally,
cleaned up logging in HStore to aid debugging; e.g. we now always refer to the
sequence id as the 'sequence id' in log messages rather than as seqId or
'sequence record', so the log can be sorted to follow sequence id transitions.

Version 2 changes the order in which things are run in memcache. 532 made it so
flushing did a snapshot and then cleared the snapshot. Now we snapshot before
we flush a store; then, inside the store flush, we call getSnapshot and then
clearSnapshot.

M src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java
    Snapshotting changed; changed the test accordingly.
M src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
    Use accessor to get file number.
M src/test/org/apache/hadoop/hbase/util/TestMergeTool.java
    Refactored to remove duplicated code so it is clear what is going on.
    (mergeAndVerify, verifyMerge): Added.
M src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
    (snapshot): Changed so it no longer returns the snapshot.
M src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    Changed log messages, removing useless ones and adding info to others.
    (this.maxSeqId): We used to add 1 to this here in HStore. Let HRegion do
    it; it is the one that does the machinations with sequence ids anyway.
    Made flushes return the amount flushed; use this to update the region's
    memcacheSize accounting.
    (snapshot): Added method for the region to call.
M src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    Made data members private.
    (getFilenum): Added accessor for tests.
M src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    Added info to logs.
    (snapshotMemcaches): Removed.
    (internalFlushcache): No longer takes a start time. Internally now does
    some of what used to happen in snapshotMemcaches, including sending the
    message to stores to snapshot.
    (getEntrySize): Added method for calculating the size of an update. Used
    by HRegion and by flushing so both come up with the same answer.
M src/java/org/apache/hadoop/hbase/util/Merge.java
    Added logging of what is happening during merges and fail earlier than we
    used to if stuff is not right. Renamed local variables from region1 to r1,
    etc., so they do not clash with data members of the same name.
M src/java/org/apache/hadoop/hbase/util/MetaUtils.java
    Added a TODO.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@650298 13f79535-47bb-0310-9956-ffa450edef68
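Below is a minimal, self-contained Java sketch (not the actual HBase classes)
of the version-2 flush ordering described above: the region first asks every
store to snapshot its memcache, then each store flush drains that snapshot via
getSnapshot()/clearSnapshot() and reports how many bytes it flushed. The class
and field names (MemcacheSketch, StoreSketch, RegionFlushSketch) are
illustrative stand-ins, not code from this patch.

// Sketch only: simplified stand-ins for Memcache, HStore and HRegion.
import java.util.SortedMap;
import java.util.TreeMap;

class MemcacheSketch {
  private SortedMap<String, byte[]> memcache = new TreeMap<String, byte[]>();
  private SortedMap<String, byte[]> snapshot = new TreeMap<String, byte[]>();

  synchronized void add(String key, byte[] value) {
    memcache.put(key, value);
  }

  // snapshot() no longer returns the snapshot; it only rolls the current
  // memcache over, and does nothing if the previous snapshot was not cleared.
  synchronized void snapshot() {
    if (snapshot.isEmpty() && !memcache.isEmpty()) {
      snapshot = memcache;
      memcache = new TreeMap<String, byte[]>();
    }
  }

  synchronized SortedMap<String, byte[]> getSnapshot() {
    return snapshot;
  }

  synchronized void clearSnapshot(SortedMap<String, byte[]> ss) {
    if (ss == snapshot) {
      snapshot = new TreeMap<String, byte[]>();
    }
  }
}

class StoreSketch {
  final MemcacheSketch memcache = new MemcacheSketch();

  // Called by the region, under its update lock, before flushCache().
  void snapshot() {
    memcache.snapshot();
  }

  // Flushes the previously made snapshot and returns the number of bytes
  // flushed so the region can adjust its memcache-size accounting.
  long flushCache(long flushSequenceId) {
    SortedMap<String, byte[]> cache = memcache.getSnapshot();
    long flushed = 0;
    for (byte[] value : cache.values()) {
      flushed += value.length; // stand-in for writing the entry to a store file
    }
    // Cleared only after the flush succeeded; on failure the snapshot survives
    // and is picked up again by the next flush.
    memcache.clearSnapshot(cache);
    return flushed;
  }
}

class RegionFlushSketch {
  public static void main(String[] args) {
    StoreSketch store = new StoreSketch();
    store.memcache.add("row1", new byte[] {1, 2, 3});
    // 1. Region snapshots every store (normally under updateLock).
    store.snapshot();
    // 2. Region obtains a sequence id from the log, then flushes each store.
    long flushed = store.flushCache(42L);
    System.out.println("flushed " + flushed + " bytes");
  }
}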
This commit is contained in:
parent e411d3c61f
commit cffa3a254e
CHANGES.txt
@@ -14,6 +14,8 @@ Hbase Change Log
HBASE-598 Loggging, no .log file; all goes into .out
HBASE-595 RowFilterInterface.rowProcessed() is called *before* fhe final
          filtering decision is made (Clint Morgan via Stack)
HBASE-586 HRegion runs HStore memcache snapshotting -- fix it so only HStore
          knows about workings of memcache

IMPROVEMENTS
HBASE-559 MR example job to count table rows
src/java/org/apache/hadoop/hbase/regionserver/HLog.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * HLog stores all the edits to the HStore.
@@ -121,15 +122,15 @@ public class HLog implements HConstants {
 */
final Map<Text, Long> lastSeqWritten = new ConcurrentHashMap<Text, Long>();

volatile boolean closed = false;
private volatile boolean closed = false;

private final Integer sequenceLock = new Integer(0);
volatile long logSeqNum = 0;
private volatile long logSeqNum = 0;

volatile long filenum = 0;
volatile long old_filenum = -1;
private volatile long filenum = 0;
private volatile long old_filenum = -1;

volatile int numEntries = 0;
private volatile int numEntries = 0;

// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
@@ -167,7 +168,15 @@ public class HLog implements HConstants {
fs.mkdirs(dir);
rollWriter();
}

/*
 * Accessor for tests.
 * @return Current state of the monotonically increasing file id.
 */
long getFilenum() {
return this.filenum;
}

/**
 * Get the compression type for the hlog files.
 * @param c Configuration to use.
@@ -191,7 +200,7 @@ public class HLog implements HConstants {
if (newvalue > logSeqNum) {
if (LOG.isDebugEnabled()) {
LOG.debug("changing sequence number from " + logSeqNum + " to " +
newvalue);
newvalue);
}
logSeqNum = newvalue;
}
@@ -226,8 +235,7 @@ public class HLog implements HConstants {
this.writer.close();
Path p = computeFilename(old_filenum);
if (LOG.isDebugEnabled()) {
LOG.debug("Closing current log writer " + p.toString() +
" to get a new one");
LOG.debug("Closing current log writer " + FSUtils.getPath(p));
}
if (filenum > 0) {
synchronized (this.sequenceLock) {
@@ -240,7 +248,7 @@ public class HLog implements HConstants {
Path newPath = computeFilename(filenum);
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
LOG.info("new log writer created at " + newPath);
LOG.info("New log writer created at " + FSUtils.getPath(newPath));

// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
@@ -295,7 +303,7 @@ public class HLog implements HConstants {
}

private void deleteLogFile(final Path p, final Long seqno) throws IOException {
LOG.info("removing old log file " + p.toString() +
LOG.info("removing old log file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno);
this.fs.delete(p, true);
}
src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
@@ -75,7 +76,6 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
 * they make up all the data for the rows.
 *
 * <p>Each HRegion has a 'startKey' and 'endKey'.
 *
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region) The endKey of region 0 is the same as
 * startKey for region 1 (if it exists). The startKey for the
@@ -213,8 +213,7 @@ public class HRegion implements HConstants {
makeColumnFamilyDirs(fs, basedir, encodedRegionName, colFamily, tabledesc);

// Because we compacted the source regions we should have no more than two
// HStoreFiles per family and there will be no reference stores

// HStoreFiles per family and there will be no reference store
List<HStoreFile> srcFiles = es.getValue();
if (srcFiles.size() == 2) {
long seqA = srcFiles.get(0).loadInfo(fs);
@@ -222,8 +221,9 @@ public class HRegion implements HConstants {
if (seqA == seqB) {
// We can't have duplicate sequence numbers
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting sequence number of storeFile " +
srcFiles.get(1));
LOG.debug("Adjusting sequence id of storeFile " + srcFiles.get(1) +
" down by one; sequence id A=" + seqA + ", sequence id B=" +
seqB);
}
srcFiles.get(1).writeInfo(fs, seqB - 1);
}
@@ -423,7 +423,11 @@ public class HRegion implements HConstants {
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.regiondir = new Path(basedir, this.regionInfo.getEncodedName());
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);

if (LOG.isDebugEnabled()) {
LOG.debug("Opening region " + this.regionInfo.getRegionName() + "/" +
this.regionInfo.getEncodedName());
}
this.regionCompactionDir =
new Path(getCompactionDir(basedir), this.regionInfo.getEncodedName());

@@ -452,7 +456,8 @@ public class HRegion implements HConstants {
fs.delete(oldLogFile);
}

this.minSequenceId = maxSeqId;
// Add one to the current maximum sequence id so new edits are beyond.
this.minSequenceId = maxSeqId + 1;
if (LOG.isDebugEnabled()) {
LOG.debug("Next sequence id for region " + regionInfo.getRegionName() +
" is " + this.minSequenceId);
@@ -478,7 +483,8 @@ public class HRegion implements HConstants {
// HRegion is ready to go!
this.writestate.compacting = false;
this.lastFlushTime = System.currentTimeMillis();
LOG.info("region " + this.regionInfo.getRegionName() + " available");
LOG.info("region " + this.regionInfo.getRegionName() + "/" +
this.regionInfo.getEncodedName() + " available");
}

/**
@@ -596,7 +602,7 @@ public class HRegion implements HConstants {

// Don't flush the cache if we are aborting
if (!abort) {
internalFlushcache(snapshotMemcaches());
internalFlushcache();
}

List<HStoreFile> result = new ArrayList<HStoreFile>();
@@ -788,7 +794,7 @@ public class HRegion implements HConstants {
// Cleanup
boolean deleted = fs.delete(splits); // Get rid of splits directory
if (LOG.isDebugEnabled()) {
LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
LOG.debug("Cleaned up " + FSUtils.getPath(splits) + " " + deleted);
}
HRegion regions[] = new HRegion [] {regionA, regionB};
return regions;
@@ -915,11 +921,7 @@ public class HRegion implements HConstants {
try {
lock.readLock().lock(); // Prevent splits and closes
try {
long startTime = -1;
synchronized (updateLock) {// Stop updates while we snapshot the memcaches
startTime = snapshotMemcaches();
}
return internalFlushcache(startTime);
return internalFlushcache();
} finally {
lock.readLock().unlock();
}
@@ -931,33 +933,6 @@ public class HRegion implements HConstants {
}
}

/*
 * It is assumed that updates are blocked for the duration of this method
 */
private long snapshotMemcaches() {
if (this.memcacheSize.get() == 0) {
return -1;
}
long startTime = System.currentTimeMillis();
if(LOG.isDebugEnabled()) {
LOG.debug("Started memcache flush for region " +
this.regionInfo.getRegionName() + ". Size " +
StringUtils.humanReadableInt(this.memcacheSize.get()));
}

// We reset the aggregate memcache size here so that subsequent updates
// will add to the unflushed size
this.memcacheSize.set(0L);
this.flushRequested = false;

// Record latest flush time
this.lastFlushTime = System.currentTimeMillis();
for (HStore hstore: stores.values()) {
hstore.snapshotMemcache();
}
return startTime;
}

/**
 * Flushing the cache is a little tricky. We have a lot of updates in the
 * HMemcache, all of which have also been written to the log. We need to
@@ -984,45 +959,56 @@ public class HRegion implements HConstants {
 *
 * <p> This method may block for some time.
 *
 * @param startTime the time the cache was snapshotted or -1 if a flush is
 * not needed
 *
 * @return true if the cache was flushed
 *
 * @throws IOException
 * @throws DroppedSnapshotException Thrown when replay of hlog is required
 * because a Snapshot was not properly persisted.
 */
private boolean internalFlushcache(long startTime) throws IOException {
if (startTime == -1) {
return false;
}
private boolean internalFlushcache() throws IOException {
final long startTime = System.currentTimeMillis();

// Clear flush flag.
this.flushRequested = false;

// Record latest flush time
this.lastFlushTime = startTime;

if (LOG.isDebugEnabled()) {
LOG.debug("Started memcache flush for region " +
this.regionInfo.getRegionName() + ". Current region memcache size " +
StringUtils.humanReadableInt(this.memcacheSize.get()));
}

// We pass the log to the HMemcache, so we can lock down both
// simultaneously. We only have to do this for a moment: we need the
// HMemcache state at the time of a known log sequence number. Since
// multiple HRegions may write to a single HLog, the sequence numbers may
// zoom past unless we lock it.
//
// When execution returns from snapshotMemcacheForLog() with a non-NULL
// value, the HMemcache will have a snapshot object stored that must be
// explicitly cleaned up using a call to deleteSnapshot() or by calling
// abort.
//
// Stop updates while we snapshot the memcache of all stores. We only have
// to do this for a moment. Its quick. The subsequent sequence id that
// goes into the HLog after we've flushed all these snapshots also goes
// into the info file that sits beside the flushed files.
synchronized (updateLock) {
for (HStore s: stores.values()) {
s.snapshot();
}
}
long sequenceId = log.startCacheFlush();

// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memcache.
// Otherwise, the snapshot content while backed up in the hlog, it will not
// be part of the current running servers state.

try {
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.

for (HStore hstore: stores.values()) {
hstore.flushCache(sequenceId);
long flushed = hstore.flushCache(sequenceId);
// Subtract amount flushed.
long size = this.memcacheSize.addAndGet(-flushed);
if (size < 0) {
if (LOG.isDebugEnabled()) {
LOG.warn("Memcache size went negative " + size + "; resetting");
}
this.memcacheSize.set(0);
}
}
} catch (IOException e) {
// An exception here means that the snapshot was not persisted.
@@ -1051,7 +1037,7 @@ public class HRegion implements HConstants {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.getRegionName() + " in " +
(System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
(System.currentTimeMillis() - startTime) + "ms, sequence id=" +
sequenceId);
}
return true;
@@ -1533,8 +1519,7 @@ public class HRegion implements HConstants {
for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
byte[] val = e.getValue();
size = this.memcacheSize.addAndGet(key.getSize() +
(val == null ? 0 : val.length));
size = this.memcacheSize.addAndGet(getEntrySize(key, val));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
if (this.flushListener != null && !this.flushRequested &&
@@ -1545,6 +1530,19 @@ public class HRegion implements HConstants {
}
}
}

/*
 * Calculate size of passed key/value pair.
 * Used here when we update region to figure what to add to this.memcacheSize
 * Also used in Store when flushing calculating size of flush. Both need to
 * use same method making size calculation.
 * @param key
 * @param value
 * @return Size of the passed key + value
 */
static long getEntrySize(final HStoreKey key, byte [] value) {
return key.getSize() + (value == null ? 0 : value.length);
}

//////////////////////////////////////////////////////////////////////////////
// Support code
@@ -1868,7 +1866,6 @@ public class HRegion implements HConstants {
}
}

/** {@inheritDoc} */
public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
throw new UnsupportedOperationException("Unimplemented serverside. " +
"next(HStoreKey, StortedMap(...) is more efficient");
@@ -1913,10 +1910,13 @@ public class HRegion implements HConstants {
}

/**
 * Convenience method to open a HRegion.
 * Convenience method to open a HRegion outside of an HRegionServer context.
 * @param info Info for region to be opened.
 * @param rootDir Root directory for HBase instance
 * @param log HLog for region to use
 * @param log HLog for region to use. This method will call
 * HLog#setSequenceNumber(long) passing the result of the call to
 * HRegion#getMinSequenceId() to ensure the log id is properly kept
 * up. HRegionStore does this every time it opens a new region.
 * @param conf
 * @return new HRegion
 *
@@ -1927,9 +1927,16 @@ public class HRegion implements HConstants {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening region: " + info);
}
return new HRegion(
if (info == null) {
throw new NullPointerException("Passed region info is null");
}
HRegion r = new HRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
log, FileSystem.get(conf), conf, info, null, null);
if (log != null) {
log.setSequenceNumber(r.getMinSequenceId());
}
return r;
}

/**
src/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * HStore maintains a bunch of data files. It is responsible for maintaining
@@ -222,21 +223,16 @@ public class HStore implements HConstants {
this.bloomFilter = loadOrCreateBloomFilter();
}

if(LOG.isDebugEnabled()) {
LOG.debug("starting " + storeName);
}

// Go through the 'mapdir' and 'infodir' together, make sure that all
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info
// file, the entry in 'mapdir' must be deleted.
// loadHStoreFiles also computes the max sequence id
// loadHStoreFiles also computes the max sequence id internally.
this.maxSeqId = -1L;
this.storefiles.putAll(loadHStoreFiles(infodir, mapdir));

if (LOG.isDebugEnabled()) {
LOG.debug("maximum sequence id for hstore " + storeName + " is " +
this.maxSeqId);
LOG.debug("Loaded " + this.storefiles.size() + " file(s) in hstore " +
this.storeName + ", max sequence id " + this.maxSeqId);
}

try {
@@ -250,9 +246,6 @@ public class HStore implements HConstants {
" -- continuing. Probably DATA LOSS!", e);
}

// Move maxSeqId on by one. Why here? And not in HRegion?
this.maxSeqId += 1;

// Finally, start up all the map readers! (There could be more than one
// since we haven't compacted yet.)
boolean first = true;
@@ -365,10 +358,6 @@ public class HStore implements HConstants {
 */
private SortedMap<Long, HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("infodir: " + infodir.toString() + " mapdir: " +
mapdir.toString());
}
// Look first at info files. If a reference, these contain info we need
// to create the HStoreFile.
FileStatus infofiles[] = fs.listStatus(infodir);
@@ -388,11 +377,6 @@ public class HStore implements HConstants {
 */
boolean isReference = isReference(p, m);
long fid = Long.parseLong(m.group(1));

if (LOG.isDebugEnabled()) {
LOG.debug("loading file " + p.toString() + ", isReference=" +
isReference + ", file id=" + fid);
}

HStoreFile curfile = null;
HStoreFile.Reference reference = null;
@@ -400,8 +384,7 @@ public class HStore implements HConstants {
reference = readSplitInfo(p, fs);
}
curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
family.getFamilyName(), fid, reference);

family.getFamilyName(), fid, reference);
storeSize += curfile.length();
long storeSeqId = -1;
try {
@@ -414,7 +397,7 @@ public class HStore implements HConstants {
// That means it was built prior to the previous run of HStore, and so
// it cannot contain any updates also contained in the log.
LOG.info("HSTORE_LOGINFOFILE " + curfile +
" does not contain a sequence number - ignoring");
" does not contain a sequence number - ignoring");
}

Path mapfile = curfile.getMapFilePath();
@@ -428,7 +411,11 @@ public class HStore implements HConstants {
// TODO: Confirm referent exists.

// Found map and sympathetic info file. Add this hstorefile to result.
results.put(storeSeqId, curfile);
if (LOG.isDebugEnabled()) {
LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
isReference + ", sequence id=" + storeSeqId);
}
results.put(Long.valueOf(storeSeqId), curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next
// section. Make sure path is fully qualified for compare.
mapfiles.add(mapfile);
@@ -556,7 +543,6 @@ public class HStore implements HConstants {
lock.readLock().lock();
try {
this.memcache.add(key, value);

} finally {
lock.readLock().unlock();
}
@@ -590,41 +576,38 @@ public class HStore implements HConstants {
//////////////////////////////////////////////////////////////////////////////

/**
 * Prior to doing a cache flush, we need to snapshot the memcache.
 * TODO: This method is ugly. Why let client of HStore run snapshots. How
 * do we know they'll be cleaned up?
 * Snapshot this stores memcache. Call before running
 * {@link #flushCache(long)} so it has some work to do.
 */
void snapshotMemcache() {
void snapshot() {
this.memcache.snapshot();
}

/**
 * Write out a brand-new set of items to the disk.
 *
 * We should only store key/vals that are appropriate for the data-columns
 * stored in this HStore.
 *
 * Also, we are not expecting any reads of this MapFile just yet.
 *
 * Return the entire list of HStoreFiles currently used by the HStore.
 *
 * Write out current snapshot. Presumes {@link #snapshot()} has been called
 * previously.
 * @param logCacheFlushId flush sequence number
 * @return count of bytes flushed
 * @throws IOException
 */
void flushCache(final long logCacheFlushId) throws IOException {
SortedMap<HStoreKey, byte []> cache = this.memcache.snapshot();
internalFlushCache(cache, logCacheFlushId);
long flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memcache.snapshot() has happened earlier up in the chain.
SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
long flushed = internalFlushCache(cache, logCacheFlushId);
// If an exception happens flushing, we let it out without clearing
// the memcache snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
this.memcache.clearSnapshot(cache);
return flushed;
}

private void internalFlushCache(SortedMap<HStoreKey, byte []> cache,
private long internalFlushCache(SortedMap<HStoreKey, byte []> cache,
long logCacheFlushId) throws IOException {
long flushed = 0;
// Don't flush if there are no entries.
if (cache.size() == 0) {
return;
return flushed;
}

// TODO: We can fail in the below block before we complete adding this
@@ -649,7 +632,6 @@ public class HStore implements HConstants {
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
int entries = 0;
long cacheSize = 0;
try {
for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
HStoreKey curkey = es.getKey();
@@ -658,7 +640,7 @@ public class HStore implements HConstants {
if (f.equals(this.family.getFamilyName())) {
entries++;
out.append(curkey, new ImmutableBytesWritable(bytes));
cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0);
flushed += HRegion.getEntrySize(curkey, bytes);
}
}
} finally {
@@ -685,16 +667,17 @@ public class HStore implements HConstants {
flushedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + flushedFile.toString() + " with " + entries +
LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
" with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(cacheSize) + ", file size " +
StringUtils.humanReadableInt(newStoreSize) + " for " +
this.storeName);
StringUtils.humanReadableInt(flushed) + ", file size " +
StringUtils.humanReadableInt(newStoreSize));
}
} finally {
this.lock.writeLock().unlock();
}
}
return flushed;
}

//////////////////////////////////////////////////////////////////////////////
@@ -727,25 +710,12 @@ public class HStore implements HConstants {
synchronized (storefiles) {
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
if (filesToCompact.size() < 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because no store files to compact.");
}
return checkSplit();
} else if (filesToCompact.size() == 1) {
if (!filesToCompact.get(0).isReference()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because only one store file and it is not a reference");
}
return checkSplit();
}
} else if (filesToCompact.size() < compactionThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because number of stores " + filesToCompact.size() +
" < compaction threshold " + compactionThreshold);
}
return checkSplit();
}

@@ -756,8 +726,8 @@ public class HStore implements HConstants {

if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + filesToCompact.size() +
" files using " + compactionDir.toString() + " for " +
this.storeName);
" files " + filesToCompact.toString() + " into " +
compactionDir.toUri().getPath());
}

// Storefiles are keyed by sequence id. The oldest file comes first.
@@ -822,10 +792,7 @@ public class HStore implements HConstants {
// Add info about which file threw exception. It may not be in the
// exception message so output a message here where we know the
// culprit.
LOG.warn("Failed with " + e.toString() + ": HStoreFile=" +
hsf.toString() + (hsf.isReference()? ", Reference=" +
hsf.getReference().toString() : "") + " for Store=" +
this.storeName);
LOG.warn("Failed with " + e.toString() + ": " + hsf.toString());
closeCompactionReaders(rdrs);
throw e;
}
@@ -1029,14 +996,13 @@ public class HStore implements HConstants {
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
info.getEncodedName(), family.getFamilyName(), -1, null);
if(LOG.isDebugEnabled()) {
LOG.debug("moving " + compactedFile.toString() + " in " +
this.compactionDir.toString() + " to " +
finalCompactedFile.toString() + " in " + basedir.toString() +
" for " + this.storeName);
LOG.debug("moving " +
FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
}
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
LOG.error("Failed move of compacted file " +
finalCompactedFile.toString() + " for " + this.storeName);
finalCompactedFile.getMapFilePath().toString());
return;
}
src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -449,7 +449,7 @@ public class HStoreFile implements HConstants {
@Override
public String toString() {
return encodedRegionName + "/" + colFamily + "/" + fileId +
(isReference()? "/" + reference.toString(): "");
(isReference()? "-" + reference.toString(): "");
}

/**
src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
@@ -23,7 +23,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,28 +76,28 @@ class Memcache {
}

/**
 * Creates a snapshot of the current Memcache or returns existing snapshot.
 * Must be followed by a call to {@link #clearSnapshot(SortedMap)}
 * @return Snapshot. Never null. May have no entries.
 * Creates a snapshot of the current Memcache.
 * Snapshot must be cleared by call to {@link #clearSnapshot(SortedMap)}
 * To get the snapshot made by this method, use
 * {@link #getSnapshot}.
 */
SortedMap<HStoreKey, byte[]> snapshot() {
void snapshot() {
this.lock.writeLock().lock();
try {
// If snapshot has entries, then flusher failed or didn't call cleanup.
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (this.snapshot.size() > 0) {
LOG.debug("Returning existing snapshot. Either the snapshot was run " +
"by the region -- normal operation but to be fixed -- or there is " +
"another ongoing flush or did we fail last attempt?");
return this.snapshot;
LOG.debug("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
// We used to synchronize on the memcache here but we're inside a
// write lock so removed it. Comment is left in case removal was a
// mistake. St.Ack
if (this.memcache.size() != 0) {
this.snapshot = this.memcache;
this.memcache = createSynchronizedSortedMap();
}
}
// We used to synchronize on the memcache here but we're inside a
// write lock so removed it. Comment is left in case removal was a
// mistake. St.Ack
if (this.memcache.size() != 0) {
this.snapshot = this.memcache;
this.memcache = createSynchronizedSortedMap();
}
return this.snapshot;
} finally {
this.lock.writeLock().unlock();
}
@@ -106,6 +105,8 @@ class Memcache {

/**
 * Return the current snapshot.
 * Called by flusher to get current snapshot made by a previous
 * call to {@link snapshot}.
 * @return Return snapshot.
 * @see {@link #snapshot()}
 * @see {@link #clearSnapshot(SortedMap)}
src/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -46,7 +46,9 @@ public class FSUtils {
/**
 * Not instantiable
 */
private FSUtils() {}
private FSUtils() {
super();
}

/**
 * Checks to see if the specified file system is available
@@ -161,4 +163,18 @@ public class FSUtils {
throw io;
}
}
}

/**
 * Return the 'path' component of a Path. In Hadoop, Path is an URI. This
 * method returns the 'path' component of a Path's URI: e.g. If a Path is
 * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
 * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
 * This method is useful if you want to print out a Path without qualifying
 * Filesystem instance.
 * @param p Filesystem Path whose 'path' component we are to return.
 * @return Path portion of the Filesystem
 */
public static String getPath(Path p) {
return p.toUri().getPath();
}
}
src/java/org/apache/hadoop/hbase/util/Merge.java
@@ -107,7 +107,6 @@ public class Merge extends Configured implements Tool {
mergeTwoRegions();
}
return 0;

} catch (Exception e) {
LOG.fatal("Merge failed", e);
utils.scanMetaRegion(HRegionInfo.firstMetaRegionInfo,
@@ -190,10 +189,12 @@ public class Merge extends Configured implements Tool {
 * Merges two regions from a user table.
 */
private void mergeTwoRegions() throws IOException {
LOG.info("Merging regions " + this.region1.toString() + " and " +
this.region2.toString() + " in table " + this.tableName.toString());
// Scan the root region for all the meta regions that contain the regions
// we're merging.
MetaScannerListener listener = new MetaScannerListener(region1, region2);
utils.scanRootRegion(listener);
this.utils.scanRootRegion(listener);
HRegionInfo meta1 = listener.getMeta1();
if (meta1 == null) {
throw new IOException("Could not find meta region for " + region1);
@@ -202,11 +203,15 @@ public class Merge extends Configured implements Tool {
if (meta2 == null) {
throw new IOException("Could not find meta region for " + region2);
}

HRegion metaRegion1 = utils.getMetaRegion(meta1);
LOG.info("Found meta for region1 " + meta1.getRegionName() +
", meta for region2 " + meta2.getRegionName());
HRegion metaRegion1 = this.utils.getMetaRegion(meta1);
HRegionInfo info1 = Writables.getHRegionInfo(
metaRegion1.get(region1, HConstants.COL_REGIONINFO));

metaRegion1.get(region1, HConstants.COL_REGIONINFO));
if (info1== null) {
throw new NullPointerException("info1 is null using key " + region1 +
" in " + meta1);
}

HRegion metaRegion2 = null;
if (meta1.getRegionName().equals(meta2.getRegionName())) {
@@ -215,8 +220,10 @@ public class Merge extends Configured implements Tool {
metaRegion2 = utils.getMetaRegion(meta2);
}
HRegionInfo info2 = Writables.getHRegionInfo(
metaRegion2.get(region2, HConstants.COL_REGIONINFO));

metaRegion2.get(region2, HConstants.COL_REGIONINFO));
if (info2 == null) {
throw new NullPointerException("info2 is null using key " + meta2);
}
HRegion merged = merge(info1, metaRegion1, info2, metaRegion2);

// Now find the meta region which will contain the newly merged region
@@ -250,7 +257,8 @@ public class Merge extends Configured implements Tool {
 * Returns HRegion object for newly merged region
 */
private HRegion merge(HRegionInfo info1, HRegion meta1, HRegionInfo info2,
HRegion meta2) throws IOException {
HRegion meta2)
throws IOException {
if (info1 == null) {
throw new IOException("Could not find " + region1 + " in " +
meta1.getRegionName());
@@ -261,21 +269,19 @@ public class Merge extends Configured implements Tool {
}
HRegion merged = null;
HLog log = utils.getLog();
HRegion region1 =
HRegion.openHRegion(info1, this.rootdir, log, this.conf);
HRegion r1 = HRegion.openHRegion(info1, this.rootdir, log, this.conf);
try {
HRegion region2 =
HRegion.openHRegion(info2, this.rootdir, log, this.conf);
HRegion r2 = HRegion.openHRegion(info2, this.rootdir, log, this.conf);
try {
merged = HRegion.merge(region1, region2);
merged = HRegion.merge(r1, r2);
} finally {
if (!region2.isClosed()) {
region2.close();
if (!r2.isClosed()) {
r2.close();
}
}
} finally {
if (!region1.isClosed()) {
region1.close();
if (!r1.isClosed()) {
r1.close();
}
}

@@ -330,6 +336,7 @@ public class Merge extends Configured implements Tool {
region1 = new Text(remainingArgs[1]);
region2 = new Text(remainingArgs[2]);
int status = 0;
// Why we duplicate code here? St.Ack
if (WritableComparator.compareBytes(
tableName.getBytes(), 0, tableName.getLength(),
region1.getBytes(), 0, tableName.getLength()) != 0) {
@@ -369,5 +376,4 @@ public class Merge extends Configured implements Tool {
}
System.exit(status);
}

}
src/java/org/apache/hadoop/hbase/util/MetaUtils.java
@@ -22,28 +22,25 @@ package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.io.Text;

/**
 * Contains utility methods for manipulating HBase meta tables
@@ -148,7 +145,7 @@ public class MetaUtils {
HRegion meta = metaRegions.get(metaInfo.getRegionName());
if (meta == null) {
meta = openMetaRegion(metaInfo);
metaRegions.put(metaInfo.getRegionName(), meta);
this.metaRegions.put(metaInfo.getRegionName(), meta);
}
return meta;
}
@@ -246,6 +243,9 @@ public class MetaUtils {
/**
 * Scans a meta region. For every region found, calls the listener with
 * the HRegionInfo of the region.
 * TODO: Use Visitor rather than Listener pattern. Allow multiple Visitors.
 * Use this everywhere we scan meta regions: e.g. in metascanners, in close
 * handling, etc. Have it pass in the whole row, not just HRegionInfo.
 *
 * @param metaRegionInfo HRegionInfo for meta region
 * @param listener method to be called for each meta region found
@@ -330,4 +330,4 @@ public class MetaUtils {
b.delete(HConstants.COL_STARTCODE);
t.commit(b);
}
}
}
src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
@@ -118,7 +118,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.filenum);
Path filename = log.computeFilename(log.getFilenum());
log = null;
// Now open a reader on the log and assert append worked.
reader = new SequenceFile.Reader(fs, filename, conf);
src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java
@@ -133,7 +133,8 @@ public class TestHMemcache extends TestCase {
private void runSnapshot(final Memcache hmc) throws UnexpectedException {
// Save off old state.
int oldHistorySize = hmc.getSnapshot().size();
SortedMap<HStoreKey, byte[]> ss = hmc.snapshot();
hmc.snapshot();
SortedMap<HStoreKey, byte[]> ss = hmc.getSnapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < ss.size());
hmc.clearSnapshot(ss);
src/test/org/apache/hadoop/hbase/util/TestMergeTool.java
@@ -20,25 +20,25 @@

package org.apache.hadoop.hbase.util;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;

/** Test stand alone merge tool that can merge arbitrary regions */
public class TestMergeTool extends HBaseTestCase {
@@ -163,8 +163,53 @@ public class TestMergeTool extends HBaseTestCase {
super.tearDown();
StaticTestEnvironment.shutdownDfs(dfsCluster);
}

/*
 * @param msg Message that describes this merge
 * @param regionName1
 * @param regionName2
 * @param log Log to use merging.
 * @param upperbound Verifying, how high up in this.rows to go.
 * @return Merged region.
 * @throws Exception
 */
private HRegion mergeAndVerify(final String msg, final String regionName1,
final String regionName2, final HLog log, final int upperbound)
throws Exception {
Merge merger = new Merge(this.conf);
LOG.info(msg);
int errCode = ToolRunner.run(merger,
new String[] {this.desc.getName().toString(), regionName1, regionName2}
);
assertTrue("'" + msg + "' failed", errCode == 0);
HRegionInfo mergedInfo = merger.getMergedHRegionInfo();

// Now verify that we can read all the rows from regions 0, 1
// in the new merged region.
HRegion merged =
HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);
verifyMerge(merged, upperbound);
merged.close();
LOG.info("Verified " + msg);
return merged;
}

private void verifyMerge(final HRegion merged, final int upperbound)
throws IOException {
for (int i = 0; i < upperbound; i++) {
for (int j = 0; j < rows[i].length; j++) {
byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue();
assertNotNull(rows[i][j].toString(), bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));
}
}
}

/** @throws Exception */
/**
 * Test merge tool.
 * @throws Exception
 */
public void testMergeTool() throws Exception {
// First verify we can read the rows from the source regions and that they
// contain the right data.
@@ -181,134 +226,32 @@ public class TestMergeTool extends HBaseTestCase {
}

// Create a log that we can reuse when we need to open regions

HLog log = new HLog(this.fs,
new Path("/tmp", HConstants.HREGION_LOGDIR_NAME + "_" +
System.currentTimeMillis()
),
this.conf, null
);
Path logPath = new Path("/tmp", HConstants.HREGION_LOGDIR_NAME + "_" +
System.currentTimeMillis());
LOG.info("Creating log " + logPath.toString());
HLog log = new HLog(this.fs, logPath, this.conf, null);
try {
/*
 * Merge Region 0 and Region 1
 */
LOG.info("merging regions 0 and 1");
Merge merger = new Merge(this.conf);
ToolRunner.run(merger,
new String[] {
this.desc.getName().toString(),
this.sourceRegions[0].getRegionName().toString(),
this.sourceRegions[1].getRegionName().toString()
}
);
HRegionInfo mergedInfo = merger.getMergedHRegionInfo();

// Now verify that we can read all the rows from regions 0, 1
// in the new merged region.
HRegion merged =
HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);

for (int i = 0; i < 2 ; i++) {
for (int j = 0; j < rows[i].length; j++) {
byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue();
assertNotNull(rows[i][j].toString(), bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));
}
}
merged.close();
LOG.info("verified merge of regions 0 and 1");
/*
 * Merge the result of merging regions 0 and 1 with region 2
 */
LOG.info("merging regions 0+1 and 2");
merger = new Merge(this.conf);
ToolRunner.run(merger,
new String[] {
this.desc.getName().toString(),
mergedInfo.getRegionName().toString(),
this.sourceRegions[2].getRegionName().toString()
}
);
mergedInfo = merger.getMergedHRegionInfo();
// Merge Region 0 and Region 1
HRegion merged = mergeAndVerify("merging regions 0 and 1",
this.sourceRegions[0].getRegionName().toString(),
this.sourceRegions[1].getRegionName().toString(), log, 2);

// Now verify that we can read all the rows from regions 0, 1 and 2
// in the new merged region.

merged = HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);
// Merge the result of merging regions 0 and 1 with region 2
merged = mergeAndVerify("merging regions 0+1 and 2",
merged.getRegionInfo().getRegionName().toString(),
this.sourceRegions[2].getRegionName().toString(), log, 3);

for (int i = 0; i < 3 ; i++) {
for (int j = 0; j < rows[i].length; j++) {
Cell cell = merged.get(rows[i][j], COLUMN_NAME);
assertNotNull(cell);
byte[] bytes = cell.getValue();
assertNotNull(bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));
}
}
merged.close();
LOG.info("verified merge of regions 0+1 and 2");
/*
 * Merge the result of merging regions 0, 1 and 2 with region 3
 */
LOG.info("merging regions 0+1+2 and 3");
merger = new Merge(this.conf);
ToolRunner.run(merger,
new String[] {
this.desc.getName().toString(),
mergedInfo.getRegionName().toString(),
this.sourceRegions[3].getRegionName().toString()
}
);
mergedInfo = merger.getMergedHRegionInfo();

// Now verify that we can read all the rows from regions 0, 1, 2 and 3
// in the new merged region.

merged = HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);

for (int i = 0; i < 4 ; i++) {
for (int j = 0; j < rows[i].length; j++) {
byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue();
assertNotNull(bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));
}
}
merged.close();
LOG.info("verified merge of regions 0+1+2 and 3");
/*
 * Merge the result of merging regions 0, 1, 2 and 3 with region 4
 */
LOG.info("merging regions 0+1+2+3 and 4");
merger = new Merge(this.conf);
ToolRunner.run(merger,
new String[] {
this.desc.getName().toString(),
mergedInfo.getRegionName().toString(),
this.sourceRegions[4].getRegionName().toString()
}
);
mergedInfo = merger.getMergedHRegionInfo();

// Now verify that we can read all the rows from the new merged region.

merged = HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);

for (int i = 0; i < rows.length ; i++) {
for (int j = 0; j < rows[i].length; j++) {
byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue();
assertNotNull(bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));
}
}
merged.close();
LOG.info("verified merge of regions 0+1+2+3 and 4");
// Merge the result of merging regions 0, 1 and 2 with region 3
merged = mergeAndVerify("merging regions 0+1+2 and 3",
merged.getRegionInfo().getRegionName().toString(),
this.sourceRegions[3].getRegionName().toString(), log, 4);

// Merge the result of merging regions 0, 1, 2 and 3 with region 4
merged = mergeAndVerify("merging regions 0+1+2+3 and 4",
merged.getRegionInfo().getRegionName().toString(),
this.sourceRegions[4].getRegionName().toString(), log, rows.length);
} finally {
log.closeAndDelete();
}
}
}