HBASE-469 Streamline HStore startup and compactions

HMerge, HRegionServer

- Updated to reflect the changes to the HRegion, CompactSplitThread and Flusher methods.

ServerManager

- Return a zero-length HMsg array to the region server if it is exiting or quiesced and the master is not yet ready to shut down (see the sketch below).
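
  A minimal sketch of the heartbeat decision described above, assuming the real
  HMsg constants behave as in the diff further down; Msg and HeartbeatSketch
  here are illustrative stand-ins, not the actual HBase classes:

    // Sketch: return an empty message array when the region server is
    // exiting, or when it has quiesced but the master cannot shut down yet.
    enum Msg { REPORT_EXITING, REPORT_QUIESCED, REGIONSERVER_QUIESCE }

    final class HeartbeatSketch {
      boolean shutdownRequested = true;   // master wants to shut down
      boolean masterClosed = false;       // but has not finished closing

      Msg[] processHeartbeat(Msg[] fromRegionServer) {
        if (fromRegionServer.length > 0 && fromRegionServer[0] == Msg.REPORT_EXITING) {
          // Server is going away: previously its own message was echoed back,
          // now there is nothing more to tell it.
          return new Msg[0];
        }
        if (shutdownRequested && !masterClosed) {
          if (fromRegionServer.length > 0 && fromRegionServer[0] == Msg.REPORT_QUIESCED) {
            // Already quiesced, but the master is not ready: empty response.
            return new Msg[0];
          }
          // Otherwise ask the server to stop serving user regions.
          return new Msg[] { Msg.REGIONSERVER_QUIESCE };
        }
        return new Msg[0];
      }
    }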

QueueEntry

- Removed. No longer used.

CompactSplitThread

- make compactionQueue a queue of HRegion.
- Add a Set<HRegion> so we can quickly determine whether a region is in the queue; BlockingQueue.contains() does a linear scan of the queue (see the sketch after this list).
- Add a lock and an interruptPolitely method so that compactions/splits in progress are not interrupted.
- Don't add a region to the queue if it is already present.
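
  A minimal sketch of the queue-plus-set pattern, with a plain String standing
  in for HRegion (the class and method names here are illustrative, not the
  real CompactSplitThread API):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class CompactionQueueSketch {
      private final BlockingQueue<String> compactionQueue =
        new LinkedBlockingQueue<String>();
      private final Set<String> regionsInQueue = new HashSet<String>();

      // Enqueue only if not already queued; the HashSet makes the membership
      // test O(1) instead of BlockingQueue.contains()'s linear scan.
      void compactionRequested(String region) {
        synchronized (regionsInQueue) {
          if (!regionsInQueue.contains(region)) {
            regionsInQueue.add(region);
            compactionQueue.add(region);
          }
        }
      }

      // Worker side: remove from the set as soon as the region is taken off
      // the queue, so it can be re-queued while the compaction runs.
      String take(long timeoutMs) throws InterruptedException {
        String region = compactionQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (region != null) {
          synchronized (regionsInQueue) {
            regionsInQueue.remove(region);
          }
        }
        return region;
      }
    }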

Flusher

- change queue from DelayQueue to BlockingQueue, with HRegion entries instead of QueueEntry.
- Add Set<HRegion> to quickly determine if a region is already in the queue to avoid linear scan of BlockingQueue.contains().
- Only put regions in the queue for optional cache flush if the last time they were flushed is older than now - optionalFlushInterval.
- Only add a region to the queue if it is not already present (see the sketch after this list).
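
  A minimal sketch of the optional-flush check, assuming a region exposes
  getLastFlushTime/setLastFlushTime as in the diff below; the Region interface
  and field names here are stand-ins for the real Flusher/HRegion code:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    final class OptionalFlushSketch {
      interface Region {
        long getLastFlushTime();
        void setLastFlushTime(long t);
      }

      private final BlockingQueue<Region> flushQueue =
        new LinkedBlockingQueue<Region>();
      private final Set<Region> regionsInQueue = new HashSet<Region>();
      private final long optionalFlushPeriod = 30 * 60 * 1000L;

      // Queue a region for an optional flush only if it is not already queued
      // and it has not been flushed within the last optionalFlushPeriod.
      void maybeQueueOptionalFlush(Region region, long now) {
        synchronized (regionsInQueue) {
          if (!regionsInQueue.contains(region) &&
              (now - optionalFlushPeriod) > region.getLastFlushTime()) {
            regionsInQueue.add(region);
            flushQueue.add(region);
            region.setLastFlushTime(now);   // record the queued flush time
          }
        }
      }
    }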

HRegion

- Don't request a cache flush if one has already been requested.
- Add setLastFlushTime so flusher can set it once it has queued an optional flush.
- Replace largestHStore with getLargestHStoreSize: returns long instead of HStoreSize object.
- Add midKey as parameter to splitRegion.
- Reorder start of splitRegion so it doesn't do any work before validating parameters.
- Remove needsSplit and compactIfNeeded - no longer needed.
- compactStores now returns midKey if split is needed.
- snapshotMemcaches now sets flushRequested to false and sets lastFlushTime to now.
- update does not request a cache flush if one has already been requested.
- Override equals and hashCode so HRegions can be stored in a HashSet (see the sketch after this list).
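
  A minimal sketch of hash-based identity keyed on the region name, which is
  what lets regions sit in the HashSets above; RegionSketch is a stand-in for
  HRegion and uses the standard equals/hashCode idiom rather than the exact
  code in the diff:

    final class RegionSketch {
      private final String regionName;

      RegionSketch(String regionName) {
        this.regionName = regionName;
      }

      // Two sketches are equal iff they name the same region, so a HashSet
      // de-duplicates queue entries per region.
      @Override
      public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RegionSketch)) return false;
        return regionName.equals(((RegionSketch) o).regionName);
      }

      @Override
      public int hashCode() {
        return regionName.hashCode();
      }
    }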

HStore

- loadHStoreFiles now computes max sequence id and the initial size of the store.
- Add getter for family.
- internalCacheFlush updates store size, and logs both size of cache flush and resulting map file size (with debug logging enabled).
- Remove needsCompaction and hasReferences - no longer needed.
- compact() returns midKey if store needs to be split.
- compact() does all checking before actually starting a compaction.
- If store size is greater than desiredMaxFileSize, compact returns the midKey for the store regardless of whether a compaction was actually done.
- Added more synchronization in completeCompaction while iterating over storeFiles.
- completeCompaction computes new store size.
- New method checkSplit replaces the size method; it returns the midKey if the store both needs to be split and can be split (see the sketch after this list).
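
  A minimal sketch of the checkSplit decision: return a split point only when
  the store exceeds the configured maximum size and contains no reference
  files. StoreFileSketch and its midKey field are illustrative stand-ins; the
  real HStore derives the mid key from the largest MapFile's midKey():

    import java.util.List;

    final class StoreSplitSketch {
      static final class StoreFileSketch {
        final long length;
        final boolean isReference;
        final String midKey;          // pre-computed middle row key
        StoreFileSketch(long length, boolean isReference, String midKey) {
          this.length = length;
          this.isReference = isReference;
          this.midKey = midKey;
        }
      }

      private final long desiredMaxFileSize;

      StoreSplitSketch(long desiredMaxFileSize) {
        this.desiredMaxFileSize = desiredMaxFileSize;
      }

      // Returns the mid key of the largest file if the store both needs to be
      // split (over the size threshold) and can be split (no references).
      String checkSplit(List<StoreFileSketch> files, long storeSize) {
        if (files.isEmpty() || storeSize < desiredMaxFileSize) {
          return null;
        }
        StoreFileSketch largest = null;
        for (StoreFileSketch f : files) {
          if (f.isReference) {
            return null;              // reference files make the store unsplittable
          }
          if (largest == null || f.length > largest.length) {
            largest = f;
          }
        }
        return largest.midKey;
      }
    }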

HStoreSize

- removed. No longer needed.

HBaseTestCase

- Only set fs if it has not already been set by a subclass.

TestTableIndex, TestTableMapReduce

- Call FSUtil.deleteFully to clean up cruft left in the local fs by MapReduce.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@643761 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2008-04-02 06:58:26 +00:00
parent 0e825ebaff
commit 6e225dd7f1
23 changed files with 679 additions and 756 deletions


@ -12,6 +12,9 @@ Hbase Change Log
NEW FEATURES
HBASE-548 Tool to online single region
IMPROVEMENTS
HBASE-469 Streamline HStore startup and compactions
Release 0.1.0
INCOMPATIBLE CHANGES


@ -144,17 +144,16 @@ class HMerge implements HConstants {
long currentSize = 0;
HRegion nextRegion = null;
long nextSize = 0;
Text midKey = new Text();
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion =
new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
currentSize = currentRegion.largestHStore(midKey).getAggregate();
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
nextSize = nextRegion.largestHStore(midKey).getAggregate();
nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
// We merge two adjacent regions if their total size is less than


@ -128,6 +128,9 @@ class ServerManager implements HConstants {
}
/**
* Called to process the messages sent from the region server to the master
* along with the heart beat.
*
* @param serverInfo
* @param msgs
* @return messages from master to region server indicating what region
@ -142,7 +145,7 @@ class ServerManager implements HConstants {
if (msgs.length > 0) {
if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
processRegionServerExit(serverName, msgs);
return new HMsg[]{msgs[0]};
return new HMsg[0];
} else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
LOG.info("Region server " + serverName + " quiesced");
master.quiescedMetaServers.incrementAndGet();
@ -157,6 +160,11 @@ class ServerManager implements HConstants {
}
if (master.shutdownRequested && !master.closed.get()) {
if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
// Server is already quiesced, but we aren't ready to shut down
// return empty response
return new HMsg[0];
}
// Tell the server to stop serving any user regions
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
}


@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -45,16 +46,19 @@ implements RegionUnavailableListener, HConstants {
private HTable root = null;
private HTable meta = null;
private long startTime;
private volatile long startTime;
private final long frequency;
private final Integer lock = new Integer(0);
private HRegionServer server;
private HBaseConfiguration conf;
private final HRegionServer server;
private final HBaseConfiguration conf;
private final BlockingQueue<QueueEntry> compactionQueue =
new LinkedBlockingQueue<QueueEntry>();
private final BlockingQueue<HRegion> compactionQueue =
new LinkedBlockingQueue<HRegion>();
/** constructor */
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
/** @param server */
public CompactSplitThread(HRegionServer server) {
super();
this.server = server;
@ -68,19 +72,26 @@ implements RegionUnavailableListener, HConstants {
@Override
public void run() {
while (!server.isStopRequested()) {
QueueEntry e = null;
HRegion r = null;
try {
e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null) {
synchronized (regionsInQueue) {
regionsInQueue.remove(r);
}
synchronized (lock) {
// Don't interrupt us while we are working
Text midKey = r.compactStores();
if (midKey != null) {
split(r, midKey);
}
}
}
e.getRegion().compactIfNeeded();
split(e.getRegion());
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
(r != null ? (" for region " + r.getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
@ -88,30 +99,35 @@ implements RegionUnavailableListener, HConstants {
} catch (Exception ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
(r != null ? (" for region " + r.getRegionName()) : ""),
ex);
if (!server.checkFileSystem()) {
break;
}
}
}
regionsInQueue.clear();
compactionQueue.clear();
LOG.info(getName() + " exiting");
}
/**
* @param e QueueEntry for region to be compacted
* @param r HRegion store belongs to
*/
public void compactionRequested(QueueEntry e) {
compactionQueue.add(e);
public synchronized void compactionRequested(HRegion r) {
LOG.debug("Compaction requested for region: " + r.getRegionName());
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
compactionQueue.add(r);
regionsInQueue.add(r);
}
}
}
void compactionRequested(final HRegion r) {
compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
}
private void split(final HRegion region) throws IOException {
private void split(final HRegion region, final Text midKey)
throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.splitRegion(this);
final HRegion[] newRegions = region.splitRegion(this, midKey);
if (newRegions == null) {
// Didn't need to be split
return;
@ -198,4 +214,13 @@ implements RegionUnavailableListener, HConstants {
server.getWriteLock().unlock();
}
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptPolitely() {
synchronized (lock) {
interrupt();
}
}
}


@ -20,14 +20,16 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.util.ConcurrentModificationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@ -35,60 +37,60 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
/** Flush cache upon request */
class Flusher extends Thread implements CacheFlushListener {
static final Log LOG = LogFactory.getLog(Flusher.class);
private final DelayQueue<QueueEntry> flushQueue =
new DelayQueue<QueueEntry>();
private final BlockingQueue<HRegion> flushQueue =
new LinkedBlockingQueue<HRegion>();
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
private final long threadWakeFrequency;
private final long optionalFlushPeriod;
private final HRegionServer server;
private final HBaseConfiguration conf;
private final Integer lock = new Integer(0);
/** constructor */
public Flusher(final HRegionServer server) {
/**
* @param conf
* @param server
*/
public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
super();
this.server = server;
conf = server.conf;
this.optionalFlushPeriod = conf.getLong(
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
this.threadWakeFrequency = conf.getLong(
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
}
/** {@inheritDoc} */
@Override
public void run() {
long lastOptionalCheck = System.currentTimeMillis();
while (!server.isStopRequested()) {
QueueEntry e = null;
HRegion r = null;
try {
e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
synchronized(lock) { // Don't interrupt while we're working
if (e.getRegion().flushcache()) {
server.compactionRequested(e);
}
e.setExpirationTime(System.currentTimeMillis() +
optionalFlushPeriod);
flushQueue.add(e);
}
// Now ensure that all the active regions are in the queue
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion r: regions) {
e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
synchronized (flushQueue) {
if (!flushQueue.contains(e)) {
flushQueue.add(e);
long now = System.currentTimeMillis();
if (now - threadWakeFrequency > lastOptionalCheck) {
lastOptionalCheck = now;
// Queue up regions for optional flush if they need it
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion region: regions) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(region) &&
(now - optionalFlushPeriod) > region.getLastFlushTime()) {
regionsInQueue.add(region);
flushQueue.add(region);
region.setLastFlushTime(now);
}
}
}
}
// Now make sure that the queue only contains active regions
synchronized (flushQueue) {
for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
e = i.next();
if (!regions.contains(e.getRegion())) {
i.remove();
r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (r != null) {
synchronized (regionsInQueue) {
regionsInQueue.remove(r);
}
synchronized (lock) { // Don't interrupt while we're working
if (r.flushcache()) {
server.compactSplitThread.compactionRequested(r);
}
}
}
@ -108,32 +110,32 @@ class Flusher extends Thread implements CacheFlushListener {
server.stop();
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
(r != null ? (" for region " + r.getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
(r != null ? (" for region " + r.getRegionName()) : ""),
ex);
if (!server.checkFileSystem()) {
break;
}
}
}
regionsInQueue.clear();
flushQueue.clear();
LOG.info(getName() + " exiting");
}
/** {@inheritDoc} */
public void flushRequested(HRegion region) {
QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
synchronized (flushQueue) {
if (flushQueue.contains(e)) {
flushQueue.remove(e);
public void flushRequested(HRegion r) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
regionsInQueue.add(r);
flushQueue.add(r);
}
flushQueue.add(e);
}
}


@ -316,6 +316,7 @@ public class HRegion implements HConstants {
new ConcurrentHashMap<Long, TreeMap<HStoreKey, byte []>>();
final AtomicLong memcacheSize = new AtomicLong(0);
private volatile boolean flushRequested;
final Path basedir;
final HLog log;
@ -348,7 +349,6 @@ public class HRegion implements HConstants {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
private final Integer splitLock = new Integer(0);
private final long desiredMaxFileSize;
private final long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
@ -359,6 +359,8 @@ public class HRegion implements HConstants {
/**
* HRegion constructor.
*
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
@ -366,20 +368,19 @@ public class HRegion implements HConstants {
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param fs is the filesystem.
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
* @param listener an object that implements CacheFlushListener or null
* @param flushListener an object that implements CacheFlushListener or null
* or null
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
throws IOException {
this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null);
HRegionInfo regionInfo, Path initialFiles,
CacheFlushListener flushListener) throws IOException {
this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
}
/**
@ -399,15 +400,15 @@ public class HRegion implements HConstants {
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
* @param listener an object that implements CacheFlushListener or null
* @param flushListener an object that implements CacheFlushListener or null
* @param reporter Call on a period so hosting server can report we're
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener,
final Progressable reporter)
HRegionInfo regionInfo, Path initialFiles,
CacheFlushListener flushListener, final Progressable reporter)
throws IOException {
this.basedir = basedir;
@ -415,6 +416,8 @@ public class HRegion implements HConstants {
this.fs = fs;
this.conf = conf;
this.regionInfo = regionInfo;
this.flushListener = flushListener;
this.flushRequested = false;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.regiondir = new Path(basedir, this.regionInfo.getEncodedName());
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
@ -466,14 +469,10 @@ public class HRegion implements HConstants {
// By default, we flush the cache when 64M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*64);
this.flushListener = listener;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
// HRegion is ready to go!
this.writestate.compacting = false;
this.lastFlushTime = System.currentTimeMillis();
@ -543,7 +542,7 @@ public class HRegion implements HConstants {
// region.
writestate.writesEnabled = false;
LOG.debug("compactions and cache flushes disabled for region " +
regionName);
regionName);
while (writestate.compacting || writestate.flushing) {
LOG.debug("waiting for" +
(writestate.compacting ? " compaction" : "") +
@ -672,6 +671,11 @@ public class HRegion implements HConstants {
return this.lastFlushTime;
}
/** @param t the lastFlushTime */
void setLastFlushTime(long t) {
this.lastFlushTime = t;
}
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@ -679,34 +683,16 @@ public class HRegion implements HConstants {
// upkeep.
//////////////////////////////////////////////////////////////////////////////
/**
* @param midkey
* @return returns size of largest HStore. Also returns whether store is
* splitable or not (Its not splitable if region has a store that has a
* reference store file).
*/
public HStoreSize largestHStore(Text midkey) {
HStoreSize biggest = null;
boolean splitable = true;
/** @return returns size of largest HStore. */
public long getLargestHStoreSize() {
long size = 0;
for (HStore h: stores.values()) {
HStoreSize size = h.size(midkey);
// If we came across a reference down in the store, then propagate
// fact that region is not splitable.
if (splitable) {
splitable = size.splitable;
}
if (biggest == null) {
biggest = size;
continue;
}
if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
biggest = size;
long storeSize = h.getSize();
if (storeSize > size) {
size = storeSize;
}
}
if (biggest != null) {
biggest.setSplitable(splitable);
}
return biggest;
return size;
}
/*
@ -715,21 +701,17 @@ public class HRegion implements HConstants {
* but instead create new 'reference' store files that read off the top and
* bottom ranges of parent store files.
* @param listener May be null.
* @param midKey key on which to split region
* @return two brand-new (and open) HRegions or null if a split is not needed
* @throws IOException
*/
HRegion[] splitRegion(final RegionUnavailableListener listener)
throws IOException {
HRegion[] splitRegion(final RegionUnavailableListener listener,
final Text midKey) throws IOException {
synchronized (splitLock) {
Text midKey = new Text();
if (closed.get() || !needsSplit(midKey)) {
if (closed.get()) {
return null;
}
Path splits = new Path(this.regiondir, SPLITDIR);
if(!this.fs.exists(splits)) {
this.fs.mkdirs(splits);
}
// Make copies just in case and add start/end key checking: hbase-428.
// Add start/end key checking: hbase-428.
Text startKey = new Text(this.regionInfo.getStartKey());
Text endKey = new Text(this.regionInfo.getEndKey());
if (startKey.equals(midKey)) {
@ -740,6 +722,11 @@ public class HRegion implements HConstants {
LOG.debug("Endkey and midkey are same, not splitting");
return null;
}
LOG.info("Starting split of region " + getRegionName());
Path splits = new Path(this.regiondir, SPLITDIR);
if(!this.fs.exists(splits)) {
this.fs.mkdirs(splits);
}
HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
startKey, midKey);
Path dirA = new Path(splits, regionAInfo.getEncodedName());
@ -805,71 +792,6 @@ public class HRegion implements HConstants {
}
}
/*
* Iterates through all the HStores and finds the one with the largest
* MapFile size. If the size is greater than the (currently hard-coded)
* threshold, returns true indicating that the region should be split. The
* midKey for the largest MapFile is returned through the midKey parameter.
* It is possible for us to rule the region non-splitable even in excess of
* configured size. This happens if region contains a reference file. If
* a reference file, the region can not be split.
*
* Note that there is no need to do locking in this method because it calls
* largestHStore which does the necessary locking.
*
* @param midKey midKey of the largest MapFile
* @return true if the region should be split. midKey is set by this method.
* Check it for a midKey value on return.
*/
boolean needsSplit(Text midKey) {
HStoreSize biggest = largestHStore(midKey);
if (biggest == null || midKey.getLength() == 0 ||
(midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) {
return false;
}
boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize);
if (split) {
if (!biggest.isSplitable()) {
LOG.warn("Region " + getRegionName().toString() +
" is NOT splitable though its aggregate size is " +
StringUtils.humanReadableInt(biggest.getAggregate()) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
split = false;
} else {
LOG.info("Splitting " + getRegionName().toString() +
" because largest aggregate size is " +
StringUtils.humanReadableInt(biggest.getAggregate()) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
}
}
return split;
}
/**
* Only do a compaction if it is necessary
*
* @return whether or not there was a compaction
* @throws IOException
*/
public boolean compactIfNeeded() throws IOException {
boolean needsCompaction = false;
for (HStore store: stores.values()) {
if (store.needsCompaction()) {
needsCompaction = true;
if (LOG.isDebugEnabled()) {
LOG.debug(store.toString() + " needs compaction");
}
break;
}
}
if (!needsCompaction) {
return false;
}
return compactStores();
}
/*
* @param dir
* @return compaction directory for the passed in <code>dir</code>
@ -893,59 +815,53 @@ public class HRegion implements HConstants {
*/
private void doRegionCompactionCleanup() throws IOException {
if (this.fs.exists(this.regionCompactionDir)) {
this.fs.delete(this.regionCompactionDir);
FileUtil.fullyDelete(this.fs, this.regionCompactionDir);
}
}
/**
* Compact all the stores. This should be called periodically to make sure
* the stores are kept manageable.
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
* <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread.
*
* @return Returns TRUE if the compaction has completed. FALSE, if the
* compaction was not carried out, because the HRegion is busy doing
* something else storage-intensive (like flushing the cache). The caller
* should check back later.
*
* Note that no locking is necessary at this level because compaction only
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
* @return mid key if split is needed
* @throws IOException
*/
public boolean compactStores() throws IOException {
public Text compactStores() throws IOException {
Text midKey = null;
if (this.closed.get()) {
return false;
return midKey;
}
try {
synchronized (writestate) {
if (!writestate.compacting && writestate.writesEnabled) {
writestate.compacting = true;
} else {
LOG.info("NOT compacting region " +
this.regionInfo.getRegionName().toString() + ": compacting=" +
writestate.compacting + ", writesEnabled=" +
LOG.info("NOT compacting region " + getRegionName() +
": compacting=" + writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled);
return false;
return midKey;
}
}
LOG.info("starting compaction on region " + getRegionName());
long startTime = System.currentTimeMillis();
LOG.info("starting compaction on region " +
this.regionInfo.getRegionName().toString());
boolean status = true;
doRegionCompactionPrep();
for (HStore store : stores.values()) {
if(!store.compact()) {
status = false;
for (HStore store: stores.values()) {
Text key = store.compact();
if (key != null && midKey == null) {
midKey = key;
}
}
doRegionCompactionCleanup();
LOG.info("compaction completed on region " +
this.regionInfo.getRegionName().toString() + ". Took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
return status;
LOG.info("compaction completed on region " + getRegionName() +
". Took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} finally {
synchronized (writestate) {
@ -953,6 +869,7 @@ public class HRegion implements HConstants {
writestate.notifyAll();
}
}
return midKey;
}
/**
@ -1030,6 +947,10 @@ public class HRegion implements HConstants {
// will add to the unflushed size
this.memcacheSize.set(0L);
this.flushRequested = false;
// Record latest flush time
this.lastFlushTime = System.currentTimeMillis();
for (HStore hstore: stores.values()) {
hstore.snapshotMemcache();
@ -1121,11 +1042,12 @@ public class HRegion implements HConstants {
this.log.completeCacheFlush(this.regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), sequenceId);
// D. Finally notify anyone waiting on memcache to clear:
// C. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.getRegionName() + " in " +
@ -1374,8 +1296,8 @@ public class HRegion implements HConstants {
Text row = b.getRow();
long lockid = obtainRowLock(row);
long commitTime =
(b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp();
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
System.currentTimeMillis() : b.getTimestamp();
try {
List<Text> deletes = null;
@ -1612,9 +1534,11 @@ public class HRegion implements HConstants {
(val == null ? 0 : val.length));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
if (this.flushListener != null && size > this.memcacheFlushSize) {
if (this.flushListener != null && !this.flushRequested &&
size > this.memcacheFlushSize) {
// Request a cache flush
this.flushListener.flushRequested(this);
this.flushRequested = true;
}
}
}
@ -1730,6 +1654,18 @@ public class HRegion implements HConstants {
}
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
return this.hashCode() == ((HRegion)o).hashCode();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
return this.regionInfo.getRegionName().hashCode();
}
/** {@inheritDoc} */
@Override
public String toString() {
@ -2011,8 +1947,7 @@ public class HRegion implements HConstants {
* @throws IOException
*/
public static void removeRegionFromMETA(final HRegionInterface srvr,
final Text metaRegionName, final Text regionName)
throws IOException {
final Text metaRegionName, final Text regionName) throws IOException {
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
}
@ -2025,8 +1960,7 @@ public class HRegion implements HConstants {
* @throws IOException
*/
public static void offlineRegionInMETA(final HRegionInterface srvr,
final Text metaRegionName, final HRegionInfo info)
throws IOException {
final Text metaRegionName, final HRegionInfo info) throws IOException {
BatchUpdate b = new BatchUpdate(info.getRegionName());
info.setOffline(true);
b.put(COL_REGIONINFO, Writables.getBytes(info));


@ -220,7 +220,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
conf.getInt("hbase.master.lease.period", 30 * 1000);
// Cache flushing thread.
this.cacheFlusher = new Flusher(this);
this.cacheFlusher = new Flusher(conf, this);
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
@ -295,6 +295,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
LOG.info("Server quiesced and not serving any regions. " +
"Starting shutdown");
stopRequested.set(true);
this.outboundMsgs.clear();
continue;
}
@ -412,7 +413,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
cacheFlusher.interruptPolitely();
compactSplitThread.interrupt();
compactSplitThread.interruptPolitely();
synchronized (logRollerLock) {
this.logRoller.interrupt();
}
@ -828,8 +829,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} finally {
this.lock.writeLock().unlock();
}
reportOpen(regionInfo);
}
reportOpen(regionInfo);
}
/*
@ -1228,10 +1229,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return lock.writeLock();
}
void compactionRequested(QueueEntry e) {
compactSplitThread.compactionRequested(e);
}
/**
* @return Immutable list of this servers regions.
*/


@ -24,15 +24,11 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
@ -44,7 +40,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@ -53,7 +48,6 @@ import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.BloomFilterDescriptor;
@ -69,7 +63,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
@ -87,7 +80,7 @@ public class HStore implements HConstants {
* If reference, then the regex has more than just one group. Group 1 is
* this files id. Group 2 the referenced region name, etc.
*/
private static Pattern REF_NAME_PARSER =
private static final Pattern REF_NAME_PARSER =
Pattern.compile("^(\\d+)(?:\\.(.+))?$");
private static final String BLOOMFILTER_FILE_NAME = "filter";
@ -101,15 +94,16 @@ public class HStore implements HConstants {
private final HBaseConfiguration conf;
private final Path filterDir;
final Filter bloomFilter;
private final Path compactionDir;
private final Integer compactLock = new Integer(0);
private final long desiredMaxFileSize;
private volatile long storeSize;
private final Integer flushLock = new Integer(0);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final AtomicInteger activeScanners = new AtomicInteger(0);
final String storeName;
final Text storeName;
/*
* Sorted Map of readers keyed by sequence id (Most recent should be last in
@ -125,8 +119,15 @@ public class HStore implements HConstants {
private final SortedMap<Long, MapFile.Reader> readers =
new TreeMap<Long, MapFile.Reader>();
// The most-recent log-seq-ID that's present. The most-recent such ID means
// we can ignore all log messages up to and including that ID (because they're
// already reflected in the TreeMaps).
private volatile long maxSeqId;
private final Path compactionDir;
private final Integer compactLock = new Integer(0);
private final int compactionThreshold;
private final ReentrantReadWriteLock newScannerLock =
new ReentrantReadWriteLock();
@ -177,7 +178,17 @@ public class HStore implements HConstants {
this.compactionDir = HRegion.getCompactionDir(basedir);
this.storeName =
this.info.getEncodedName() + "/" + this.family.getFamilyName();
new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName());
// By default, we compact if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold =
conf.getInt("hbase.hstore.compactionThreshold", 3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
this.storeSize = 0L;
if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
this.compression = SequenceFile.CompressionType.BLOCK;
@ -219,21 +230,10 @@ public class HStore implements HConstants {
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info
// file, the entry in 'mapdir' must be deleted.
List<HStoreFile> hstoreFiles = loadHStoreFiles(infodir, mapdir);
for(HStoreFile hsf: hstoreFiles) {
this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
}
// loadHStoreFiles also computes the max sequence id
this.maxSeqId = -1L;
this.storefiles.putAll(loadHStoreFiles(infodir, mapdir));
// Now go through all the HSTORE_LOGINFOFILEs and figure out the
// most-recent log-seq-ID that's present. The most-recent such ID means we
// can ignore all log messages up to and including that ID (because they're
// already reflected in the TreeMaps).
//
// If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That
// means it was built prior to the previous run of HStore, and so it cannot
// contain any updates also contained in the log.
this.maxSeqId = getMaxSequenceId(hstoreFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("maximum sequence id for hstore " + storeName + " is " +
this.maxSeqId);
@ -250,16 +250,6 @@ public class HStore implements HConstants {
" -- continuing. Probably DATA LOSS!", e);
}
// By default, we compact if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold =
conf.getInt("hbase.hstore.compactionThreshold", 3);
// We used to compact in here before bringing the store online. Instead
// get it online quick even if it needs compactions so we can start
// taking updates as soon as possible (Once online, can take updates even
// during a compaction).
// Move maxSeqId on by one. Why here? And not in HRegion?
this.maxSeqId += 1;
@ -276,28 +266,13 @@ public class HStore implements HConstants {
first = false;
} else {
this.readers.put(e.getKey(),
e.getValue().getReader(this.fs, this.bloomFilter));
e.getValue().getReader(this.fs, this.bloomFilter));
}
}
}
/*
* @param hstoreFiles
* @return Maximum sequence number found or -1.
* @throws IOException
*/
private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
throws IOException {
long maxSeqID = -1;
for (HStoreFile hsf : hstoreFiles) {
long seqid = hsf.loadInfo(fs);
if (seqid > 0) {
if (seqid > maxSeqID) {
maxSeqID = seqid;
}
}
}
return maxSeqID;
HColumnDescriptor getFamily() {
return this.family;
}
long getMaxSequenceId() {
@ -388,7 +363,7 @@ public class HStore implements HConstants {
* @param mapdir qualified path for map file directory
* @throws IOException
*/
private List<HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
private SortedMap<Long, HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("infodir: " + infodir.toString() + " mapdir: " +
@ -397,7 +372,7 @@ public class HStore implements HConstants {
// Look first at info files. If a reference, these contain info we need
// to create the HStoreFile.
FileStatus infofiles[] = fs.listStatus(infodir);
ArrayList<HStoreFile> results = new ArrayList<HStoreFile>(infofiles.length);
SortedMap<Long, HStoreFile> results = new TreeMap<Long, HStoreFile>();
ArrayList<Path> mapfiles = new ArrayList<Path>(infofiles.length);
for (int i = 0; i < infofiles.length; i++) {
Path p = infofiles[i].getPath();
@ -414,6 +389,11 @@ public class HStore implements HConstants {
boolean isReference = isReference(p, m);
long fid = Long.parseLong(m.group(1));
if (LOG.isDebugEnabled()) {
LOG.debug("loading file " + p.toString() + ", isReference=" +
isReference + ", file id=" + fid);
}
HStoreFile curfile = null;
HStoreFile.Reference reference = null;
if (isReference) {
@ -421,6 +401,22 @@ public class HStore implements HConstants {
}
curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
family.getFamilyName(), fid, reference);
storeSize += curfile.length();
long storeSeqId = -1;
try {
storeSeqId = curfile.loadInfo(fs);
if (storeSeqId > this.maxSeqId) {
this.maxSeqId = storeSeqId;
}
} catch (IOException e) {
// If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it.
// That means it was built prior to the previous run of HStore, and so
// it cannot contain any updates also contained in the log.
LOG.info("HSTORE_LOGINFOFILE " + curfile +
" does not contain a sequence number - ignoring");
}
Path mapfile = curfile.getMapFilePath();
if (!fs.exists(mapfile)) {
fs.delete(curfile.getInfoFilePath());
@ -432,7 +428,7 @@ public class HStore implements HConstants {
// TODO: Confirm referent exists.
// Found map and sympathetic info file. Add this hstorefile to result.
results.add(curfile);
results.put(storeSeqId, curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next
// section. Make sure path is fully qualified for compare.
mapfiles.add(mapfile);
@ -581,9 +577,7 @@ public class HStore implements HConstants {
for (MapFile.Reader reader: this.readers.values()) {
reader.close();
}
this.readers.clear();
result = new ArrayList<HStoreFile>(storefiles.values());
this.storefiles.clear();
LOG.debug("closed " + this.storeName);
return result;
} finally {
@ -591,7 +585,6 @@ public class HStore implements HConstants {
}
}
//////////////////////////////////////////////////////////////////////////////
// Flush changes to disk
//////////////////////////////////////////////////////////////////////////////
@ -627,10 +620,10 @@ public class HStore implements HConstants {
synchronized(flushLock) {
// A. Write the Maps out to the disk
HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
info.getEncodedName(), family.getFamilyName(), -1L, null);
info.getEncodedName(), family.getFamilyName(), -1L, null);
String name = flushedFile.toString();
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
this.bloomFilter);
// Here we tried picking up an existing HStoreFile from disk and
// interlacing the memcache flush compacting as we go. The notion was
@ -644,18 +637,23 @@ public class HStore implements HConstants {
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
int entries = 0;
long cacheSize = 0;
try {
for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
HStoreKey curkey = es.getKey();
byte[] bytes = es.getValue();
TextSequence f = HStoreKey.extractFamily(curkey.getColumn());
if (f.equals(this.family.getFamilyName())) {
entries++;
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
out.append(curkey, new ImmutableBytesWritable(bytes));
cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0);
}
}
} finally {
out.close();
}
long newStoreSize = flushedFile.length();
storeSize += newStoreSize;
// B. Write out the log sequence number that corresponds to this output
// MapFile. The MapFile is current up to and including the log seq num.
@ -676,14 +674,14 @@ public class HStore implements HConstants {
this.storefiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + name + " with " + entries +
" entries, sequence id " + logCacheFlushId + ", and size " +
StringUtils.humanReadableInt(flushedFile.length()) + " for " +
" entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(cacheSize) + ", file size " +
StringUtils.humanReadableInt(newStoreSize) + " for " +
this.storeName);
}
} finally {
this.lock.writeLock().unlock();
}
return;
}
}
@ -691,28 +689,6 @@ public class HStore implements HConstants {
// Compaction
//////////////////////////////////////////////////////////////////////////////
/**
* @return True if this store needs compaction.
*/
boolean needsCompaction() {
return this.storefiles != null &&
(this.storefiles.size() >= this.compactionThreshold || hasReferences());
}
/*
* @return True if this store has references.
*/
private boolean hasReferences() {
if (this.storefiles != null) {
for (HStoreFile hsf: this.storefiles.values()) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/**
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
@ -728,42 +704,66 @@ public class HStore implements HConstants {
*
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
* @throws IOException
*
* @return true if compaction completed successfully
* @return mid key if a split is needed, null otherwise
* @throws IOException
*/
boolean compact() throws IOException {
Text compact() throws IOException {
synchronized (compactLock) {
if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + storefiles.size() +
" files using " + compactionDir.toString() + " for " +
this.storeName);
}
// Storefiles are keyed by sequence id. The oldest file comes first.
// We need to return out of here a List that has the newest file first.
List<HStoreFile> filesToCompact =
new ArrayList<HStoreFile>(this.storefiles.values());
Collections.reverse(filesToCompact);
if (filesToCompact.size() < 1 ||
(filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
if (LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.storeName);
long maxId = -1;
List<HStoreFile> filesToCompact = null;
synchronized (storefiles) {
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
if (filesToCompact.size() < 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because no store files to compact.");
}
return checkSplit();
} else if (filesToCompact.size() == 1) {
if (!filesToCompact.get(0).isReference()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because only one store file and it is not a reference");
}
return checkSplit();
}
} else if (filesToCompact.size() < compactionThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + this.storeName +
" because number of stores " + filesToCompact.size() +
" < compaction threshold " + compactionThreshold);
}
return checkSplit();
}
return false;
}
if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
return false;
if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
return checkSplit();
}
if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + filesToCompact.size() +
" files using " + compactionDir.toString() + " for " +
this.storeName);
}
// Storefiles are keyed by sequence id. The oldest file comes first.
// We need to return out of here a List that has the newest file first.
Collections.reverse(filesToCompact);
// The max-sequenceID in any of the to-be-compacted TreeMaps is the
// last key of storefiles.
maxId = this.storefiles.lastKey();
}
// Step through them, writing to the brand-new MapFile
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, info.getEncodedName(), family.getFamilyName(),
-1L, null);
this.compactionDir, info.getEncodedName(), family.getFamilyName(),
-1L, null);
MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
this.compression, this.bloomFilter);
this.compression, this.bloomFilter);
try {
compactHStoreFiles(compactedOut, filesToCompact);
} finally {
@ -771,14 +771,17 @@ public class HStore implements HConstants {
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
// Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
long maxId = getMaxSequenceId(filesToCompact);
compactedOutputFile.writeInfo(fs, maxId);
// Move the compaction into place.
completeCompaction(filesToCompact, compactedOutputFile);
return true;
if (LOG.isDebugEnabled()) {
LOG.debug("Completed compaction of " + this.storeName +
" store size is " + StringUtils.humanReadableInt(storeSize));
}
}
return checkSplit();
}
/*
@ -975,11 +978,11 @@ public class HStore implements HConstants {
* <pre>
* 1) Wait for active scanners to exit
* 2) Acquiring the write-lock
* 3) Figuring out what MapFiles are going to be replaced
* 4) Moving the new compacted MapFile into place
* 5) Unloading all the replaced MapFiles.
* 6) Deleting all the old MapFile files.
* 7) Loading the new TreeMap.
* 3) Moving the new compacted MapFile into place
* 4) Unloading all the replaced MapFiles and close.
* 5) Deleting all the replaced MapFile files.
* 6) Loading the new TreeMap.
* 7) Compute new store size
* 8) Releasing the write-lock
* 9) Allow new scanners to proceed.
* </pre>
@ -1027,46 +1030,53 @@ public class HStore implements HConstants {
// 4. and 5. Unload all the replaced MapFiles, close and delete.
List<Long> toDelete = new ArrayList<Long>();
for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
if (!compactedFiles.contains(e.getValue())) {
continue;
}
Long key = e.getKey();
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
toDelete.add(key);
}
try {
for (Long key: toDelete) {
HStoreFile hsf = this.storefiles.remove(key);
hsf.delete();
synchronized (storefiles) {
List<Long> toDelete = new ArrayList<Long>();
for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
if (!compactedFiles.contains(e.getValue())) {
continue;
}
Long key = e.getKey();
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
toDelete.add(key);
}
// 6. Loading the new TreeMap.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter,
family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files for " + this.storeName +
". Compacted file is " + finalCompactedFile.toString() +
". Files replaced are " + compactedFiles.toString() +
" some of which may have been already removed", e);
try {
for (Long key: toDelete) {
HStoreFile hsf = this.storefiles.remove(key);
hsf.delete();
}
// 6. Loading the new TreeMap.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter,
family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files for " + this.storeName +
". Compacted file is " + finalCompactedFile.toString() +
". Files replaced are " + compactedFiles.toString() +
" some of which may have been already removed", e);
}
// 7. Compute new store size
storeSize = 0L;
for (HStoreFile hsf: storefiles.values()) {
storeSize += hsf.length();
}
}
} finally {
// 7. Releasing the write-lock
// 8. Releasing the write-lock
this.lock.writeLock().unlock();
}
} finally {
// 8. Allow new scanners to proceed.
// 9. Allow new scanners to proceed.
newScannerLock.writeLock().unlock();
}
}
@ -1304,7 +1314,7 @@ public class HStore implements HConstants {
do{
// if the row matches, we might want this one.
if(rowMatches(origin, readkey)){
if (rowMatches(origin, readkey)) {
// if the cell matches, then we definitely want this key.
if (cellMatches(origin, readkey)) {
// store the key if it isn't deleted or superceeded by what's
@ -1323,11 +1333,11 @@ public class HStore implements HConstants {
// timestamps, so move to the next key
continue;
}
} else{
} else {
// the row doesn't match, so we've gone too far.
break;
}
}while(map.next(readkey, readval)); // advance to the next key
} while (map.next(readkey, readval)); // advance to the next key
}
}
@ -1572,69 +1582,75 @@ public class HStore implements HConstants {
}
/**
* Gets size for the store.
* Determines if HStore can be split
*
* @param midKey Gets set to the middle key of the largest splitable store
* file or its set to empty if largest is not splitable.
* @return Sizes for the store and the passed <code>midKey</code> is
* set to midKey of largest splitable. Otherwise, its set to empty
* to indicate we couldn't find a midkey to split on
* @return midKey if store can be split, null otherwise
*/
HStoreSize size(Text midKey) {
long maxSize = 0L;
long aggregateSize = 0L;
// Not splitable if we find a reference store file present in the store.
boolean splitable = true;
Text checkSplit() {
if (this.storefiles.size() <= 0) {
return new HStoreSize(0, 0, splitable);
return null;
}
if (storeSize < this.desiredMaxFileSize) {
return null;
}
this.lock.readLock().lock();
try {
// Not splitable if we find a reference store file present in the store.
boolean splitable = true;
long maxSize = 0L;
Long mapIndex = Long.valueOf(0L);
// Iterate through all the MapFiles
for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
HStoreFile curHSF = e.getValue();
long size = curHSF.length();
aggregateSize += size;
if (maxSize == 0L || size > maxSize) {
// This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
}
if (splitable) {
splitable = !curHSF.isReference();
synchronized (storefiles) {
for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
HStoreFile curHSF = e.getValue();
long size = curHSF.length();
if (size > maxSize) {
// This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
}
if (splitable) {
splitable = !curHSF.isReference();
}
}
}
if (splitable) {
MapFile.Reader r = this.readers.get(mapIndex);
// seek back to the beginning of mapfile
r.reset();
// get the first and last keys
HStoreKey firstKey = new HStoreKey();
HStoreKey lastKey = new HStoreKey();
Writable value = new ImmutableBytesWritable();
r.next(firstKey, value);
r.finalKey(lastKey);
// get the midkey
HStoreKey mk = (HStoreKey)r.midKey();
if (mk != null) {
// if the midkey is the same as the first and last keys, then we cannot
// (ever) split this region.
if (mk.getRow().equals(firstKey.getRow()) &&
mk.getRow().equals(lastKey.getRow())) {
return new HStoreSize(aggregateSize, maxSize, false);
}
// Otherwise, set midKey
midKey.set(mk.getRow());
if (!splitable) {
return null;
}
MapFile.Reader r = this.readers.get(mapIndex);
// seek back to the beginning of mapfile
r.reset();
// get the first and last keys
HStoreKey firstKey = new HStoreKey();
HStoreKey lastKey = new HStoreKey();
Writable value = new ImmutableBytesWritable();
r.next(firstKey, value);
r.finalKey(lastKey);
// get the midkey
HStoreKey mk = (HStoreKey)r.midKey();
if (mk != null) {
// if the midkey is the same as the first and last keys, then we cannot
// (ever) split this region.
if (mk.getRow().equals(firstKey.getRow()) &&
mk.getRow().equals(lastKey.getRow())) {
return null;
}
return mk.getRow();
}
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeName, e);
} finally {
this.lock.readLock().unlock();
}
return new HStoreSize(aggregateSize, maxSize, splitable);
return null;
}
/** @return aggregate size of HStore */
public long getSize() {
return storeSize;
}
//////////////////////////////////////////////////////////////////////////////
@ -1665,7 +1681,7 @@ public class HStore implements HConstants {
/** {@inheritDoc} */
@Override
public String toString() {
return this.storeName;
return this.storeName.toString();
}
/*


@ -1,33 +0,0 @@
package org.apache.hadoop.hbase.regionserver;
/*
* Data structure to hold result of a look at store file sizes.
*/
public class HStoreSize {
final long aggregate;
final long largest;
boolean splitable;
HStoreSize(final long a, final long l, final boolean s) {
this.aggregate = a;
this.largest = l;
this.splitable = s;
}
public long getAggregate() {
return this.aggregate;
}
public long getLargest() {
return this.largest;
}
public boolean isSplitable() {
return this.splitable;
}
public void setSplitable(final boolean s) {
this.splitable = s;
}
}


@ -28,7 +28,6 @@ import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -196,12 +195,9 @@ class Memcache {
/**
* @param row
* @param timestamp
* @return the key that matches <i>row</i> exactly, or the one that
* immediately preceeds it.
*/
void getRowKeyAtOrBefore(final Text row,
SortedMap<HStoreKey, Long> candidateKeys)
throws IOException {
SortedMap<HStoreKey, Long> candidateKeys) {
this.lock.readLock().lock();
try {


@ -1,78 +0,0 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Delayed;
/** Queue entry passed to flusher, compactor and splitter threads */
class QueueEntry implements Delayed {
private final HRegion region;
private long expirationTime;
QueueEntry(HRegion region, long expirationTime) {
this.region = region;
this.expirationTime = expirationTime;
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
QueueEntry other = (QueueEntry) o;
return this.hashCode() == other.hashCode();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
return this.region.getRegionInfo().hashCode();
}
/** {@inheritDoc} */
public long getDelay(TimeUnit unit) {
return unit.convert(this.expirationTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
public int compareTo(Delayed o) {
long delta = this.getDelay(TimeUnit.MILLISECONDS) -
o.getDelay(TimeUnit.MILLISECONDS);
int value = 0;
if (delta > 0) {
value = 1;
} else if (delta < 0) {
value = -1;
}
return value;
}
/** @return the region */
public HRegion getRegion() {
return region;
}
/** @param expirationTime the expirationTime to set */
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}


@ -35,12 +35,13 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.client.HTable;
/**
@ -316,13 +317,16 @@ public class MetaUtils {
throws IOException {
HTable t = new HTable(c, HConstants.META_TABLE_NAME);
Cell cell = t.get(row, HConstants.COL_REGIONINFO);
if (cell == null) {
throw new IOException("no information for row " + row);
}
// Throws exception if null.
HRegionInfo info = Writables.getHRegionInfo(cell);
long id = t.startUpdate(row);
BatchUpdate b = new BatchUpdate(row);
info.setOffline(onlineOffline);
t.put(id, HConstants.COL_REGIONINFO, Writables.getBytes(info));
t.delete(id, HConstants.COL_SERVER);
t.delete(id, HConstants.COL_STARTCODE);
t.commit(id);
b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info));
b.delete(HConstants.COL_SERVER);
b.delete(HConstants.COL_STARTCODE);
t.commit(b);
}
}
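The hunk above replaces the lockid-based startUpdate/put/delete/commit sequence with a single row-scoped BatchUpdate. A hedged sketch of the same pattern as a standalone helper; the class and method names are hypothetical, while the HTable and BatchUpdate calls are the ones used in the hunk.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Writables;

public class MetaUpdateSketch {
  /** Writes an updated HRegionInfo back to its .META. row in one batch. */
  static void writeRegionInfo(HTable meta, Text row, HRegionInfo info)
  throws IOException {
    BatchUpdate b = new BatchUpdate(row);   // every edit below targets this row
    b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info));
    b.delete(HConstants.COL_SERVER);        // clear the server assignment
    b.delete(HConstants.COL_STARTCODE);
    meta.commit(b);                         // commit the whole batch at once
  }
}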

View File

@ -45,6 +45,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
protected int regionServers;
protected boolean startDfs;
/** default constructor */
public HBaseClusterTestCase() {
this(1);
}
@ -53,6 +54,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
* Start a MiniHBaseCluster with regionServers region servers in-process to
* start with. Also, start a MiniDfsCluster before starting the hbase cluster.
* The configuration used will be edited so that this works correctly.
* @param regionServers number of region servers to start.
*/
public HBaseClusterTestCase(int regionServers) {
this(regionServers, true);
@ -65,6 +67,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
* configured in hbase-site.xml and is already started, or you have started a
* MiniDFSCluster on your own and edited the configuration in memory. (You
* can modify the config used by overriding the preHBaseClusterSetup method.)
* @param regionServers number of region servers to start.
* @param startDfs set to true if MiniDFS should be started
*/
public HBaseClusterTestCase(int regionServers, boolean startDfs) {
super();
@ -81,9 +85,11 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
/**
* Actually start the MiniHBase instance.
*/
@SuppressWarnings("unused")
protected void HBaseClusterSetup() throws Exception {
// start the mini cluster
this.cluster = new MiniHBaseCluster(conf, regionServers);
// opening the META table ensures that cluster is running
HTable meta = new HTable(conf, new Text(".META."));
}

View File

@ -102,11 +102,8 @@ public abstract class HBaseTestCase extends TestCase {
localfs =
(conf.get("fs.default.name", "file:///").compareTo("file::///") == 0);
try {
if (fs == null) {
this.fs = FileSystem.get(conf);
} catch (IOException e) {
LOG.fatal("error getting file system", e);
throw e;
}
try {
if (localfs) {
@ -509,54 +506,88 @@ public abstract class HBaseTestCase extends TestCase {
*/
public static class HTableIncommon implements Incommon {
final HTable table;
private BatchUpdate batch;
private void checkBatch() {
if (batch == null) {
throw new IllegalStateException("No batch update in progress.");
}
}
/**
* @param table
*/
public HTableIncommon(final HTable table) {
super();
this.table = table;
this.batch = null;
}
/** {@inheritDoc} */
public void abort(long lockid) {
this.table.abort(lockid);
public void abort(@SuppressWarnings("unused") long lockid) {
if (this.batch != null) {
this.batch = null;
}
}
/** {@inheritDoc} */
public void commit(long lockid) throws IOException {
this.table.commit(lockid);
public void commit(@SuppressWarnings("unused") long lockid)
throws IOException {
checkBatch();
this.table.commit(batch);
this.batch = null;
}
/** {@inheritDoc} */
public void commit(long lockid, final long ts) throws IOException {
this.table.commit(lockid, ts);
public void commit(@SuppressWarnings("unused") long lockid, final long ts)
throws IOException {
checkBatch();
this.batch.setTimestamp(ts);
this.table.commit(batch);
this.batch = null;
}
/** {@inheritDoc} */
public void put(long lockid, Text column, byte[] val) {
this.table.put(lockid, column, val);
public void put(@SuppressWarnings("unused") long lockid, Text column,
byte[] val) {
checkBatch();
this.batch.put(column, val);
}
/** {@inheritDoc} */
public void delete(long lockid, Text column) {
this.table.delete(lockid, column);
public void delete(@SuppressWarnings("unused") long lockid, Text column) {
checkBatch();
this.batch.delete(column);
}
/** {@inheritDoc} */
public void deleteAll(Text row, Text column, long ts) throws IOException {
this.table.deleteAll(row, column, ts);
}
/** {@inheritDoc} */
public long startUpdate(Text row) {
return this.table.startUpdate(row);
if (this.batch != null) {
throw new IllegalStateException("Batch update already in progress.");
}
this.batch = new BatchUpdate(row);
return 0L;
}
/** {@inheritDoc} */
public HScannerInterface getScanner(Text [] columns, Text firstRow,
long ts) throws IOException {
return this.table.obtainScanner(columns, firstRow, ts, null);
}
/** {@inheritDoc} */
public Cell get(Text row, Text column) throws IOException {
return this.table.get(row, column);
}
/** {@inheritDoc} */
public Cell[] get(Text row, Text column, int versions) throws IOException {
return this.table.get(row, column, versions);
}
/** {@inheritDoc} */
public Cell[] get(Text row, Text column, long ts, int versions)
throws IOException {
@ -576,8 +607,10 @@ public abstract class HBaseTestCase extends TestCase {
fail(column.toString() + " at timestamp " + timestamp +
"\" was expected to be \"" + value + " but was null");
}
assertEquals(column.toString() + " at timestamp "
+ timestamp, value, new String(cell_value.getValue()));
if (cell_value != null) {
assertEquals(column.toString() + " at timestamp "
+ timestamp, value, new String(cell_value.getValue()));
}
}
}
}
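With the changes above, HTableIncommon buffers edits in a single BatchUpdate between startUpdate and commit, and the returned lockid is only a placeholder (always 0L). A minimal usage sketch; the table handle, row, and column names are assumptions for illustration, and HBaseTestCase is assumed to live in org.apache.hadoop.hbase.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon;
import org.apache.hadoop.hbase.client.HTable;

public class IncommonUsageSketch {
  /** One buffered update cycle against an already-open test table. */
  static void writeRow(HTable table) throws IOException {
    HTableIncommon incommon = new HTableIncommon(table);
    long lockid = incommon.startUpdate(new Text("row1"));            // opens the batch
    incommon.put(lockid, new Text("contents:basic"), "value".getBytes());
    incommon.delete(lockid, new Text("contents:stale"));
    incommon.commit(lockid);                                         // commits and clears the batch
  }
}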

View File

@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate;
* Test HBase Master and Region servers, client API
*/
public class TestHBaseCluster extends HBaseClusterTestCase {
private static final Log LOG = LogFactory.getLog(TestHBaseCluster.class);
private HTableDescriptor desc;
private HBaseAdmin admin;
@ -104,7 +107,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
(ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
table.commit(b);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
LOG.info("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
@ -134,7 +137,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
teststr.compareTo(bodystr) == 0);
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
LOG.info("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
}
@ -175,7 +178,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
anchorFetched++;
} else {
System.out.println(col);
LOG.info(col);
}
}
curVals.clear();
@ -184,7 +187,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched);
assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched);
System.out.println("Scanned " + NUM_VALS
LOG.info("Scanned " + NUM_VALS
+ " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));

View File

@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@ -76,9 +76,9 @@ public class TestTableIndex extends MultiRegionTable {
private HTableDescriptor desc;
private JobConf jobConf = null;
private Path dir;
/** default constructor */
public TestTableIndex() {
// Enable DEBUG-level MR logging.
Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG);
@ -105,7 +105,6 @@ public class TestTableIndex extends MultiRegionTable {
// Create a table.
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
// Populate a table into multiple regions
makeMultiRegionTable(conf, cluster, dfsCluster.getFileSystem(), TABLE_NAME,
INPUT_COLUMN);
@ -116,6 +115,14 @@ public class TestTableIndex extends MultiRegionTable {
assertTrue(startKeys.length > 1);
}
/** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
if (jobConf != null) {
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
}
}
/**
* Test HBase map/reduce
*
@ -135,7 +142,7 @@ public class TestTableIndex extends MultiRegionTable {
conf.set("hbase.index.conf", createIndexConfContent());
try {
JobConf jobConf = new JobConf(conf, TestTableIndex.class);
jobConf = new JobConf(conf, TestTableIndex.class);
jobConf.setJobName("index column contents");
jobConf.setNumMapTasks(2);
// number of indexes to partition into

View File

@ -20,13 +20,14 @@
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapWritable;
@ -66,8 +66,6 @@ public class TestTableMapReduce extends MultiRegionTable {
TEXT_OUTPUT_COLUMN
};
private Path dir;
private static byte[][] values = null;
static {
@ -193,8 +191,9 @@ public class TestTableMapReduce extends MultiRegionTable {
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
JobConf jobConf = null;
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);
@ -215,6 +214,9 @@ public class TestTableMapReduce extends MultiRegionTable {
verify(SINGLE_REGION_TABLE_NAME);
} finally {
mrCluster.shutdown();
if (jobConf != null) {
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
}
}
}
@ -244,8 +246,9 @@ public class TestTableMapReduce extends MultiRegionTable {
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
JobConf jobConf = null;
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(2);
jobConf.setNumReduceTasks(1);
@ -262,6 +265,9 @@ public class TestTableMapReduce extends MultiRegionTable {
verify(MULTI_REGION_TABLE_NAME);
} finally {
mrCluster.shutdown();
if (jobConf != null) {
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
}
}
}

View File

@ -93,7 +93,6 @@ public class TestCompaction extends HBaseTestCase {
*/
public void testCompaction() throws Exception {
createStoreFile(r);
assertFalse(r.compactIfNeeded());
for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
createStoreFile(r);
}
@ -106,35 +105,8 @@ public class TestCompaction extends HBaseTestCase {
r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
// Assert that I can get > 5 versions (Should be at least 5 in there).
assertTrue(cellValues.length >= 5);
// Try to run compaction concurrent with a thread flush just to see that
// we can.
final HRegion region = this.r;
Thread t1 = new Thread() {
@Override
public void run() {
try {
region.flushcache();
} catch (IOException e) {
e.printStackTrace();
}
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
assertTrue(region.compactIfNeeded());
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.setDaemon(true);
t1.start();
t2.setDaemon(true);
t2.start();
t1.join();
t2.join();
r.flushcache();
r.compactStores();
// Now assert that there are 4 versions of a record only: that's the
// 3 versions that should be in the compacted store and then the one more
// we added when we flushed. But could be 3 only if the flush happened
@ -170,7 +142,8 @@ public class TestCompaction extends HBaseTestCase {
// compacted store and the flush above when we added deletes. Add more
// content to be certain.
createSmallerStoreFile(this.r);
assertTrue(r.compactIfNeeded());
r.flushcache();
r.compactStores();
// Assert that the first row is still deleted.
cellValues = r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
assertNull(cellValues);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@ -44,6 +45,13 @@ public class TestHMemcache extends TestCase {
private static final String COLUMN_FAMILY = "column";
private static final int FIRST_ROW = 1;
private static final int NUM_VALS = 1000;
private static final Text CONTENTS_BASIC = new Text("contents:basic");
private static final String CONTENTSTR = "contentstr";
private static final String ANCHORNUM = "anchor:anchornum-";
private static final String ANCHORSTR = "anchorstr";
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
@ -51,6 +59,50 @@ public class TestHMemcache extends TestCase {
this.hmemcache = new Memcache();
}
/**
* @throws UnsupportedEncodingException
*/
public void testMemcache() throws UnsupportedEncodingException {
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text row = new Text("row_" + k);
HStoreKey key =
new HStoreKey(row, CONTENTS_BASIC, System.currentTimeMillis());
hmemcache.add(key, (CONTENTSTR + k).getBytes(HConstants.UTF8_ENCODING));
key =
new HStoreKey(row, new Text(ANCHORNUM + k), System.currentTimeMillis());
hmemcache.add(key, (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
}
// Read them back
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
List<Cell> results;
Text row = new Text("row_" + k);
HStoreKey key = new HStoreKey(row, CONTENTS_BASIC, Long.MAX_VALUE);
results = hmemcache.get(key, 1);
assertNotNull("no data for " + key.toString(), results);
assertEquals(1, results.size());
String bodystr = new String(results.get(0).getValue(),
HConstants.UTF8_ENCODING);
String teststr = CONTENTSTR + k;
assertTrue("Incorrect value for key: (" + key.toString() +
"), expected: '" + teststr + "' got: '" +
bodystr + "'", teststr.compareTo(bodystr) == 0);
key = new HStoreKey(row, new Text(ANCHORNUM + k), Long.MAX_VALUE);
results = hmemcache.get(key, 1);
assertNotNull("no data for " + key.toString(), results);
assertEquals(1, results.size());
bodystr = new String(results.get(0).getValue(),
HConstants.UTF8_ENCODING);
teststr = ANCHORSTR + k;
assertTrue("Incorrect value for key: (" + key.toString() +
"), expected: '" + teststr + "' got: '" + bodystr + "'",
teststr.compareTo(bodystr) == 0);
}
}
private Text getRowName(final int index) {
return new Text("row" + Integer.toString(index));
}
@ -175,8 +227,8 @@ public class TestHMemcache extends TestCase {
}
}
/** For HBASE-528 **/
public void testGetRowKeyAtOrBefore() throws IOException {
/** For HBASE-528 */
public void testGetRowKeyAtOrBefore() {
// set up some test data
Text t10 = new Text("010");
Text t20 = new Text("020");

View File

@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@ -57,7 +56,7 @@ implements RegionUnavailableListener {
*/
public void testHRegion() throws IOException {
try {
setup();
init();
locks();
badPuts();
basic();
@ -65,14 +64,7 @@ implements RegionUnavailableListener {
batchWrite();
splitAndMerge();
read();
cleanup();
} finally {
if (r != null) {
r.close();
}
if (log != null) {
log.closeAndDelete();
}
StaticTestEnvironment.shutdownDfs(cluster);
}
}
@ -97,21 +89,35 @@ implements RegionUnavailableListener {
private static int numInserted = 0;
// Create directories, start mini cluster, etc.
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
this.conf.set("hbase.hstore.compactionThreshold", "2");
private void setup() throws IOException {
if (!StaticTestEnvironment.debugging) {
conf.setLong("hbase.hregion.max.filesize", 65536);
}
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
fs = cluster.getFileSystem();
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR,
this.cluster.getFileSystem().getHomeDirectory().toString());
super.setUp();
}
// Create directories, start mini cluster, etc.
private void init() throws IOException {
desc = new HTableDescriptor("test");
desc.addFamily(new HColumnDescriptor("contents:"));
desc.addFamily(new HColumnDescriptor("anchor:"));
r = createNewHRegion(desc, null, null);
log = r.getLog();
region = new HRegionIncommon(r);
LOG.info("setup completed.");
}
// Test basic functionality. Writes to contents:basic and anchor:anchornum-*
@ -129,7 +135,7 @@ implements RegionUnavailableListener {
(ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
region.commit(writeid, System.currentTimeMillis());
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
LOG.info("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Flush cache
@ -138,7 +144,7 @@ implements RegionUnavailableListener {
region.flushcache();
System.out.println("Cache flush elapsed time: "
LOG.info("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
@ -165,8 +171,10 @@ implements RegionUnavailableListener {
bodystr, teststr);
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
LOG.info("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
LOG.info("basic completed.");
}
private void badPuts() {
@ -198,6 +206,7 @@ implements RegionUnavailableListener {
}
}
assertTrue("Bad family", exceptionThrown);
LOG.info("badPuts completed.");
}
/**
@ -253,6 +262,7 @@ implements RegionUnavailableListener {
}
}
}
LOG.info("locks completed.");
}
// Test scanners. Writes contents:firstcol and anchor:secondcol
@ -283,7 +293,7 @@ implements RegionUnavailableListener {
numInserted += 2;
}
System.out.println("Write " + (vals1.length / 2) + " elapsed time: "
LOG.info("Write " + (vals1.length / 2) + " elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// 2. Scan from cache
@ -321,7 +331,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
System.out.println("Scanned " + (vals1.length / 2)
LOG.info("Scanned " + (vals1.length / 2)
+ " rows from cache. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -331,7 +341,7 @@ implements RegionUnavailableListener {
region.flushcache();
System.out.println("Cache flush elapsed time: "
LOG.info("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// 4. Scan from disk
@ -368,7 +378,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
System.out.println("Scanned " + (vals1.length / 2)
LOG.info("Scanned " + (vals1.length / 2)
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -386,7 +396,7 @@ implements RegionUnavailableListener {
numInserted += 2;
}
System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: "
LOG.info("Write " + (vals1.length / 2) + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// 6. Scan from cache and disk
@ -423,7 +433,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
System.out.println("Scanned " + vals1.length
LOG.info("Scanned " + vals1.length
+ " rows from cache and disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -433,7 +443,7 @@ implements RegionUnavailableListener {
region.flushcache();
System.out.println("Cache flush elapsed time: "
LOG.info("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// 8. Scan from disk
@ -468,7 +478,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
System.out.println("Scanned " + vals1.length
LOG.info("Scanned " + vals1.length
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -506,9 +516,11 @@ implements RegionUnavailableListener {
}
assertEquals("Should have fetched " + (numInserted / 2) + " values, but fetched " + numFetched, (numInserted / 2), numFetched);
System.out.println("Scanned " + (numFetched / 2)
LOG.info("Scanned " + (numFetched / 2)
+ " rows from disk with specified start point. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
LOG.info("scan completed.");
}
// Do a large number of writes. Disabled if not debugging because it takes a
@ -517,6 +529,7 @@ implements RegionUnavailableListener {
private void batchWrite() throws IOException {
if(! StaticTestEnvironment.debugging) {
LOG.info("batchWrite completed.");
return;
}
@ -542,7 +555,7 @@ implements RegionUnavailableListener {
buf1.toString().getBytes(HConstants.UTF8_ENCODING));
region.commit(writeid, System.currentTimeMillis());
if (k > 0 && k % (N_ROWS / 100) == 0) {
System.out.println("Flushing write #" + k);
LOG.info("Flushing write #" + k);
long flushStart = System.currentTimeMillis();
region.flushcache();
@ -550,51 +563,55 @@ implements RegionUnavailableListener {
totalFlush += (flushEnd - flushStart);
if (k % (N_ROWS / 10) == 0) {
System.out.print("Rolling log...");
System.err.print("Rolling log...");
long logStart = System.currentTimeMillis();
log.rollWriter();
long logEnd = System.currentTimeMillis();
totalLog += (logEnd - logStart);
System.out.println(" elapsed time: " + ((logEnd - logStart) / 1000.0));
LOG.info(" elapsed time: " + ((logEnd - logStart) / 1000.0));
}
}
}
long startCompact = System.currentTimeMillis();
if(r.compactIfNeeded()) {
totalCompact = System.currentTimeMillis() - startCompact;
System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0));
} else {
System.out.println("No compaction required.");
}
r.compactStores();
totalCompact = System.currentTimeMillis() - startCompact;
LOG.info("Region compacted - elapsedTime: " + (totalCompact / 1000.0));
long endTime = System.currentTimeMillis();
long totalElapsed = (endTime - startTime);
System.out.println();
System.out.println("Batch-write complete.");
System.out.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes");
System.out.println("Total flush-time: " + (totalFlush / 1000.0));
System.out.println("Total compact-time: " + (totalCompact / 1000.0));
System.out.println("Total log-time: " + (totalLog / 1000.0));
System.out.println("Total time elapsed: " + (totalElapsed / 1000.0));
System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0)));
System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0));
System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)));
System.out.println();
LOG.info("");
LOG.info("Batch-write complete.");
LOG.info("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes");
LOG.info("Total flush-time: " + (totalFlush / 1000.0));
LOG.info("Total compact-time: " + (totalCompact / 1000.0));
LOG.info("Total log-time: " + (totalLog / 1000.0));
LOG.info("Total time elapsed: " + (totalElapsed / 1000.0));
LOG.info("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0)));
LOG.info("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0));
LOG.info("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)));
LOG.info("");
LOG.info("batchWrite completed.");
}
// NOTE: This test depends on testBatchWrite succeeding
private void splitAndMerge() throws IOException {
Path oldRegionPath = r.getRegionDir();
Text midKey = r.compactStores();
assertNotNull(midKey);
long startTime = System.currentTimeMillis();
HRegion subregions[] = r.splitRegion(this);
HRegion subregions[] = r.splitRegion(this, midKey);
if (subregions != null) {
System.out.println("Split region elapsed time: "
LOG.info("Split region elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
assertEquals("Number of subregions", subregions.length, 2);
for (int i = 0; i < subregions.length; i++) {
subregions[i] = openClosedRegion(subregions[i]);
subregions[i].compactStores();
}
// Now merge it back together
Path oldRegion1 = subregions[0].getRegionDir();
@ -602,12 +619,13 @@ implements RegionUnavailableListener {
startTime = System.currentTimeMillis();
r = HRegion.mergeAdjacent(subregions[0], subregions[1]);
region = new HRegionIncommon(r);
System.out.println("Merge regions elapsed time: "
LOG.info("Merge regions elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
fs.delete(oldRegion1);
fs.delete(oldRegion2);
fs.delete(oldRegionPath);
}
LOG.info("splitAndMerge completed.");
}
/**
@ -668,7 +686,7 @@ implements RegionUnavailableListener {
anchorFetched++;
} else {
System.out.println("UNEXPECTED COLUMN " + col);
LOG.info("UNEXPECTED COLUMN " + col);
}
}
curVals.clear();
@ -677,7 +695,7 @@ implements RegionUnavailableListener {
assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched);
assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched);
System.out.println("Scanned " + NUM_VALS
LOG.info("Scanned " + NUM_VALS
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -720,7 +738,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
System.out.println("Scanned " + (numFetched / 2)
LOG.info("Scanned " + (numFetched / 2)
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -753,7 +771,7 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + N_ROWS + " values, but fetched " + numFetched, N_ROWS, numFetched);
System.out.println("Scanned " + N_ROWS
LOG.info("Scanned " + N_ROWS
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -785,37 +803,14 @@ implements RegionUnavailableListener {
}
assertEquals("Inserted " + (NUM_VALS + numInserted/2) + " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched);
System.out.println("Scanned " + fetched
LOG.info("Scanned " + fetched
+ " rows from disk. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
} finally {
s.close();
}
LOG.info("read completed.");
}
private static void deleteFile(File f) {
if(f.isDirectory()) {
File[] children = f.listFiles();
for(int i = 0; i < children.length; i++) {
deleteFile(children[i]);
}
}
f.delete();
}
private void cleanup() {
try {
r.close();
r = null;
log.closeAndDelete();
log = null;
} catch (IOException e) {
e.printStackTrace();
}
// Delete all the DFS files
deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
}
}

View File

@ -24,16 +24,15 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.hbase.io.Cell;
/**
@ -75,6 +74,7 @@ public class TestSplit extends MultiRegionTable {
HRegion region = null;
try {
HTableDescriptor htd = createTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
region = createNewHRegion(htd, null, null);
basicSplit(region);
} finally {
@ -88,9 +88,9 @@ public class TestSplit extends MultiRegionTable {
private void basicSplit(final HRegion region) throws Exception {
addContent(region, COLFAMILY_NAME3);
region.flushcache();
Text midkey = new Text();
assertTrue(region.needsSplit(midkey));
HRegion [] regions = split(region);
Text midkey = region.compactStores();
assertNotNull(midkey);
HRegion [] regions = split(region, midkey);
try {
// Need to open the regions.
// TODO: Add an 'open' to HRegion... don't do open by constructing
@ -106,17 +106,9 @@ public class TestSplit extends MultiRegionTable {
assertScan(regions[0], COLFAMILY_NAME3, new Text(START_KEY));
assertScan(regions[1], COLFAMILY_NAME3, midkey);
// Now prove can't split regions that have references.
Text[] midkeys = new Text[regions.length];
for (int i = 0; i < regions.length; i++) {
midkeys[i] = new Text();
// Even after above splits, still needs split but after splits its
// unsplitable because biggest store file is reference. References
// make the store unsplittable, until something bigger comes along.
assertFalse(regions[i].needsSplit(midkeys[i]));
// Add so much data to this region, we create a store file that is >
// than
// one of our unsplitable references.
// it will.
// than one of our unsplitable references. it will.
for (int j = 0; j < 2; j++) {
addContent(regions[i], COLFAMILY_NAME3);
}
@ -125,30 +117,23 @@ public class TestSplit extends MultiRegionTable {
regions[i].flushcache();
}
// Assert that even if one store file is larger than a reference, the
// region is still deemed unsplittable (can't split a region if references are
// present).
for (int i = 0; i < regions.length; i++) {
midkeys[i] = new Text();
// Even after above splits, still needs split but after splits its
// unsplitable because biggest store file is reference. References
// make the store unsplittable, until something bigger comes along.
assertFalse(regions[i].needsSplit(midkeys[i]));
}
Text[] midkeys = new Text[regions.length];
// To make regions splitable force compaction.
for (int i = 0; i < regions.length; i++) {
regions[i].compactStores();
midkeys[i] = regions[i].compactStores();
}
TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
// Split these two daughter regions so then I'll have 4 regions. Will
// split because added data above.
for (int i = 0; i < regions.length; i++) {
HRegion[] rs = split(regions[i]);
for (int j = 0; j < rs.length; j++) {
sortedMap.put(rs[j].getRegionName().toString(),
openClosedRegion(rs[j]));
HRegion[] rs = null;
if (midkeys[i] != null) {
rs = split(regions[i], midkeys[i]);
for (int j = 0; j < rs.length; j++) {
sortedMap.put(rs[j].getRegionName().toString(),
openClosedRegion(rs[j]));
}
}
}
LOG.info("Made 4 regions");
@ -219,12 +204,11 @@ public class TestSplit extends MultiRegionTable {
}
}
private HRegion [] split(final HRegion r) throws IOException {
Text midKey = new Text();
assertTrue(r.needsSplit(midKey));
private HRegion [] split(final HRegion r, final Text midKey)
throws IOException {
// Assert can get mid key from passed region.
assertGet(r, COLFAMILY_NAME3, midKey);
HRegion [] regions = r.splitRegion(null);
HRegion [] regions = r.splitRegion(null, midKey);
assertEquals(regions.length, 2);
return regions;
}
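The split helper above reflects the new flow in which compactStores() reports the midKey and splitRegion() consumes it. A condensed sketch of that sequence as a standalone helper; the class name is hypothetical and the null listener argument mirrors the tests above.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.regionserver.HRegion;

public class SplitFlowSketch {
  /** Compacts the region, then splits it only if a midKey is reported. */
  static HRegion[] splitIfNeeded(HRegion region) throws IOException {
    region.flushcache();                    // persist outstanding edits first
    Text midKey = region.compactStores();   // null means no split is warranted
    if (midKey == null) {
      return null;
    }
    return region.splitRegion(null, midKey);
  }
}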

View File

@ -19,20 +19,12 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.TimestampTestBase;
import org.apache.commons.logging.Log;
@ -43,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
* tests same in presence of deletes. Test cores are written so can be
* run against an HRegion and against an HTable: i.e. both local and remote.
*/
public class TestTimestamp extends HBaseTestCase {
public class TestTimestamp extends HBaseClusterTestCase {
private static final Log LOG =
LogFactory.getLog(TestTimestamp.class.getName());
@ -51,11 +43,6 @@ public class TestTimestamp extends HBaseTestCase {
private static final Text COLUMN = new Text(COLUMN_NAME);
private static final int VERSIONS = 3;
/** constructor */
public TestTimestamp() {
super();
}
/**
* Test that delete works according to description in <a
* href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>.

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
/** Test stand alone merge tool that can merge arbitrary regions */
@ -54,6 +55,7 @@ public class TestMergeTool extends HBaseTestCase {
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
this.conf.set("hbase.hstore.compactionThreshold", "2");
// Create table description
@ -237,7 +239,9 @@ public class TestMergeTool extends HBaseTestCase {
for (int i = 0; i < 3 ; i++) {
for (int j = 0; j < rows[i].length; j++) {
byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue();
Cell cell = merged.get(rows[i][j], COLUMN_NAME);
assertNotNull(cell);
byte[] bytes = cell.getValue();
assertNotNull(bytes);
Text value = new Text(bytes);
assertTrue(value.equals(rows[i][j]));