HADOOP-1646 RegionServer OOME's under sustained, substantial loading by

10 concurrent clients


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@559993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-07-26 21:30:55 +00:00
parent 5f850fcee4
commit 5b1bd1f8f2
11 changed files with 556 additions and 506 deletions

View File

@ -75,3 +75,5 @@ Trunk (unreleased changes)
TestScanner2 (Izaak Rubin via Stack)
48. HADOOP-1516 HClient fails to readjust when ROOT or META redeployed on new
region server
49. HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
10 concurrent clients

View File

@ -102,7 +102,7 @@
</property>
<property>
<name>hbase.regionserver.msginterval</name>
<value>15000</value>
<value>10000</value>
<description>Interval between messages from the RegionServer to HMaster
in milliseconds. Default is 15. Set this value low if you want unit
tests to be responsive.
@ -111,24 +111,60 @@
<property>
<name>hbase.regionserver.maxlogentries</name>
<value>30000</value>
<description>Rotate the logs when count of entries exceeds this value.
Default: 30,000
<description>Rotate the HRegion HLogs when count of entries exceeds this
value. Default: 30,000. Value is checked by a thread that runs every
hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.hregion.maxunflushed</name>
<value>10000</value>
<name>hbase.hregion.memcache.flush.size</name>
<value>16777216</value>
<description>
Memcache will be flushed to disk if number of Memcache writes
are in excess of this number.
A HRegion memcache will be flushed to disk if size of the memcache
exceeds this number of bytes. Value is checked by a thread that runs
every hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.hregion.memcache.block.multiplier</name>
<value>2</value>
<description>
Block updates if memcache has hbase.hregion.block.memcache
time hbase.hregion.flush.size bytes. Useful preventing
runaway memcache during spikes in update traffic. Without an
upper-bound, memcache fills such that when it flushes the
resultant flush files take a long time to compact or split, or
worse, we OOME.
</description>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>134217728</value>
<value>67108864</value>
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 128M.
value + (value / 2), the HRegion is split in two. Default: 64M.
If too large, splits will take so long, clients timeout.
</description>
</property>
<property>
<name>hbase.hregion.compactionThreshold</name>
<value>3</value>
<description>
If more than this number of HStoreFiles in any one HStore
(one HStoreFile is written per flush of memcache) then a compaction
is run to rewrite all HStoreFiles files as one. Larger numbers
put off compaction but when it runs, it takes longer to complete.
During a compaction, updates cannot be flushed to disk. Long
compactions require memory sufficient to carry the logging of
all updates across the duration of the compaction.
If too large, clients timeout during compaction.
</description>
</property>
<property>
<name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
<value>15000</value>
<description>How often a region server runs the split/compaction check.
</description>
</property>

View File

@ -81,7 +81,7 @@ public interface HConstants {
static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";
/** Default maximum file size */
static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
static final long DEFAULT_MAX_FILE_SIZE = 64 * 1024 * 1024; // 64MB
// Always store the location of the root table's HRegion.
// This HRegion is never split.

View File

@ -49,7 +49,7 @@ public class HLocking {
}
/**
* Caller needs the nonexclusive read-lock
* Caller needs the no-nexclusive read-lock
*/
public void obtainReadLock() {
synchronized(mutex) {
@ -57,6 +57,7 @@ public class HLocking {
try {
mutex.wait();
} catch(InterruptedException ie) {
// continue
}
}
lockers.incrementAndGet();
@ -65,7 +66,7 @@ public class HLocking {
}
/**
* Caller is finished with the nonexclusive read-lock
* Caller is finished with the non-exclusive read-lock
*/
public void releaseReadLock() {
synchronized(mutex) {
@ -85,6 +86,7 @@ public class HLocking {
try {
mutex.wait();
} catch (InterruptedException ie) {
// continue
}
}
mutex.notifyAll();

View File

@ -27,9 +27,8 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
@ -38,30 +37,36 @@ import org.apache.hadoop.io.Text;
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
*/
public class HMemcache {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
TreeMap<HStoreKey, byte []> memcache =
new TreeMap<HStoreKey, byte []>();
Vector<TreeMap<HStoreKey, byte []>> history
final Vector<TreeMap<HStoreKey, byte []>> history
= new Vector<TreeMap<HStoreKey, byte []>>();
TreeMap<HStoreKey, byte []> snapshot = null;
final HLocking lock = new HLocking();
/*
* Approximate size in bytes of the payload carried by this memcache.
*/
private AtomicLong size = new AtomicLong(0);
/** constructor */
/**
* Constructor
*/
public HMemcache() {
super();
}
/** represents the state of the memcache at a specified point in time */
public static class Snapshot {
TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
long sequenceId = 0;
static class Snapshot {
final TreeMap<HStoreKey, byte []> memcacheSnapshot;
final long sequenceId;
Snapshot() {
Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
super();
this.memcacheSnapshot = memcache;
this.sequenceId = i;
}
}
@ -79,36 +84,22 @@ public class HMemcache {
* @return frozen HMemcache TreeMap and HLog sequence number.
*/
Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
this.lock.obtainWriteLock();
try {
if(snapshot != null) {
throw new IOException("Snapshot in progress!");
}
// If no entries in memcache.
if(memcache.size() == 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("memcache empty. Skipping snapshot");
}
return retval;
return null;
}
if(LOG.isDebugEnabled()) {
LOG.debug("starting memcache snapshot");
}
retval.memcacheSnapshot = memcache;
Snapshot retval = new Snapshot(memcache, log.startCacheFlush());
this.snapshot = memcache;
history.add(memcache);
memcache = new TreeMap<HStoreKey, byte []>();
retval.sequenceId = log.startCacheFlush();
if(LOG.isDebugEnabled()) {
LOG.debug("memcache snapshot complete");
}
// Reset size of this memcache.
this.size.set(0);
return retval;
} finally {
this.lock.releaseWriteLock();
}
@ -127,12 +118,8 @@ public class HMemcache {
if(snapshot == null) {
throw new IOException("Snapshot not present!");
}
if(LOG.isDebugEnabled()) {
LOG.debug("deleting snapshot");
}
for(Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator();
it.hasNext(); ) {
for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator();
it.hasNext();) {
TreeMap<HStoreKey, byte []> cur = it.next();
if (snapshot == cur) {
it.remove();
@ -140,9 +127,6 @@ public class HMemcache {
}
}
this.snapshot = null;
if(LOG.isDebugEnabled()) {
LOG.debug("snapshot deleted");
}
} finally {
this.lock.releaseWriteLock();
}
@ -161,18 +145,28 @@ public class HMemcache {
try {
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
memcache.put(key, es.getValue());
byte [] value = es.getValue();
this.size.addAndGet(key.getSize());
this.size.addAndGet(((value == null)? 0: value.length));
memcache.put(key, value);
}
} finally {
this.lock.releaseWriteLock();
}
}
/**
* @return Approximate size in bytes of payload carried by this memcache.
*/
public long getSize() {
return this.size.get();
}
/**
* Look back through all the backlog TreeMaps to find the target.
* @param key
* @param numVersions
* @return An array of byte arrays orderded by timestamp.
* @return An array of byte arrays ordered by timestamp.
*/
public byte [][] get(final HStoreKey key, final int numVersions) {
List<byte []> results = new ArrayList<byte[]>();
@ -348,7 +342,7 @@ public class HMemcache {
}
/**
* Get the next value from the specified iterater.
* Get the next value from the specified iterator.
*
* @param i Which iterator to fetch next value from
* @return true if there is more data available

View File

@ -24,12 +24,12 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@ -75,10 +76,10 @@ public class HRegion implements HConstants {
static String SPLITDIR = "splits";
static String MERGEDIR = "merges";
static String TMPREGION_PREFIX = "tmpregion_";
static int MIN_COMMITS_FOR_COMPACTION = 10;
static Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
private long noFlushCount = 0;
/**
* Deletes all the files for a HRegion
@ -90,7 +91,7 @@ public class HRegion implements HConstants {
*/
static void deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
LOG.debug("Deleting region " + regionName);
LOG.info("Deleting region " + regionName);
fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
}
@ -263,7 +264,7 @@ public class HRegion implements HConstants {
Map<Long, TreeMap<Text, byte []>> targetColumns
= new HashMap<Long, TreeMap<Text, byte []>>();
HMemcache memcache;
final HMemcache memcache;
Path rootDir;
HLog log;
@ -275,19 +276,16 @@ public class HRegion implements HConstants {
static class WriteState {
volatile boolean writesOngoing;
volatile boolean writesEnabled;
volatile boolean closed;
WriteState() {
this.writesOngoing = true;
this.writesEnabled = true;
this.closed = false;
}
}
volatile WriteState writestate = new WriteState();
int recentCommits = 0;
volatile int commitsSinceFlush = 0;
int maxUnflushedEntries = 0;
final int memcacheFlushSize;
final int blockingMemcacheSize;
int compactionThreshold = 0;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
@ -330,7 +328,6 @@ public class HRegion implements HConstants {
this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true;
this.writestate.closed = false;
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@ -349,7 +346,7 @@ public class HRegion implements HConstants {
this.regionInfo.tableDesc.families().entrySet()) {
Text colFamily = HStoreKey.extractFamily(e.getKey());
stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
e.getValue(), fs, oldLogFile, conf));
e.getValue(), fs, oldLogFile, conf));
}
// Get rid of any splits or merges that were lost in-progress
@ -362,14 +359,16 @@ public class HRegion implements HConstants {
fs.delete(merges);
}
// By default, we flush the cache after 10,000 commits
// By default, we flush the cache when 32M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*16);
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
this.maxUnflushedEntries = conf.getInt("hbase.hregion.maxunflushed", 10000);
// By default, we compact the region if an HStore has more than 10 map files
this.compactionThreshold =
conf.getInt("hbase.hregion.compactionThreshold", 10);
// By default, we compact the region if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
@ -387,22 +386,20 @@ public class HRegion implements HConstants {
/** returns true if region is closed */
boolean isClosed() {
boolean closed = false;
synchronized(writestate) {
closed = writestate.closed;
}
return closed;
return this.closed.get();
}
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
*
* This method could take some time to execute, so don't call it from a
* <p>This method could take some time to execute, so don't call it from a
* time-sensitive thread.
*
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of HStoreFile objects.
* HStores make use of. It's a list of HStoreFile objects. Returns empty
* vector if already closed and null if it is judged that it should not
* close.
*
* @throws IOException
*/
@ -424,14 +421,14 @@ public class HRegion implements HConstants {
* @throws IOException
*/
Vector<HStoreFile> close(boolean abort) throws IOException {
if (isClosed()) {
LOG.info("region " + this.regionInfo.regionName + " already closed");
return new Vector<HStoreFile>();
}
lock.obtainWriteLock();
try {
boolean shouldClose = false;
synchronized(writestate) {
if(writestate.closed) {
LOG.info("region " + this.regionInfo.regionName + " closed");
return new Vector<HStoreFile>();
}
while(writestate.writesOngoing) {
try {
writestate.wait();
@ -443,10 +440,16 @@ public class HRegion implements HConstants {
shouldClose = true;
}
if(! shouldClose) {
if(!shouldClose) {
return null;
}
LOG.info("closing region " + this.regionInfo.regionName);
// Write lock means no more row locks can be given out. Wait on
// outstanding row locks to come in before we close so we do not drop
// outstanding updates.
waitOnRowLocks();
Vector<HStoreFile> allHStoreFiles = null;
if (!abort) {
// Don't flush the cache if we are aborting during a test.
@ -459,9 +462,9 @@ public class HRegion implements HConstants {
return allHStoreFiles;
} finally {
synchronized (writestate) {
writestate.closed = true;
writestate.writesOngoing = false;
}
this.closed.set(true);
LOG.info("region " + this.regionInfo.regionName + " closed");
}
} finally {
@ -476,8 +479,7 @@ public class HRegion implements HConstants {
* Returns two brand-new (and open) HRegions
*/
HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
throws IOException {
throws IOException {
if(((regionInfo.startKey.getLength() != 0)
&& (regionInfo.startKey.compareTo(midKey) > 0))
|| ((regionInfo.endKey.getLength() != 0)
@ -486,8 +488,7 @@ public class HRegion implements HConstants {
"boundaries.");
}
LOG.info("Splitting region " + this.regionInfo.regionName);
long startTime = System.currentTimeMillis();
Path splits = new Path(regiondir, SPLITDIR);
if(! fs.exists(splits)) {
fs.mkdirs(splits);
@ -495,47 +496,44 @@ public class HRegion implements HConstants {
long regionAId = Math.abs(rand.nextLong());
HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc,
regionInfo.startKey, midKey);
regionInfo.startKey, midKey);
long regionBId = Math.abs(rand.nextLong());
HRegionInfo regionBInfo
= new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
HRegionInfo regionBInfo =
new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
if(fs.exists(dirA) || fs.exists(dirB)) {
throw new IOException("Cannot split; target file collision at " + dirA
+ " or " + dirB);
throw new IOException("Cannot split; target file collision at " + dirA +
" or " + dirB);
}
// We just copied most of the data. Now get whatever updates are up in
// the memcache (after shutting down new updates).
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
// Notify the caller that we are about to close the region. This moves
// us ot the 'retiring' queue. Means no more updates coming in -- just
// whatever is outstanding.
listener.closing(this.getRegionName());
// Wait on the last row updates to come in.
LOG.debug("Starting wait on row locks.");
waitOnRowLocks();
// Flush this HRegion out to storage, and turn off flushes
// or compactions until close() is called.
// TODO: flushcache can come back null if it can't do the flush. FIX.
LOG.debug("Calling flushcache inside closeAndSplit");
Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
for(HStoreFile hsf: hstoreFilesToSplit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
}
HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
alreadySplit.add(hsf);
if (hstoreFilesToSplit == null) {
// It should always return a list of hstore files even if memcache is
// empty. It will return null if concurrent compaction or splits which
// should not happen.
throw new NullPointerException("Flushcache did not return any files");
}
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
for(HStoreFile hsf: hstoreFilesToSplit) {
alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
regionBInfo, midKey));
}
// We just copied most of the data.
// Notify the caller that we are about to close the region
listener.closing(this.getRegionName());
// Wait on the last row updates to come in.
waitOnRowLocks();
// Now close the HRegion
hstoreFilesToSplit = close();
@ -546,43 +544,46 @@ public class HRegion implements HConstants {
// Copy the small remainder
for(HStoreFile hsf: hstoreFilesToSplit) {
if(! alreadySplit.contains(hsf)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Splitting HStore " + hsf.getRegionName() + "/"
+ hsf.getColFamily() + "/" + hsf.fileId());
}
HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
if(!alreadySplit.contains(hsf)) {
splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
}
}
// Done
HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
// Cleanup
fs.delete(splits); // Get rid of splits directory
fs.delete(regiondir); // and the directory for the old region
fs.delete(splits); // Get rid of splits directory
fs.delete(regiondir); // and the directory for the old region
HRegion regions[] = new HRegion[2];
regions[0] = regionA;
regions[1] = regionB;
LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
"New regions are: " + regions[0].getRegionName() + ", " +
regions[1].getRegionName());
regions[1].getRegionName() + ". Took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
return regions;
}
private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
final HRegionInfo a, final HRegionInfo b, final Text midKey)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
}
HStoreFile dstA = new HStoreFile(conf, splits, a.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, b.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
}
return hsf;
}
//////////////////////////////////////////////////////////////////////////////
// HRegion accessors
@ -646,10 +647,10 @@ public class HRegion implements HConstants {
//////////////////////////////////////////////////////////////////////////////
/**
* Iterates through all the HStores and finds the one with the largest MapFile
* size. If the size is greater than the (currently hard-coded) threshold,
* returns true indicating that the region should be split. The midKey for the
* largest MapFile is returned through the midKey parameter.
* Iterates through all the HStores and finds the one with the largest
* MapFile size. If the size is greater than the (currently hard-coded)
* threshold, returns true indicating that the region should be split. The
* midKey for the largest MapFile is returned through the midKey parameter.
*
* @param midKey - (return value) midKey of the largest MapFile
* @return - true if the region should be split
@ -659,16 +660,27 @@ public class HRegion implements HConstants {
try {
Text key = new Text();
long maxSize = 0;
long aggregateSize = 0;
for(HStore store: stores.values()) {
long size = store.getLargestFileSize(key);
aggregateSize += size;
if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
}
}
return (maxSize >
(this.desiredMaxFileSize + (this.desiredMaxFileSize / 2)));
long triggerSize =
this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
if (split) {
LOG.info("Splitting " + getRegionName().toString() +
" because largest file is " + StringUtils.humanReadableInt(maxSize) +
", aggregate size is " +
StringUtils.humanReadableInt(aggregateSize) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
}
return split;
} finally {
lock.releaseReadLock();
}
@ -706,6 +718,9 @@ public class HRegion implements HConstants {
for(HStore store: stores.values()) {
if(store.getNMaps() > this.compactionThreshold) {
needsCompaction = true;
LOG.info(getRegionName().toString() + " needs compaction because " +
store.getNMaps() + " store files present and threshold is " +
this.compactionThreshold);
break;
}
}
@ -719,46 +734,50 @@ public class HRegion implements HConstants {
* Compact all the stores. This should be called periodically to make sure
* the stores are kept manageable.
*
* This operation could block for a long time, so don't call it from a
* <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread.
*
* If it returns TRUE, the compaction has completed.
*
* If it returns FALSE, the compaction was not carried out, because the
* HRegion is busy doing something else storage-intensive (like flushing the
* cache). The caller should check back later.
* @return Returns TRUE if the compaction has completed. FALSE, if the
* compaction was not carried out, because the HRegion is busy doing
* something else storage-intensive (like flushing the cache). The caller
* should check back later.
*/
boolean compactStores() throws IOException {
boolean shouldCompact = false;
if (this.closed.get()) {
return shouldCompact;
}
lock.obtainReadLock();
try {
synchronized (writestate) {
if ((!writestate.writesOngoing) &&
writestate.writesEnabled &&
(!writestate.closed) &&
recentCommits > MIN_COMMITS_FOR_COMPACTION) {
writestate.writesEnabled) {
writestate.writesOngoing = true;
shouldCompact = true;
}
}
if (!shouldCompact) {
LOG.info("not compacting region " + this.regionInfo);
LOG.info("NOT compacting region " +
this.regionInfo.getRegionName().toString());
return false;
}
LOG.info("starting compaction on region " + this.regionInfo);
long startTime = System.currentTimeMillis();
LOG.info("starting compaction on region " +
this.regionInfo.getRegionName().toString());
for (HStore store : stores.values()) {
store.compact();
}
LOG.info("compaction completed on region " + this.regionInfo);
LOG.info("compaction completed on region " +
this.regionInfo.getRegionName().toString() + ". Took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
return true;
} finally {
lock.releaseReadLock();
synchronized (writestate) {
writestate.writesOngoing = false;
recentCommits = 0;
writestate.notifyAll();
}
}
@ -769,50 +788,64 @@ public class HRegion implements HConstants {
* only take if there have been a lot of uncommitted writes.
*/
void optionallyFlush() throws IOException {
if(commitsSinceFlush > maxUnflushedEntries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush);
}
if(this.memcache.getSize() > this.memcacheFlushSize) {
flushcache(false);
} else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
LOG.info("Optional flush called " + this.noFlushCount +
" times when data present without flushing. Forcing one.");
flushcache(false);
if (this.memcache.getSize() > 0) {
// Only increment if something in the cache.
// Gets zero'd when a flushcache is called.
this.noFlushCount++;
}
}
}
/**
* Flush the cache. This is called periodically to minimize the amount of log
* processing needed upon startup.
* Flush the cache. This is called periodically to minimize the amount of
* log processing needed upon startup.
*
* The returned Vector is a list of all the files used by the component HStores.
* It is a list of HStoreFile objects. If the returned value is NULL, then the
* flush could not be executed, because the HRegion is busy doing something
* else storage-intensive. The caller should check back later.
* <p>The returned Vector is a list of all the files used by the component
* HStores. It is a list of HStoreFile objects. If the returned value is
* NULL, then the flush could not be executed, because the HRegion is busy
* doing something else storage-intensive. The caller should check back
* later.
*
* The 'disableFutureWrites' boolean indicates that the caller intends to
* close() the HRegion shortly, so the HRegion should not take on any new and
* potentially long-lasting disk operations. This flush() should be the final
* pre-close() disk operation.
*
* This method may block for some time, so it should not be called from a
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
*
* @param disableFutureWrites indicates that the caller intends to
* close() the HRegion shortly, so the HRegion should not take on any new and
* potentially long-lasting disk operations. This flush() should be the final
* pre-close() disk operation.
*
* @return List of store files including new flushes, if any. If no flushes
* because memcache is null, returns all current store files. Returns
* null if no flush (Writes are going on elsewhere -- concurrently we are
* compacting or splitting).
*/
Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
Vector<HStoreFile> flushcache(boolean disableFutureWrites)
throws IOException {
if (this.closed.get()) {
return null;
}
this.noFlushCount = 0;
boolean shouldFlush = false;
synchronized(writestate) {
if((! writestate.writesOngoing)
&& writestate.writesEnabled
&& (! writestate.closed)) {
if((!writestate.writesOngoing) &&
writestate.writesEnabled) {
writestate.writesOngoing = true;
shouldFlush = true;
if(disableFutureWrites) {
writestate.writesEnabled = false;
}
}
}
if(! shouldFlush) {
if(!shouldFlush) {
if(LOG.isDebugEnabled()) {
LOG.debug("not flushing cache for region " +
LOG.debug("NOT flushing memcache for region " +
this.regionInfo.regionName);
}
return null;
@ -820,7 +853,6 @@ public class HRegion implements HConstants {
try {
return internalFlushcache();
} finally {
synchronized (writestate) {
writestate.writesOngoing = false;
@ -831,73 +863,69 @@ public class HRegion implements HConstants {
/**
* Flushing the cache is a little tricky. We have a lot of updates in the
* HMemcache, all of which have also been written to the log. We need to write
* those updates in the HMemcache out to disk, while being able to process
* reads/writes as much as possible during the flush operation. Also, the log
* has to state clearly the point in time at which the HMemcache was flushed.
* (That way, during recovery, we know when we can rely on the on-disk flushed
* structures and when we have to recover the HMemcache from the log.)
* HMemcache, all of which have also been written to the log. We need to
* write those updates in the HMemcache out to disk, while being able to
* process reads/writes as much as possible during the flush operation. Also,
* the log has to state clearly the point in time at which the HMemcache was
* flushed. (That way, during recovery, we know when we can rely on the
* on-disk flushed structures and when we have to recover the HMemcache from
* the log.)
*
* So, we have a three-step process:
* <p>So, we have a three-step process:
*
* A. Flush the memcache to the on-disk stores, noting the current sequence ID
* for the log.
* <ul><li>A. Flush the memcache to the on-disk stores, noting the current
* sequence ID for the log.<li>
*
* B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID
* that was current at the time of memcache-flush.
* <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
* ID that was current at the time of memcache-flush.</li>
*
* C. Get rid of the memcache structures that are now redundant, as they've
* been flushed to the on-disk HStores.
* <li>C. Get rid of the memcache structures that are now redundant, as
* they've been flushed to the on-disk HStores.</li>
* </ul>
* <p>This method is protected, but can be accessed via several public
* routes.
*
* This method is protected, but can be accessed via several public routes.
* <p> This method may block for some time.
*
* This method may block for some time.
* @return List of store files including just-made new flushes per-store. If
* not flush, returns list of all store files.
*/
Vector<HStoreFile> internalFlushcache() throws IOException {
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
long startTime = -1;
if(LOG.isDebugEnabled()) {
LOG.debug("flushing cache for region " + this.regionInfo.regionName);
startTime = System.currentTimeMillis();
LOG.debug("Started memcache flush for region " +
this.regionInfo.regionName + ". Size " +
StringUtils.humanReadableInt(this.memcache.getSize()));
}
// We pass the log to the HMemcache, so we can lock down
// both simultaneously. We only have to do this for a moment:
// we need the HMemcache state at the time of a known log sequence
// number. Since multiple HRegions may write to a single HLog,
// the sequence numbers may zoom past unless we lock it.
// We pass the log to the HMemcache, so we can lock down both
// simultaneously. We only have to do this for a moment: we need the
// HMemcache state at the time of a known log sequence number. Since
// multiple HRegions may write to a single HLog, the sequence numbers may
// zoom past unless we lock it.
//
// When execution returns from snapshotMemcacheForLog()
// with a non-NULL value, the HMemcache will have a snapshot
// object stored that must be explicitly cleaned up using
// a call to deleteSnapshot().
if(LOG.isDebugEnabled()) {
LOG.debug("starting memcache snapshot");
}
// When execution returns from snapshotMemcacheForLog() with a non-NULL
// value, the HMemcache will have a snapshot object stored that must be
// explicitly cleaned up using a call to deleteSnapshot().
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
TreeMap<HStoreKey, byte []> memcacheSnapshot = retval.memcacheSnapshot;
if(memcacheSnapshot == null) {
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
allHStoreFiles.addAll(0, hstoreFiles);
}
return allHStoreFiles;
if(retval == null || retval.memcacheSnapshot == null) {
return getAllStoreFiles();
}
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
LOG.debug("Snapshotted memcache for region " +
this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
}
// A. Flush memcache to all the HStores.
if(LOG.isDebugEnabled()) {
LOG.debug("flushing memcache to HStores");
}
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore hstore = it.next();
Vector<HStoreFile> hstoreFiles
= hstore.flushCache(memcacheSnapshot, logCacheFlushId);
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles
= hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
allHStoreFiles.addAll(0, hstoreFiles);
}
@ -906,27 +934,32 @@ public class HRegion implements HConstants {
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
if(LOG.isDebugEnabled()) {
LOG.debug("writing flush cache complete to log");
}
log.completeCacheFlush(this.regionInfo.regionName,
regionInfo.tableDesc.getName(), logCacheFlushId);
regionInfo.tableDesc.getName(), logCacheFlushId);
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores.
if(LOG.isDebugEnabled()) {
LOG.debug("deleting memcache snapshot");
}
memcache.deleteSnapshot();
if(LOG.isDebugEnabled()) {
LOG.debug("cache flush complete for region " + this.regionInfo.regionName);
// D. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
synchronized(this) {
notifyAll();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.regionName + " in " +
(System.currentTimeMillis() - startTime) + "ms");
}
return allHStoreFiles;
}
private Vector<HStoreFile> getAllStoreFiles() {
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
allHStoreFiles.addAll(0, hstoreFiles);
}
this.commitsSinceFlush = 0;
return allHStoreFiles;
}
@ -947,9 +980,10 @@ public class HRegion implements HConstants {
/** Fetch multiple versions of a single data item, with timestamp. */
byte [][] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
if(writestate.closed) {
throw new IOException("HRegion is closed.");
throws IOException {
if (this.closed.get()) {
throw new IOException("Region " + this.getRegionName().toString() +
" closed");
}
// Make sure this is a valid row and valid column
@ -1065,17 +1099,75 @@ public class HRegion implements HConstants {
* after a specified quiet period.
*
* @param row Row to update
* @return lockid
* @return lock id
* @throws IOException
* @see #put(long, Text, byte[])
*/
public long startUpdate(Text row) throws IOException {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
// #commit or #abort or if the HRegionServer lease on the lock expires.
// See HRegionServer#RegionListener for how the expire on HRegionServer
// invokes a HRegion#abort.
return obtainRowLock(row);
// Do a rough check that we have resources to accept a write. The check is
// 'rough' in that between the resource check and the call to obtain a
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
if (this.closed.get()) {
throw new IOException("Region " + this.getRegionName().toString() +
" closed");
}
// Get a read lock. We will not be able to get one if we are closing or
// if this region is being split. In neither case should we be allowing
// updates.
this.lock.obtainReadLock();
try {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
// #commit or #abort or if the HRegionServer lease on the lock expires.
// See HRegionServer#RegionListener for how the expire on HRegionServer
// invokes a HRegion#abort.
return obtainRowLock(row);
} finally {
this.lock.releaseReadLock();
}
}
/*
* Check if resources to support an update.
*
* For now, just checks memcache saturation.
*
* Here we synchronize on HRegion, a broad scoped lock. Its appropriate
* given we're figuring in here whether this region is able to take on
* writes. This is only method with a synchronize (at time of writing),
* this and the synchronize on 'this' inside in internalFlushCache to send
* the notify.
*/
private synchronized void checkResources() {
if (checkCommitsSinceFlush()) {
return;
}
LOG.warn("Blocking updates for '" + Thread.currentThread().getName() +
"': Memcache size " +
StringUtils.humanReadableInt(this.memcache.getSize()) +
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
while (!checkCommitsSinceFlush()) {
try {
wait();
} catch (InterruptedException e) {
// continue;
}
}
LOG.warn("Unblocking updates for '" + Thread.currentThread().getName() +
"'");
}
/*
* @return True if commits since flush is under the blocking threshold.
*/
private boolean checkCommitsSinceFlush() {
return this.memcache.getSize() < this.blockingMemcacheSize;
}
/**
@ -1142,11 +1234,11 @@ public class HRegion implements HConstants {
throw new LockException("Locking error: put operation on lock " +
lockid + " unexpected aborted by another thread");
}
TreeMap<Text, byte []> targets = this.targetColumns.get(lockid);
Long lid = Long.valueOf(lockid);
TreeMap<Text, byte []> targets = this.targetColumns.get(lid);
if (targets == null) {
targets = new TreeMap<Text, byte []>();
this.targetColumns.put(lockid, targets);
this.targetColumns.put(lid, targets);
}
targets.put(targetCol, val);
}
@ -1207,18 +1299,17 @@ public class HRegion implements HConstants {
// hasn't aborted/committed the write-operation
synchronized(row) {
// Add updates to the log and add values to the memcache.
TreeMap<Text, byte []> columns = this.targetColumns.get(lockid);
Long lid = Long.valueOf(lockid);
TreeMap<Text, byte []> columns = this.targetColumns.get(lid);
if (columns != null && columns.size() > 0) {
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
row, columns, timestamp);
memcache.add(row, columns, timestamp);
// OK, all done!
}
targetColumns.remove(lockid);
targetColumns.remove(lid);
releaseRowLock(row);
}
recentCommits++;
this.commitsSinceFlush++;
}
//////////////////////////////////////////////////////////////////////////////
@ -1284,11 +1375,11 @@ public class HRegion implements HConstants {
}
}
long lockid = Math.abs(rand.nextLong());
rowsToLocks.put(row, lockid);
locksToRows.put(lockid, row);
Long lid = Long.valueOf(Math.abs(rand.nextLong()));
rowsToLocks.put(row, lid);
locksToRows.put(lid, row);
rowsToLocks.notifyAll();
return lockid;
return lid.longValue();
}
}
@ -1451,7 +1542,6 @@ public class HRegion implements HConstants {
// If we are doing a wild card match or there are multiple
// matchers per column, we need to scan all the older versions of
// this row to pick up the rest of the family members
if (!wildcardMatch
&& !multipleMatchers
&& (keys[i].getTimestamp() != chosenTimestamp)) {
@ -1467,7 +1557,6 @@ public class HRegion implements HConstants {
// but this had the effect of overwriting newer
// values with older ones. So now we only insert
// a result if the map does not contain the key.
for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
if (!filtered && moreToFollow &&
!results.containsKey(e.getKey())) {
@ -1516,7 +1605,9 @@ public class HRegion implements HConstants {
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
if (this.dataFilter != null) {
LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
}
}
}
@ -1640,7 +1731,10 @@ public class HRegion implements HConstants {
client.put(lockid, COL_STARTCODE,
String.valueOf(startCode).getBytes(UTF8_ENCODING));
client.commit(lockid);
LOG.info("Added region " + region.getRegionName() + " to table " + table);
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
table);
}
}
/**
@ -1659,7 +1753,9 @@ public class HRegion implements HConstants {
client.delete(lockid, COL_SERVER);
client.delete(lockid, COL_STARTCODE);
client.commit(lockid);
LOG.info("Removed " + regionName + " from table " + table);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " + regionName + " from table " + table);
}
}
/**

View File

@ -114,7 +114,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + regionName + " to retiringRegions");
LOG.debug(regionName.toString() + "closing (" +
"Adding to retiringRegions)");
}
} finally {
lock.writeLock().unlock();
@ -129,7 +130,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
retiringRegions.remove(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing " + regionName + " from retiringRegions");
LOG.debug(regionName.toString() + " closed");
}
} finally {
lock.writeLock().unlock();
@ -140,7 +141,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* {@inheritDoc}
*/
public void run() {
while(! stopRequested) {
while(!stopRequested) {
long startTime = System.currentTimeMillis();
synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
// Grab a list of regions to check
@ -151,21 +152,19 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} finally {
lock.readLock().unlock();
}
try {
for(HRegion cur: regionsToCheck) {
if(cur.isClosed()) {
continue; // Skip if closed
}
if(cur.needsCompaction()) {
// Best time to split a region is right after compaction
if(cur.compactStores()) {
Text midKey = new Text();
if(cur.needsSplit(midKey)) {
split(cur, midKey);
}
}
if (cur.needsCompaction()) {
cur.compactStores();
}
// After compaction, it probably needs splitting. May also need
// splitting just because one of the memcache flushes was big.
Text midKey = new Text();
if (cur.needsSplit(midKey)) {
split(cur, midKey);
}
}
} catch(IOException e) {
@ -196,23 +195,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
final Text tableToUpdate =
region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
LOG.info("Updating " + tableToUpdate + " with region split info");
// Remove old region from META
for (int tries = 0; tries < numRetries; tries++) {
try {
HRegion.removeRegionFromMETA(client, tableToUpdate,
region.getRegionName());
break;
} catch (IOException e) {
if(tries == numRetries - 1) {
if(e instanceof RemoteException) {
@ -224,13 +221,12 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
// Add new regions to META
for (int i = 0; i < newRegions.length; i++) {
for (int tries = 0; tries < numRetries; tries ++) {
try {
HRegion.addRegionToMETA(client, tableToUpdate, newRegions[i],
serverInfo.getServerAddress(), serverInfo.getStartCode());
break;
} catch(IOException e) {
if(tries == numRetries - 1) {
if(e instanceof RemoteException) {
@ -243,7 +239,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
// Now tell the master about the new regions
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting region split to master");
}
@ -253,9 +248,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
" successful. Old region=" + oldRegionInfo.getRegionName() +
", new regions: " + newRegions[0].getRegionName() + ", " +
newRegions[1].getRegionName());
// Finally, start serving the new regions
// Finally, start serving the new regions
lock.writeLock().lock();
try {
onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
@ -271,7 +265,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
private final Flusher cacheFlusher;
private final Thread cacheFlusherThread;
protected final Integer cacheFlusherLock = new Integer(0);
/** Runs periodically to flush the memcache */
/* Runs periodically to flush memcache.
*
* Memcache flush is also called just before compaction and just before
* split so memcache is best prepared for the the long trip across
* compactions/splits during which it will not be able to flush to disk.
*/
class Flusher implements Runnable {
/**
* {@inheritDoc}
@ -293,7 +293,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
// Flush them, if necessary
for(HRegion cur: toFlush) {
if(cur.isClosed()) { // Skip if closed
continue;
@ -305,7 +304,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (iex instanceof RemoteException) {
try {
iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
} catch (IOException x) {
iex = x;
}
@ -316,9 +314,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
// Sleep
long waitTime = stopRequested ? 0
: threadWakeFrequency - (System.currentTimeMillis() - startTime);
long waitTime = stopRequested? 0:
threadWakeFrequency - (System.currentTimeMillis() - startTime);
if(waitTime > 0) {
try {
Thread.sleep(waitTime);
@ -358,9 +355,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
int nEntries = log.getNumEntries();
if(nEntries > this.maxLogEntries) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Rolling log. Number of entries is: " + nEntries);
}
LOG.info("Rolling hlog. Number of entries: " + nEntries);
log.rollWriter();
} catch (IOException iex) {
if (iex instanceof RemoteException) {
@ -439,7 +434,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
15 * 1000);
this.splitOrCompactCheckFrequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
60 * 1000);
30 * 1000);
// Cache flushing
this.cacheFlusher = new Flusher();
@ -465,20 +460,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
String realIP = DNS.getDefaultIP(
conf.get("dfs.datanode.dns.interface","default"));
this.serverInfo = new HServerInfo(new HServerAddress(
new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
this.rand.nextLong());
new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
this.rand.nextLong());
Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
this.serverInfo.getServerAddress().getPort());
this.serverInfo.getServerAddress().getPort());
if (LOG.isDebugEnabled()) {
LOG.debug("Log dir " + logdir);
}
// Logging
this.fs = FileSystem.get(conf);
if(fs.exists(logdir)) {
throw new RegionServerRunningException("region server already running at " +
this.serverInfo.getServerAddress().toString() + " because logdir " +
logdir.toString() + " exists");
throw new RegionServerRunningException("region server already " +
"running at " + this.serverInfo.getServerAddress().toString() +
" because logdir " + logdir.toString() + " exists");
}
this.log = new HLog(fs, logdir, conf);
@ -579,7 +575,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
this.server.start();
LOG.info("HRegionServer started at: " + serverInfo.getServerAddress().toString());
LOG.info("HRegionServer started at: " +
serverInfo.getServerAddress().toString());
} catch(IOException e) {
stopRequested = true;
if (e instanceof RemoteException) {
@ -759,8 +756,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.getRegionInfo());
}
LOG.info("telling master that region server is shutting down at: "
+ serverInfo.getServerAddress().toString());
LOG.info("telling master that region server is shutting down at: " +
serverInfo.getServerAddress().toString());
hbaseMaster.regionServerReport(serverInfo, exitMsg);
} catch (IOException e) {
if (e instanceof RemoteException) {
@ -947,7 +944,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
try {
region.close(abortRequested);
LOG.debug("region closed " + region.getRegionName());
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
@ -978,7 +974,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/**
* {@inheritDoc}
*/
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException {
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
Text row = e.getKey();
long clientid = rand.nextLong();
@ -1027,7 +1024,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/**
* {@inheritDoc}
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
public KeyedData[] getRow(final Text regionName, final Text row)
throws IOException {
HRegion region = getRegion(regionName);
TreeMap<Text, byte[]> map = region.getFull(row);
KeyedData result[] = new KeyedData[map.size()];
@ -1066,14 +1064,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
byte [] val = e.getValue();
if (DELETE_BYTES.compareTo(val) == 0) {
// Column value is deleted. Don't return it.
if (LOG.isDebugEnabled()) {
LOG.debug("skipping deleted value for key: " + k.toString());
}
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("adding value for key: " + k.toString());
}
values.add(new KeyedData(k, val));
}
if(values.size() > 0) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.StringUtils;
import org.onelab.filter.*;
@ -78,6 +79,7 @@ class HStore implements HConstants {
Path loginfodir;
Path filterDir;
Filter bloomFilter;
private String storeName;
Integer compactLock = new Integer(0);
Integer flushLock = new Integer(0);
@ -129,6 +131,8 @@ class HStore implements HConstants {
this.family = family;
this.familyName = HStoreKey.extractFamily(this.family.getName());
this.compression = SequenceFile.CompressionType.NONE;
this.storeName = this.regionName.toString() + "/" +
this.familyName.toString();
if(family.getCompression() != HColumnDescriptor.CompressionType.NONE) {
if(family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
@ -161,7 +165,7 @@ class HStore implements HConstants {
}
if(LOG.isDebugEnabled()) {
LOG.debug("starting HStore for " + regionName + "/"+ familyName);
LOG.debug("Starting HStore for " + this.storeName);
}
// Either restart or get rid of any leftover compaction work. Either way,
@ -216,16 +220,11 @@ class HStore implements HConstants {
// Finally, start up all the map readers! (There should be just one at this
// point, as we've compacted them all.)
if(LOG.isDebugEnabled()) {
LOG.debug("starting map readers");
}
for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
// TODO - is this really necessary? Don't I do this inside compact()?
maps.put(e.getKey(),
getMapFileReader(e.getValue().getMapFilePath().toString()));
}
LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
}
/*
@ -239,9 +238,6 @@ class HStore implements HConstants {
private void doReconstructionLog(final Path reconstructionLog,
final long maxSeqID)
throws UnsupportedEncodingException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("reading reconstructionLog");
}
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
return;
}
@ -306,7 +302,7 @@ class HStore implements HConstants {
Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
if(fs.exists(filterFile)) {
if (LOG.isDebugEnabled()) {
LOG.debug("loading bloom filter for " + family.getName());
LOG.debug("loading bloom filter for " + this.storeName);
}
switch(family.bloomFilter.filterType) {
@ -328,7 +324,7 @@ class HStore implements HConstants {
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("creating bloom filter for " + family.getName());
LOG.debug("creating bloom filter for " + this.storeName);
}
switch(family.bloomFilter.filterType) {
@ -357,7 +353,7 @@ class HStore implements HConstants {
*/
private void flushBloomFilter() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("flushing bloom filter for " + family.getName());
LOG.debug("flushing bloom filter for " + this.storeName);
}
FSDataOutputStream out =
fs.create(new Path(filterDir, BLOOMFILTER_FILE_NAME));
@ -365,7 +361,7 @@ class HStore implements HConstants {
bloomFilter.write(out);
out.close();
if (LOG.isDebugEnabled()) {
LOG.debug("flushed bloom filter for " + family.getName());
LOG.debug("flushed bloom filter for " + this.storeName);
}
}
@ -494,7 +490,9 @@ class HStore implements HConstants {
* @throws IOException
*/
void close() throws IOException {
LOG.info("closing HStore for " + this.regionName + "/" + this.familyName);
if (LOG.isDebugEnabled()) {
LOG.info("closing HStore for " + this.storeName);
}
this.lock.obtainWriteLock();
try {
for (MapFile.Reader map: maps.values()) {
@ -503,7 +501,7 @@ class HStore implements HConstants {
maps.clear();
mapFiles.clear();
LOG.info("HStore closed for " + this.regionName + "/" + this.familyName);
LOG.info("HStore closed for " + this.storeName);
} finally {
this.lock.releaseWriteLock();
}
@ -524,13 +522,13 @@ class HStore implements HConstants {
*
* Return the entire list of HStoreFiles currently used by the HStore.
*
* @param inputCache - memcache to flush
* @param logCacheFlushId - flush sequence number
* @return - Vector of all the HStoreFiles in use
* @param inputCache memcache to flush
* @param logCacheFlushId flush sequence number
* @return Vector of all the HStoreFiles in use
* @throws IOException
*/
Vector<HStoreFile> flushCache(TreeMap<HStoreKey, byte []> inputCache,
long logCacheFlushId)
Vector<HStoreFile> flushCache(final TreeMap<HStoreKey, byte []> inputCache,
final long logCacheFlushId)
throws IOException {
return flushCacheHelper(inputCache, logCacheFlushId, true);
}
@ -538,64 +536,48 @@ class HStore implements HConstants {
Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
long logCacheFlushId, boolean addToAvailableMaps)
throws IOException {
synchronized(flushLock) {
if(LOG.isDebugEnabled()) {
LOG.debug("flushing HStore " + this.regionName + "/" + this.familyName);
}
// A. Write the TreeMap out to the disk
HStoreFile flushedFile
= HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
regionName, familyName, fs);
Path mapfile = flushedFile.getMapFilePath();
if(LOG.isDebugEnabled()) {
LOG.debug("map file is: " + mapfile.toString());
LOG.debug("Flushing to " + mapfile.toString());
}
MapFile.Writer out = getMapFileWriter(mapfile.toString());
try {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
if (this.familyName.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
if (this.familyName.
equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("HStore " + this.regionName + "/" + this.familyName + " flushed");
}
} finally {
out.close();
}
// B. Write out the log sequence number that corresponds to this output
// MapFile. The MapFile is current up to and including the log seq num.
if(LOG.isDebugEnabled()) {
LOG.debug("writing log cache flush id");
}
flushedFile.writeInfo(fs, logCacheFlushId);
// C. Flush the bloom filter if any
if(bloomFilter != null) {
flushBloomFilter();
}
// D. Finally, make the new MapFile available.
if(addToAvailableMaps) {
this.lock.obtainWriteLock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
maps.put(flushid, getMapFileReader(mapfile.toString()));
mapFiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("HStore available for " + this.regionName + "/"
+ this.familyName + " flush id=" + logCacheFlushId);
LOG.debug("Added " + mapfile.toString() +
" with flush id " + logCacheFlushId + " and size " +
StringUtils.humanReadableInt(mapfile.getFileSystem(this.conf).
getContentLength(mapfile)));
}
} finally {
this.lock.releaseWriteLock();
@ -626,7 +608,7 @@ class HStore implements HConstants {
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
*
* During this time, the HStore can work as usual, getting values from
* <p>During this time, the HStore can work as usual, getting values from
* MapFiles and writing new MapFiles from given memcaches.
*
* Existing MapFiles are not destroyed until the new compacted TreeMap is
@ -646,28 +628,25 @@ class HStore implements HConstants {
void compactHelper(boolean deleteSequenceInfo) throws IOException {
synchronized(compactLock) {
if(LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + this.regionName + "/" + this.familyName);
}
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName);
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
fs.mkdirs(curCompactStore);
if(LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + mapFiles.size() + " files in " +
curCompactStore.toString());
}
try {
// Grab a list of files to compact.
Vector<HStoreFile> toCompactFiles = null;
this.lock.obtainWriteLock();
try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
} finally {
this.lock.releaseWriteLock();
}
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
// Compute the max-sequenceID seen in any of the to-be-compacted
// TreeMaps
long maxSeenSeqID = -1;
for (HStoreFile hsf: toCompactFiles) {
long seqid = hsf.loadInfo(fs);
@ -677,18 +656,13 @@ class HStore implements HConstants {
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("max sequence id: " + maxSeenSeqID);
}
HStoreFile compactedOutputFile
= new HStoreFile(conf, compactdir, regionName, familyName, -1);
if(toCompactFiles.size() == 1) {
if(LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.regionName + "/" + this.familyName);
LOG.debug("nothing to compact for " + this.storeName);
}
HStoreFile hsf = toCompactFiles.elementAt(0);
if(hsf.loadInfo(fs) == -1) {
return;
@ -699,7 +673,6 @@ class HStore implements HConstants {
MapFile.Writer compactedOut =
getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
try {
// We create a new set of MapFile.Reader objects so we don't screw up
// the caching associated with the currently-loaded ones.
//
@ -711,15 +684,13 @@ class HStore implements HConstants {
// lowest-ranked one. Updates to a single row/column will appear
// ranked by timestamp. This allows us to throw out deleted values or
// obsolete versions.
MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
ImmutableBytesWritable[] vals =
new ImmutableBytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
for(HStoreFile hsf: toCompactFiles) {
readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
keys[pos] = new HStoreKey();
vals[pos] = new ImmutableBytesWritable();
@ -729,11 +700,6 @@ class HStore implements HConstants {
// Now, advance through the readers in order. This will have the
// effect of a run-time sort of the entire dataset.
if(LOG.isDebugEnabled()) {
LOG.debug("processing HStoreFile readers");
}
int numDone = 0;
for(int i = 0; i < readers.length; i++) {
readers[i].reset();
@ -747,9 +713,7 @@ class HStore implements HConstants {
Text lastRow = new Text();
Text lastColumn = new Text();
while(numDone < done.length) {
// Find the reader with the smallest key
int smallestKey = -1;
for(int i = 0; i < readers.length; i++) {
if(done[i]) {
@ -758,7 +722,6 @@ class HStore implements HConstants {
if(smallestKey < 0) {
smallestKey = i;
} else {
if(keys[i].compareTo(keys[smallestKey]) < 0) {
smallestKey = i;
@ -767,74 +730,60 @@ class HStore implements HConstants {
}
// Reflect the current key/val in the output
HStoreKey sk = keys[smallestKey];
if(lastRow.equals(sk.getRow())
&& lastColumn.equals(sk.getColumn())) {
timesSeen++;
} else {
timesSeen = 1;
}
if(timesSeen <= family.getMaxVersions()) {
// Keep old versions until we have maxVersions worth.
// Then just skip them.
if(sk.getRow().getLength() != 0
&& sk.getColumn().getLength() != 0) {
// Only write out objects which have a non-zero length key and value
// Only write out objects which have a non-zero length key and
// value
compactedOut.append(sk, vals[smallestKey]);
}
}
//TODO: I don't know what to do about deleted values. I currently
// TODO: I don't know what to do about deleted values. I currently
// include the fact that the item was deleted as a legitimate
// "version" of the data. Maybe it should just drop the deleted val?
// "version" of the data. Maybe it should just drop the deleted
// val?
// Update last-seen items
lastRow.set(sk.getRow());
lastColumn.set(sk.getColumn());
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
if(! readers[smallestKey].next(keys[smallestKey],
vals[smallestKey])) {
done[smallestKey] = true;
readers[smallestKey].close();
numDone++;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("all HStores processed");
}
} finally {
compactedOut.close();
}
if(LOG.isDebugEnabled()) {
LOG.debug("writing new compacted HStore");
LOG.debug("writing new compacted HStore to " +
compactedOutputFile.getMapFilePath().toString());
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
compactedOutputFile.writeInfo(fs, maxSeenSeqID);
} else {
compactedOutputFile.writeInfo(fs, -1);
}
// Write out a list of data files that we're replacing
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
try {
@ -848,18 +797,11 @@ class HStore implements HConstants {
}
// Indicate that we're done.
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
(new DataOutputStream(fs.create(doneFile))).close();
// Move the compaction into place.
processReadyCompaction();
if(LOG.isDebugEnabled()) {
LOG.debug("compaction complete for " + this.regionName + "/" + this.familyName);
}
} finally {
fs.delete(compactdir);
}
@ -872,8 +814,8 @@ class HStore implements HConstants {
*
* It works by processing a compaction that's been written to disk.
*
* It is usually invoked at the end of a compaction, but might also be invoked
* at HStore startup, if the prior execution died midway through.
* It is usually invoked at the end of a compaction, but might also be
* invoked at HStore startup, if the prior execution died midway through.
*/
void processReadyCompaction() throws IOException {
@ -890,22 +832,22 @@ class HStore implements HConstants {
// 1. Acquiring the write-lock
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName);
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
this.lock.obtainWriteLock();
try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
if(! fs.exists(doneFile)) {
if(!fs.exists(doneFile)) {
// The last execution didn't finish the compaction, so there's nothing
// we can do. We'll just have to redo it. Abandon it and return.
LOG.warn("Redoing a failed compaction");
return;
}
// OK, there's actually compaction work that needs to be put into place.
if(LOG.isDebugEnabled()) {
LOG.debug("compaction starting");
LOG.debug("Process ready compaction starting");
}
// 2. Load in the files to be deleted.
@ -927,13 +869,14 @@ class HStore implements HConstants {
}
if(LOG.isDebugEnabled()) {
LOG.debug("loaded files to be deleted");
LOG.debug("loaded " + toCompactFiles.size() +
" file(s) to be deleted");
}
// 3. Unload all the replaced MapFiles.
Iterator<HStoreFile> it2 = mapFiles.values().iterator();
for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
for(Iterator<MapFile.Reader> it = maps.values().iterator();
it.hasNext(); ) {
MapFile.Reader curReader = it.next();
HStoreFile curMapFile = it2.next();
if(toCompactFiles.contains(curMapFile)) {
@ -948,24 +891,18 @@ class HStore implements HConstants {
it.remove();
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("unloaded existing MapFiles");
}
// What if we crash at this point? No big deal; we will restart
// processReadyCompaction(), and nothing has been lost.
// 4. Delete all the old files, no longer needed
for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
for(HStoreFile hsf: toCompactFiles) {
fs.delete(hsf.getMapFilePath());
fs.delete(hsf.getInfoFilePath());
}
if(LOG.isDebugEnabled()) {
LOG.debug("old files deleted");
LOG.debug("old file(s) deleted");
}
// What if we fail now? The above deletes will fail silently. We'd better
@ -973,41 +910,32 @@ class HStore implements HConstants {
// something we delete, though.
// 5. Moving the new MapFile into place
if(LOG.isDebugEnabled()) {
LOG.debug("moving new MapFile into place");
}
HStoreFile compactedFile
= new HStoreFile(conf, compactdir, regionName, familyName, -1);
HStoreFile finalCompactedFile
= HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
if(LOG.isDebugEnabled()) {
LOG.debug("moving " + compactedFile.getMapFilePath().toString() +
" to " + finalCompactedFile.getMapFilePath().toString());
}
fs.rename(compactedFile.getMapFilePath(), finalCompactedFile.getMapFilePath());
fs.rename(compactedFile.getMapFilePath(),
finalCompactedFile.getMapFilePath());
// Fail here? No problem.
fs.rename(compactedFile.getInfoFilePath(), finalCompactedFile.getInfoFilePath());
fs.rename(compactedFile.getInfoFilePath(),
finalCompactedFile.getInfoFilePath());
// Fail here? No worries.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
// 6. Loading the new TreeMap.
if(LOG.isDebugEnabled()) {
LOG.debug("loading new TreeMap");
}
mapFiles.put(orderVal, finalCompactedFile);
maps.put(orderVal, getMapFileReader(
finalCompactedFile.getMapFilePath().toString()));
finalCompactedFile.getMapFilePath().toString()));
} finally {
// 7. Releasing the write-lock
this.lock.releaseWriteLock();
}
}

View File

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
@ -36,6 +38,7 @@ import java.util.*;
* This class handles all that path-building stuff for you.
******************************************************************************/
public class HStoreFile implements HConstants, WritableComparable {
private static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
static final byte INFO_SEQ_NUM = 0;
static final String HSTORE_DATFILE_PREFIX = "mapfile.dat.";
static final String HSTORE_INFOFILE_PREFIX = "mapfile.info.";
@ -211,29 +214,34 @@ public class HStoreFile implements HConstants, WritableComparable {
* brand-new HRegions.
*/
void splitStoreFile(Text midKey, HStoreFile dstA, HStoreFile dstB,
FileSystem fs, Configuration conf) throws IOException {
FileSystem fs, Configuration c)
throws IOException {
// Copy the appropriate tuples to one MapFile or the other.
MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf);
MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), c);
try {
MapFile.Writer outA = new MapFile.Writer(conf, fs,
MapFile.Writer outA = new MapFile.Writer(c, fs,
dstA.getMapFilePath().toString(), HStoreKey.class,
ImmutableBytesWritable.class);
try {
MapFile.Writer outB = new MapFile.Writer(conf, fs,
MapFile.Writer outB = new MapFile.Writer(c, fs,
dstB.getMapFilePath().toString(), HStoreKey.class,
ImmutableBytesWritable.class);
try {
long count = 0;
HStoreKey readkey = new HStoreKey();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
while(in.next(readkey, readval)) {
Text key = readkey.getRow();
if(key.compareTo(midKey) < 0) {
if(readkey.getRow().compareTo(midKey) < 0) {
outA.append(readkey, readval);
} else {
outB.append(readkey, readval);
}
if (LOG.isDebugEnabled()) {
count++;
if ((count % 10000) == 0) {
LOG.debug("Write " + count + " records");
}
}
}
} finally {
outB.close();
@ -300,15 +308,12 @@ public class HStoreFile implements HConstants, WritableComparable {
long loadInfo(FileSystem fs) throws IOException {
Path p = getInfoFilePath();
DataInputStream in = new DataInputStream(fs.open(p));
try {
byte flag = in.readByte();
if(flag == INFO_SEQ_NUM) {
return in.readLong();
}
throw new IOException("Cannot process log file: " + p);
} finally {
in.close();
}

View File

@ -95,11 +95,10 @@ public class HStoreKey implements WritableComparable {
Text column;
long timestamp;
/** Default constructor used in conjunction with Writable interface */
public HStoreKey() {
this.row = new Text();
this.column = new Text();
this.timestamp = Long.MAX_VALUE;
this(new Text());
}
/**
@ -110,9 +109,7 @@ public class HStoreKey implements WritableComparable {
* @param row - row key
*/
public HStoreKey(Text row) {
this.row = new Text(row);
this.column = new Text();
this.timestamp = Long.MAX_VALUE;
this(row, Long.MAX_VALUE);
}
/**
@ -123,9 +120,7 @@ public class HStoreKey implements WritableComparable {
* @param timestamp timestamp value
*/
public HStoreKey(Text row, long timestamp) {
this.row = new Text(row);
this.column = new Text();
this.timestamp = timestamp;
this(row, new Text(), timestamp);
}
/**
@ -136,9 +131,7 @@ public class HStoreKey implements WritableComparable {
* @param column column key
*/
public HStoreKey(Text row, Text column) {
this.row = new Text(row);
this.column = new Text(column);
this.timestamp = Long.MAX_VALUE;
this(row, column, Long.MAX_VALUE);
}
/**
@ -154,16 +147,21 @@ public class HStoreKey implements WritableComparable {
this.timestamp = timestamp;
}
/**
* @return Approximate size in bytes of this key.
*/
public long getSize() {
return this.row.getLength() + this.column.getLength() +
8 /* There is no sizeof in java. Presume long is 8 (64bit machine)*/;
}
/**
* Construct a new HStoreKey from another
*
* @param other the source key
*/
public HStoreKey(HStoreKey other) {
this();
this.row.set(other.row);
this.column.set(other.column);
this.timestamp = other.timestamp;
this(other.row, other.column, other.timestamp);
}
/**

View File

@ -51,16 +51,10 @@ public class TestHMemcache extends TestCase {
@Override
protected void setUp() throws Exception {
super.setUp();
this.hmemcache = new HMemcache();
// Set up a configuration that has configuration for a file
// filesystem implementation.
this.conf = new HBaseConfiguration();
// The test hadoop-site.xml doesn't have a default file fs
// implementation. Remove below when gets added.
this.conf.set("fs.file.impl",
"org.apache.hadoop.fs.LocalFileSystem");
}
/* (non-Javadoc)
@ -140,11 +134,14 @@ public class TestHMemcache extends TestCase {
// Add some rows, run a snapshot. Do it a few times.
for (int i = 0; i < snapshotCount; i++) {
addRows(this.hmemcache);
int historyInitialSize = this.hmemcache.history.size();
Snapshot s = runSnapshot(this.hmemcache, log);
log.completeCacheFlush(new Text(Integer.toString(i)),
tableName, s.sequenceId);
// Clean up snapshot now we are done with it.
this.hmemcache.deleteSnapshot();
assertTrue("History not being cleared",
historyInitialSize == this.hmemcache.history.size());
}
log.closeAndDelete();
}