From 6e225dd7f18ae8724fea0037f7099b3f12cce8f2 Mon Sep 17 00:00:00 2001
From: Jim Kellerman
Date: Wed, 2 Apr 2008 06:58:26 +0000
Subject: [PATCH] HBASE-469 Streamline HStore startup and compactions

HMerge, HRegionServer
- changes that reflect changes to HRegion, CompactSplitThread and Flusher methods

ServerManager
- Return zero length array to region server if it is exiting or quiesced and Master is not yet ready to shut down.

QueueEntry
- Removed. No longer used.

CompactSplitThread
- make compactionQueue a queue of HRegion.
- Add Set so we can quickly determine if a region is in the queue. BlockingQueue.contains() does a linear scan of the queue.
- Add a lock and interruptPolitely methods so that compactions/splits in progress are not interrupted.
- Don't add a region to the queue if it is already present.

Flusher
- change queue from DelayQueue to BlockingQueue, with HRegion entries instead of QueueEntry.
- Add Set to quickly determine if a region is already in the queue, to avoid the linear scan of BlockingQueue.contains().
- Only put regions in the queue for optional cache flush if the last time they were flushed is older than now - optionalFlushInterval.
- Only add regions to the queue if they are not already present.

HRegion
- don't request a cache flush if one has already been requested.
- Add setLastFlushTime so flusher can set it once it has queued an optional flush.
- Replace largestHStore with getLargestHStoreSize: returns long instead of HStoreSize object.
- Add midKey as parameter to splitRegion.
- Reorder start of splitRegion so it doesn't do any work before validating parameters.
- Remove needsSplit and compactIfNeeded - no longer needed.
- compactStores now returns midKey if split is needed.
- snapshotMemcaches now sets flushRequested to false and sets lastFlushTime to now.
- update does not request a cache flush if one has already been requested.
- Override equals and hashCode so HRegions can be stored in a HashSet.

HStore
- loadHStoreFiles now computes max sequence id and the initial size of the store.
- Add getter for family.
- internalCacheFlush updates store size, and logs both size of cache flush and resulting map file size (with debug logging enabled).
- Remove needsCompaction and hasReferences - no longer needed.
- compact() returns midKey if store needs to be split.
- compact() does all checking before actually starting a compaction.
- If store size is greater than desiredMaxFileSize, compact returns the midKey for the store regardless of whether a compaction was actually done.
- Added more synchronization in completeCompaction while iterating over storeFiles.
- completeCompaction computes new store size.
- New method checkSplit replaces method size. Returns midKey if store needs to be split and can be split.

HStoreSize
- Removed. No longer needed.

HBaseTestCase
- only set fs if it has not already been set by a subclass.
TestTableIndex, TestTableMapReduce - call FSUtil.deleteFully to clean up cruft left in local fs, by MapReduce git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@643761 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + src/java/org/apache/hadoop/hbase/HMerge.java | 5 +- .../hadoop/hbase/master/ServerManager.java | 12 +- .../regionserver/CompactSplitThread.java | 75 ++- .../hadoop/hbase/regionserver/Flusher.java | 94 ++-- .../hadoop/hbase/regionserver/HRegion.java | 234 ++++------ .../hbase/regionserver/HRegionServer.java | 11 +- .../hadoop/hbase/regionserver/HStore.java | 430 +++++++++--------- .../hadoop/hbase/regionserver/HStoreSize.java | 33 -- .../hadoop/hbase/regionserver/Memcache.java | 8 +- .../hadoop/hbase/regionserver/QueueEntry.java | 78 ---- .../apache/hadoop/hbase/util/MetaUtils.java | 16 +- .../hadoop/hbase/HBaseClusterTestCase.java | 8 +- .../apache/hadoop/hbase/HBaseTestCase.java | 67 ++- .../apache/hadoop/hbase/TestHBaseCluster.java | 11 +- .../hadoop/hbase/mapred/TestTableIndex.java | 17 +- .../hbase/mapred/TestTableMapReduce.java | 18 +- .../hbase/regionserver/TestCompaction.java | 35 +- .../hbase/regionserver/TestHMemcache.java | 56 ++- .../hbase/regionserver/TestHRegion.java | 149 +++--- .../hadoop/hbase/regionserver/TestSplit.java | 52 +-- .../hbase/regionserver/TestTimestamp.java | 17 +- .../hadoop/hbase/util/TestMergeTool.java | 6 +- 23 files changed, 679 insertions(+), 756 deletions(-) delete mode 100644 src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java delete mode 100644 src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java diff --git a/CHANGES.txt b/CHANGES.txt index 0e7fc2e0f0e..78cab35b1f4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,9 @@ Hbase Change Log NEW FEATURES HBASE-548 Tool to online single region + + IMPROVEMENTS + HBASE-469 Streamline HStore startup and compactions Release 0.1.0 diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java index de31f32ec54..7f58a8587fd 100644 --- a/src/java/org/apache/hadoop/hbase/HMerge.java +++ b/src/java/org/apache/hadoop/hbase/HMerge.java @@ -144,17 +144,16 @@ class HMerge implements HConstants { long currentSize = 0; HRegion nextRegion = null; long nextSize = 0; - Text midKey = new Text(); for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = new HRegion(tabledir, hlog, fs, conf, info[i], null, null); - currentSize = currentRegion.largestHStore(midKey).getAggregate(); + currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); - nextSize = nextRegion.largestHStore(midKey).getAggregate(); + nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than diff --git a/src/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/java/org/apache/hadoop/hbase/master/ServerManager.java index 74a26a97ced..9c8c6bdf26a 100644 --- a/src/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -128,6 +128,9 @@ class ServerManager implements HConstants { } /** + * Called to process the messages sent from the region server to the master + * along with the heart beat. 
+ * * @param serverInfo * @param msgs * @return messages from master to region server indicating what region @@ -142,7 +145,7 @@ class ServerManager implements HConstants { if (msgs.length > 0) { if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { processRegionServerExit(serverName, msgs); - return new HMsg[]{msgs[0]}; + return new HMsg[0]; } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { LOG.info("Region server " + serverName + " quiesced"); master.quiescedMetaServers.incrementAndGet(); @@ -157,6 +160,11 @@ class ServerManager implements HConstants { } if (master.shutdownRequested && !master.closed.get()) { + if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { + // Server is already quiesced, but we aren't ready to shut down + // return empty response + return new HMsg[0]; + } // Tell the server to stop serving any user regions return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)}; } @@ -522,7 +530,7 @@ class ServerManager implements HConstants { public int averageLoad() { return 0; } - + /** @return the number of active servers */ public int numServers() { return serversToServerInfo.size(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index a2ba9ba0140..1293bd118c1 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -42,19 +43,22 @@ import org.apache.hadoop.hbase.util.Writables; class CompactSplitThread extends Thread implements RegionUnavailableListener, HConstants { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); - + private HTable root = null; private HTable meta = null; - private long startTime; + private volatile long startTime; private final long frequency; + private final Integer lock = new Integer(0); - private HRegionServer server; - private HBaseConfiguration conf; + private final HRegionServer server; + private final HBaseConfiguration conf; - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); - - /** constructor */ + private final BlockingQueue compactionQueue = + new LinkedBlockingQueue(); + + private final HashSet regionsInQueue = new HashSet(); + + /** @param server */ public CompactSplitThread(HRegionServer server) { super(); this.server = server; @@ -68,19 +72,26 @@ implements RegionUnavailableListener, HConstants { @Override public void run() { while (!server.isStopRequested()) { - QueueEntry e = null; + HRegion r = null; try { - e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; + r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); + if (r != null) { + synchronized (regionsInQueue) { + regionsInQueue.remove(r); + } + synchronized (lock) { + // Don't interrupt us while we are working + Text midKey = r.compactStores(); + if (midKey != null) { + split(r, midKey); + } + } } - e.getRegion().compactIfNeeded(); - split(e.getRegion()); } catch (InterruptedException ex) { continue; } catch (IOException ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (r != null ? 
(" for region " + r.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { break; @@ -88,30 +99,35 @@ implements RegionUnavailableListener, HConstants { } catch (Exception ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (r != null ? (" for region " + r.getRegionName()) : ""), ex); if (!server.checkFileSystem()) { break; } } } + regionsInQueue.clear(); + compactionQueue.clear(); LOG.info(getName() + " exiting"); } /** - * @param e QueueEntry for region to be compacted + * @param r HRegion store belongs to */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public synchronized void compactionRequested(HRegion r) { + LOG.debug("Compaction requested for region: " + r.getRegionName()); + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(r)) { + compactionQueue.add(r); + regionsInQueue.add(r); + } + } } - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - - private void split(final HRegion region) throws IOException { + private void split(final HRegion region, final Text midKey) + throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); + final HRegion[] newRegions = region.splitRegion(this, midKey); if (newRegions == null) { // Didn't need to be split return; @@ -198,4 +214,13 @@ implements RegionUnavailableListener, HConstants { server.getWriteLock().unlock(); } } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + void interruptPolitely() { + synchronized (lock) { + interrupt(); + } + } } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java b/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java index c6b92c572c8..bf9fd898f73 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java @@ -20,14 +20,16 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.HashSet; import java.util.Set; -import java.util.Iterator; import java.util.ConcurrentModificationException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -35,60 +37,60 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; /** Flush cache upon request */ class Flusher extends Thread implements CacheFlushListener { static final Log LOG = LogFactory.getLog(Flusher.class); - private final DelayQueue flushQueue = - new DelayQueue(); + private final BlockingQueue flushQueue = + new LinkedBlockingQueue(); + + private final HashSet regionsInQueue = new HashSet(); + private final long threadWakeFrequency; private final long optionalFlushPeriod; private final HRegionServer server; - private final HBaseConfiguration conf; private final Integer lock = new Integer(0); - /** constructor */ - public Flusher(final HRegionServer server) { + /** + * @param conf + * @param server + */ + public Flusher(final HBaseConfiguration conf, final HRegionServer server) { super(); 
this.server = server; - conf = server.conf; this.optionalFlushPeriod = conf.getLong( - "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + this.threadWakeFrequency = conf.getLong( + HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); } /** {@inheritDoc} */ @Override public void run() { + long lastOptionalCheck = System.currentTimeMillis(); while (!server.isStopRequested()) { - QueueEntry e = null; + HRegion r = null; try { - e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; - } - synchronized(lock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - server.compactionRequested(e); - } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); - } - - // Now ensure that all the active regions are in the queue - Set regions = server.getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); + long now = System.currentTimeMillis(); + if (now - threadWakeFrequency > lastOptionalCheck) { + lastOptionalCheck = now; + // Queue up regions for optional flush if they need it + Set regions = server.getRegionsToCheck(); + for (HRegion region: regions) { + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(region) && + (now - optionalFlushPeriod) > region.getLastFlushTime()) { + regionsInQueue.add(region); + flushQueue.add(region); + region.setLastFlushTime(now); + } } } } - - // Now make sure that the queue only contains active regions - synchronized (flushQueue) { - for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); + r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (r != null) { + synchronized (regionsInQueue) { + regionsInQueue.remove(r); + } + synchronized (lock) { // Don't interrupt while we're working + if (r.flushcache()) { + server.compactSplitThread.compactionRequested(r); } } } @@ -108,32 +110,32 @@ class Flusher extends Thread implements CacheFlushListener { server.stop(); } catch (IOException ex) { LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (r != null ? (" for region " + r.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { break; } } catch (Exception ex) { LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (r != null ? 
(" for region " + r.getRegionName()) : ""), ex); if (!server.checkFileSystem()) { break; } } } + regionsInQueue.clear(); flushQueue.clear(); LOG.info(getName() + " exiting"); } /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); - synchronized (flushQueue) { - if (flushQueue.contains(e)) { - flushQueue.remove(e); + public void flushRequested(HRegion r) { + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(r)) { + regionsInQueue.add(r); + flushQueue.add(r); } - flushQueue.add(e); } } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8480577c3b6..2369272e509 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -316,6 +316,7 @@ public class HRegion implements HConstants { new ConcurrentHashMap>(); final AtomicLong memcacheSize = new AtomicLong(0); + private volatile boolean flushRequested; final Path basedir; final HLog log; @@ -348,7 +349,6 @@ public class HRegion implements HConstants { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final Integer splitLock = new Integer(0); - private final long desiredMaxFileSize; private final long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); @@ -359,6 +359,8 @@ public class HRegion implements HConstants { /** * HRegion constructor. * + * @param basedir qualified path of directory where region should be located, + * usually the table directory. * @param log The HLog is the outbound log for any updates to the HRegion * (There's a single HLog for all the HRegions on a single HRegionServer.) * The log file is a logfile from the previous execution that's @@ -366,20 +368,19 @@ public class HRegion implements HConstants { * appropriate log info for this HRegion. If there is a previous log file * (implying that the HRegion has been written-to before), then read it from * the supplied path. - * @param basedir qualified path of directory where region should be located, - * usually the table directory. * @param fs is the filesystem. * @param conf is global configuration settings. * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. - * @param listener an object that implements CacheFlushListener or null + * @param flushListener an object that implements CacheFlushListener or null + * or null * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) - throws IOException { - this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null); + HRegionInfo regionInfo, Path initialFiles, + CacheFlushListener flushListener) throws IOException { + this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null); } /** @@ -399,15 +400,15 @@ public class HRegion implements HConstants { * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. 
- * @param listener an object that implements CacheFlushListener or null + * @param flushListener an object that implements CacheFlushListener or null * @param reporter Call on a period so hosting server can report we're * making progress to master -- otherwise master might think region deploy * failed. Can be null. * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener, - final Progressable reporter) + HRegionInfo regionInfo, Path initialFiles, + CacheFlushListener flushListener, final Progressable reporter) throws IOException { this.basedir = basedir; @@ -415,6 +416,8 @@ public class HRegion implements HConstants { this.fs = fs; this.conf = conf; this.regionInfo = regionInfo; + this.flushListener = flushListener; + this.flushRequested = false; this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.regiondir = new Path(basedir, this.regionInfo.getEncodedName()); Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); @@ -466,20 +469,16 @@ public class HRegion implements HConstants { // By default, we flush the cache when 64M. this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", 1024*1024*64); - this.flushListener = listener; + this.blockingMemcacheSize = this.memcacheFlushSize * conf.getInt("hbase.hregion.memcache.block.multiplier", 1); - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. - this.desiredMaxFileSize = - conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - // HRegion is ready to go! this.writestate.compacting = false; this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } - + /** * @return Updates to this region need to have a sequence id that is >= to * the this number. @@ -543,7 +542,7 @@ public class HRegion implements HConstants { // region. writestate.writesEnabled = false; LOG.debug("compactions and cache flushes disabled for region " + - regionName); + regionName); while (writestate.compacting || writestate.flushing) { LOG.debug("waiting for" + (writestate.compacting ? " compaction" : "") + @@ -617,7 +616,7 @@ public class HRegion implements HConstants { } } } - + ////////////////////////////////////////////////////////////////////////////// // HRegion accessors ////////////////////////////////////////////////////////////////////////////// @@ -672,6 +671,11 @@ public class HRegion implements HConstants { return this.lastFlushTime; } + /** @param t the lastFlushTime */ + void setLastFlushTime(long t) { + this.lastFlushTime = t; + } + ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -679,34 +683,16 @@ public class HRegion implements HConstants { // upkeep. ////////////////////////////////////////////////////////////////////////////// - /** - * @param midkey - * @return returns size of largest HStore. Also returns whether store is - * splitable or not (Its not splitable if region has a store that has a - * reference store file). - */ - public HStoreSize largestHStore(Text midkey) { - HStoreSize biggest = null; - boolean splitable = true; + /** @return returns size of largest HStore. */ + public long getLargestHStoreSize() { + long size = 0; for (HStore h: stores.values()) { - HStoreSize size = h.size(midkey); - // If we came across a reference down in the store, then propagate - // fact that region is not splitable. 
- if (splitable) { - splitable = size.splitable; - } - if (biggest == null) { - biggest = size; - continue; - } - if(size.getAggregate() > biggest.getAggregate()) { // Largest so far - biggest = size; + long storeSize = h.getSize(); + if (storeSize > size) { + size = storeSize; } } - if (biggest != null) { - biggest.setSplitable(splitable); - } - return biggest; + return size; } /* @@ -715,21 +701,17 @@ public class HRegion implements HConstants { * but instead create new 'reference' store files that read off the top and * bottom ranges of parent store files. * @param listener May be null. + * @param midKey key on which to split region * @return two brand-new (and open) HRegions or null if a split is not needed * @throws IOException */ - HRegion[] splitRegion(final RegionUnavailableListener listener) - throws IOException { + HRegion[] splitRegion(final RegionUnavailableListener listener, + final Text midKey) throws IOException { synchronized (splitLock) { - Text midKey = new Text(); - if (closed.get() || !needsSplit(midKey)) { + if (closed.get()) { return null; } - Path splits = new Path(this.regiondir, SPLITDIR); - if(!this.fs.exists(splits)) { - this.fs.mkdirs(splits); - } - // Make copies just in case and add start/end key checking: hbase-428. + // Add start/end key checking: hbase-428. Text startKey = new Text(this.regionInfo.getStartKey()); Text endKey = new Text(this.regionInfo.getEndKey()); if (startKey.equals(midKey)) { @@ -740,6 +722,11 @@ public class HRegion implements HConstants { LOG.debug("Endkey and midkey are same, not splitting"); return null; } + LOG.info("Starting split of region " + getRegionName()); + Path splits = new Path(this.regiondir, SPLITDIR); + if(!this.fs.exists(splits)) { + this.fs.mkdirs(splits); + } HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), startKey, midKey); Path dirA = new Path(splits, regionAInfo.getEncodedName()); @@ -805,71 +792,6 @@ public class HRegion implements HConstants { } } - /* - * Iterates through all the HStores and finds the one with the largest - * MapFile size. If the size is greater than the (currently hard-coded) - * threshold, returns true indicating that the region should be split. The - * midKey for the largest MapFile is returned through the midKey parameter. - * It is possible for us to rule the region non-splitable even in excess of - * configured size. This happens if region contains a reference file. If - * a reference file, the region can not be split. - * - * Note that there is no need to do locking in this method because it calls - * largestHStore which does the necessary locking. - * - * @param midKey midKey of the largest MapFile - * @return true if the region should be split. midKey is set by this method. - * Check it for a midKey value on return. 
- */ - boolean needsSplit(Text midKey) { - HStoreSize biggest = largestHStore(midKey); - if (biggest == null || midKey.getLength() == 0 || - (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { - return false; - } - boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize); - if (split) { - if (!biggest.isSplitable()) { - LOG.warn("Region " + getRegionName().toString() + - " is NOT splitable though its aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - split = false; - } else { - LOG.info("Splitting " + getRegionName().toString() + - " because largest aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - } - } - return split; - } - - /** - * Only do a compaction if it is necessary - * - * @return whether or not there was a compaction - * @throws IOException - */ - public boolean compactIfNeeded() throws IOException { - boolean needsCompaction = false; - for (HStore store: stores.values()) { - if (store.needsCompaction()) { - needsCompaction = true; - if (LOG.isDebugEnabled()) { - LOG.debug(store.toString() + " needs compaction"); - } - break; - } - } - if (!needsCompaction) { - return false; - } - return compactStores(); - } - /* * @param dir * @return compaction directory for the passed in dir @@ -893,59 +815,53 @@ public class HRegion implements HConstants { */ private void doRegionCompactionCleanup() throws IOException { if (this.fs.exists(this.regionCompactionDir)) { - this.fs.delete(this.regionCompactionDir); + FileUtil.fullyDelete(this.fs, this.regionCompactionDir); } } - + /** - * Compact all the stores. This should be called periodically to make sure - * the stores are kept manageable. + * Called by compaction thread and after region is opened to compact the + * HStores if necessary. * *

This operation could block for a long time, so don't call it from a * time-sensitive thread. * - * @return Returns TRUE if the compaction has completed. FALSE, if the - * compaction was not carried out, because the HRegion is busy doing - * something else storage-intensive (like flushing the cache). The caller - * should check back later. - * * Note that no locking is necessary at this level because compaction only * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. * + * @return mid key if split is needed * @throws IOException */ - public boolean compactStores() throws IOException { + public Text compactStores() throws IOException { + Text midKey = null; if (this.closed.get()) { - return false; + return midKey; } try { synchronized (writestate) { if (!writestate.compacting && writestate.writesEnabled) { writestate.compacting = true; } else { - LOG.info("NOT compacting region " + - this.regionInfo.getRegionName().toString() + ": compacting=" + - writestate.compacting + ", writesEnabled=" + + LOG.info("NOT compacting region " + getRegionName() + + ": compacting=" + writestate.compacting + ", writesEnabled=" + writestate.writesEnabled); - return false; + return midKey; } } + LOG.info("starting compaction on region " + getRegionName()); long startTime = System.currentTimeMillis(); - LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); - boolean status = true; doRegionCompactionPrep(); - for (HStore store : stores.values()) { - if(!store.compact()) { - status = false; + for (HStore store: stores.values()) { + Text key = store.compact(); + if (key != null && midKey == null) { + midKey = key; } } doRegionCompactionCleanup(); - LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return status; + LOG.info("compaction completed on region " + getRegionName() + + ". Took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); } finally { synchronized (writestate) { @@ -953,6 +869,7 @@ public class HRegion implements HConstants { writestate.notifyAll(); } } + return midKey; } /** @@ -1030,6 +947,10 @@ public class HRegion implements HConstants { // will add to the unflushed size this.memcacheSize.set(0L); + this.flushRequested = false; + + // Record latest flush time + this.lastFlushTime = System.currentTimeMillis(); for (HStore hstore: stores.values()) { hstore.snapshotMemcache(); @@ -1121,11 +1042,12 @@ public class HRegion implements HConstants { this.log.completeCacheFlush(this.regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), sequenceId); - // D. Finally notify anyone waiting on memcache to clear: + // C. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). synchronized (this) { notifyAll(); } + if (LOG.isDebugEnabled()) { LOG.debug("Finished memcache flush for region " + this.regionInfo.getRegionName() + " in " + @@ -1374,8 +1296,8 @@ public class HRegion implements HConstants { Text row = b.getRow(); long lockid = obtainRowLock(row); - long commitTime = - (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp(); + long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? + System.currentTimeMillis() : b.getTimestamp(); try { List deletes = null; @@ -1612,9 +1534,11 @@ public class HRegion implements HConstants { (val == null ? 
0 : val.length)); stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); } - if (this.flushListener != null && size > this.memcacheFlushSize) { + if (this.flushListener != null && !this.flushRequested && + size > this.memcacheFlushSize) { // Request a cache flush this.flushListener.flushRequested(this); + this.flushRequested = true; } } } @@ -1729,6 +1653,18 @@ public class HRegion implements HConstants { } } } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + return this.hashCode() == ((HRegion)o).hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.regionInfo.getRegionName().hashCode(); + } /** {@inheritDoc} */ @Override @@ -2011,8 +1947,7 @@ public class HRegion implements HConstants { * @throws IOException */ public static void removeRegionFromMETA(final HRegionInterface srvr, - final Text metaRegionName, final Text regionName) - throws IOException { + final Text metaRegionName, final Text regionName) throws IOException { srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); } @@ -2025,8 +1960,7 @@ public class HRegion implements HConstants { * @throws IOException */ public static void offlineRegionInMETA(final HRegionInterface srvr, - final Text metaRegionName, final HRegionInfo info) - throws IOException { + final Text metaRegionName, final HRegionInfo info) throws IOException { BatchUpdate b = new BatchUpdate(info.getRegionName()); info.setOffline(true); b.put(COL_REGIONINFO, Writables.getBytes(info)); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bd8f9ab484a..20491199772 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -220,7 +220,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { conf.getInt("hbase.master.lease.period", 30 * 1000); // Cache flushing thread. - this.cacheFlusher = new Flusher(this); + this.cacheFlusher = new Flusher(conf, this); // Compaction thread this.compactSplitThread = new CompactSplitThread(this); @@ -295,6 +295,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { LOG.info("Server quiesced and not serving any regions. " + "Starting shutdown"); stopRequested.set(true); + this.outboundMsgs.clear(); continue; } @@ -412,7 +413,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already cacheFlusher.interruptPolitely(); - compactSplitThread.interrupt(); + compactSplitThread.interruptPolitely(); synchronized (logRollerLock) { this.logRoller.interrupt(); } @@ -828,8 +829,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } finally { this.lock.writeLock().unlock(); } - reportOpen(regionInfo); } + reportOpen(regionInfo); } /* @@ -1228,10 +1229,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { return lock.writeLock(); } - void compactionRequested(QueueEntry e) { - compactSplitThread.compactionRequested(e); - } - /** * @return Immutable list of this servers regions. 
*/ diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2350bfd7064..9672a1fb0f5 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -24,15 +24,11 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; @@ -44,7 +40,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -53,7 +48,6 @@ import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.hbase.BloomFilterDescriptor; @@ -69,7 +63,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.io.RowResult; /** * HStore maintains a bunch of data files. It is responsible for maintaining @@ -87,7 +80,7 @@ public class HStore implements HConstants { * If reference, then the regex has more than just one group. Group 1 is * this files id. Group 2 the referenced region name, etc. */ - private static Pattern REF_NAME_PARSER = + private static final Pattern REF_NAME_PARSER = Pattern.compile("^(\\d+)(?:\\.(.+))?$"); private static final String BLOOMFILTER_FILE_NAME = "filter"; @@ -101,15 +94,16 @@ public class HStore implements HConstants { private final HBaseConfiguration conf; private final Path filterDir; final Filter bloomFilter; - private final Path compactionDir; - private final Integer compactLock = new Integer(0); + private final long desiredMaxFileSize; + private volatile long storeSize; + private final Integer flushLock = new Integer(0); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final AtomicInteger activeScanners = new AtomicInteger(0); - final String storeName; + final Text storeName; /* * Sorted Map of readers keyed by sequence id (Most recent should be last in @@ -125,8 +119,15 @@ public class HStore implements HConstants { private final SortedMap readers = new TreeMap(); + // The most-recent log-seq-ID that's present. The most-recent such ID means + // we can ignore all log messages up to and including that ID (because they're + // already reflected in the TreeMaps). 
private volatile long maxSeqId; + + private final Path compactionDir; + private final Integer compactLock = new Integer(0); private final int compactionThreshold; + private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); @@ -177,8 +178,18 @@ public class HStore implements HConstants { this.compactionDir = HRegion.getCompactionDir(basedir); this.storeName = - this.info.getEncodedName() + "/" + this.family.getFamilyName(); + new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName()); + // By default, we compact if an HStore has more than + // MIN_COMMITS_FOR_COMPACTION map files + this.compactionThreshold = + conf.getInt("hbase.hstore.compactionThreshold", 3); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); + this.storeSize = 0L; + if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { this.compression = SequenceFile.CompressionType.BLOCK; } else if (family.getCompression() == @@ -219,21 +230,10 @@ public class HStore implements HConstants { // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. - List hstoreFiles = loadHStoreFiles(infodir, mapdir); - for(HStoreFile hsf: hstoreFiles) { - this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); - } + // loadHStoreFiles also computes the max sequence id + this.maxSeqId = -1L; + this.storefiles.putAll(loadHStoreFiles(infodir, mapdir)); - // Now go through all the HSTORE_LOGINFOFILEs and figure out the - // most-recent log-seq-ID that's present. The most-recent such ID means we - // can ignore all log messages up to and including that ID (because they're - // already reflected in the TreeMaps). - // - // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That - // means it was built prior to the previous run of HStore, and so it cannot - // contain any updates also contained in the log. - - this.maxSeqId = getMaxSequenceId(hstoreFiles); if (LOG.isDebugEnabled()) { LOG.debug("maximum sequence id for hstore " + storeName + " is " + this.maxSeqId); @@ -250,16 +250,6 @@ public class HStore implements HConstants { " -- continuing. Probably DATA LOSS!", e); } - // By default, we compact if an HStore has more than - // MIN_COMMITS_FOR_COMPACTION map files - this.compactionThreshold = - conf.getInt("hbase.hstore.compactionThreshold", 3); - - // We used to compact in here before bringing the store online. Instead - // get it online quick even if it needs compactions so we can start - // taking updates as soon as possible (Once online, can take updates even - // during a compaction). - // Move maxSeqId on by one. Why here? And not in HRegion? this.maxSeqId += 1; @@ -276,28 +266,13 @@ public class HStore implements HConstants { first = false; } else { this.readers.put(e.getKey(), - e.getValue().getReader(this.fs, this.bloomFilter)); + e.getValue().getReader(this.fs, this.bloomFilter)); } } } - - /* - * @param hstoreFiles - * @return Maximum sequence number found or -1. 
- * @throws IOException - */ - private long getMaxSequenceId(final List hstoreFiles) - throws IOException { - long maxSeqID = -1; - for (HStoreFile hsf : hstoreFiles) { - long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { - maxSeqID = seqid; - } - } - } - return maxSeqID; + + HColumnDescriptor getFamily() { + return this.family; } long getMaxSequenceId() { @@ -388,7 +363,7 @@ public class HStore implements HConstants { * @param mapdir qualified path for map file directory * @throws IOException */ - private List loadHStoreFiles(Path infodir, Path mapdir) + private SortedMap loadHStoreFiles(Path infodir, Path mapdir) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("infodir: " + infodir.toString() + " mapdir: " + @@ -397,7 +372,7 @@ public class HStore implements HConstants { // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. FileStatus infofiles[] = fs.listStatus(infodir); - ArrayList results = new ArrayList(infofiles.length); + SortedMap results = new TreeMap(); ArrayList mapfiles = new ArrayList(infofiles.length); for (int i = 0; i < infofiles.length; i++) { Path p = infofiles[i].getPath(); @@ -414,6 +389,11 @@ public class HStore implements HConstants { boolean isReference = isReference(p, m); long fid = Long.parseLong(m.group(1)); + if (LOG.isDebugEnabled()) { + LOG.debug("loading file " + p.toString() + ", isReference=" + + isReference + ", file id=" + fid); + } + HStoreFile curfile = null; HStoreFile.Reference reference = null; if (isReference) { @@ -421,6 +401,22 @@ public class HStore implements HConstants { } curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), fid, reference); + + storeSize += curfile.length(); + long storeSeqId = -1; + try { + storeSeqId = curfile.loadInfo(fs); + if (storeSeqId > this.maxSeqId) { + this.maxSeqId = storeSeqId; + } + } catch (IOException e) { + // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. + // That means it was built prior to the previous run of HStore, and so + // it cannot contain any updates also contained in the log. + LOG.info("HSTORE_LOGINFOFILE " + curfile + + " does not contain a sequence number - ignoring"); + } + Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); @@ -432,7 +428,7 @@ public class HStore implements HConstants { // TODO: Confirm referent exists. // Found map and sympathetic info file. Add this hstorefile to result. - results.add(curfile); + results.put(storeSeqId, curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next // section. Make sure path is fully qualified for compare. mapfiles.add(mapfile); @@ -581,17 +577,14 @@ public class HStore implements HConstants { for (MapFile.Reader reader: this.readers.values()) { reader.close(); } - this.readers.clear(); result = new ArrayList(storefiles.values()); - this.storefiles.clear(); LOG.debug("closed " + this.storeName); return result; } finally { this.lock.writeLock().unlock(); } } - - + ////////////////////////////////////////////////////////////////////////////// // Flush changes to disk ////////////////////////////////////////////////////////////////////////////// @@ -627,10 +620,10 @@ public class HStore implements HConstants { synchronized(flushLock) { // A. 
Write the Maps out to the disk HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, - info.getEncodedName(), family.getFamilyName(), -1L, null); + info.getEncodedName(), family.getFamilyName(), -1L, null); String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, - this.bloomFilter); + this.bloomFilter); // Here we tried picking up an existing HStoreFile from disk and // interlacing the memcache flush compacting as we go. The notion was @@ -644,18 +637,23 @@ public class HStore implements HConstants { // Related, looks like 'merging compactions' in BigTable paper interlaces // a memcache flush. We don't. int entries = 0; + long cacheSize = 0; try { for (Map.Entry es: cache.entrySet()) { HStoreKey curkey = es.getKey(); + byte[] bytes = es.getValue(); TextSequence f = HStoreKey.extractFamily(curkey.getColumn()); if (f.equals(this.family.getFamilyName())) { entries++; - out.append(curkey, new ImmutableBytesWritable(es.getValue())); + out.append(curkey, new ImmutableBytesWritable(bytes)); + cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0); } } } finally { out.close(); } + long newStoreSize = flushedFile.length(); + storeSize += newStoreSize; // B. Write out the log sequence number that corresponds to this output // MapFile. The MapFile is current up to and including the log seq num. @@ -676,14 +674,14 @@ public class HStore implements HConstants { this.storefiles.put(flushid, flushedFile); if(LOG.isDebugEnabled()) { LOG.debug("Added " + name + " with " + entries + - " entries, sequence id " + logCacheFlushId + ", and size " + - StringUtils.humanReadableInt(flushedFile.length()) + " for " + + " entries, sequence id " + logCacheFlushId + ", data size " + + StringUtils.humanReadableInt(cacheSize) + ", file size " + + StringUtils.humanReadableInt(newStoreSize) + " for " + this.storeName); } } finally { this.lock.writeLock().unlock(); } - return; } } @@ -691,28 +689,6 @@ public class HStore implements HConstants { // Compaction ////////////////////////////////////////////////////////////////////////////// - /** - * @return True if this store needs compaction. - */ - boolean needsCompaction() { - return this.storefiles != null && - (this.storefiles.size() >= this.compactionThreshold || hasReferences()); - } - - /* - * @return True if this store has references. - */ - private boolean hasReferences() { - if (this.storefiles != null) { - for (HStoreFile hsf: this.storefiles.values()) { - if (hsf.isReference()) { - return true; - } - } - } - return false; - } - /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. @@ -728,42 +704,66 @@ public class HStore implements HConstants { * * We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. - * @throws IOException * - * @return true if compaction completed successfully + * @return mid key if a split is needed, null otherwise + * @throws IOException */ - boolean compact() throws IOException { + Text compact() throws IOException { synchronized (compactLock) { - if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + storefiles.size() + - " files using " + compactionDir.toString() + " for " + - this.storeName); - } - - // Storefiles are keyed by sequence id. The oldest file comes first. - // We need to return out of here a List that has the newest file first. 
- List filesToCompact = - new ArrayList(this.storefiles.values()); - Collections.reverse(filesToCompact); - if (filesToCompact.size() < 1 || - (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) { - if (LOG.isDebugEnabled()) { - LOG.debug("nothing to compact for " + this.storeName); + long maxId = -1; + List filesToCompact = null; + synchronized (storefiles) { + filesToCompact = new ArrayList(this.storefiles.values()); + if (filesToCompact.size() < 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting " + this.storeName + + " because no store files to compact."); + } + return checkSplit(); + } else if (filesToCompact.size() == 1) { + if (!filesToCompact.get(0).isReference()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting " + this.storeName + + " because only one store file and it is not a reference"); + } + return checkSplit(); + } + } else if (filesToCompact.size() < compactionThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting " + this.storeName + + " because number of stores " + filesToCompact.size() + + " < compaction threshold " + compactionThreshold); + } + return checkSplit(); } - return false; - } - if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { - LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); - return false; + if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { + LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); + return checkSplit(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("started compaction of " + filesToCompact.size() + + " files using " + compactionDir.toString() + " for " + + this.storeName); + } + + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file first. + Collections.reverse(filesToCompact); + + // The max-sequenceID in any of the to-be-compacted TreeMaps is the + // last key of storefiles. + + maxId = this.storefiles.lastKey(); } // Step through them, writing to the brand-new MapFile HStoreFile compactedOutputFile = new HStoreFile(conf, fs, - this.compactionDir, info.getEncodedName(), family.getFamilyName(), - -1L, null); + this.compactionDir, info.getEncodedName(), family.getFamilyName(), + -1L, null); MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, - this.compression, this.bloomFilter); + this.compression, this.bloomFilter); try { compactHStoreFiles(compactedOut, filesToCompact); } finally { @@ -771,14 +771,17 @@ public class HStore implements HConstants { } // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps. - long maxId = getMaxSequenceId(filesToCompact); compactedOutputFile.writeInfo(fs, maxId); // Move the compaction into place. completeCompaction(filesToCompact, compactedOutputFile); - return true; + + if (LOG.isDebugEnabled()) { + LOG.debug("Completed compaction of " + this.storeName + + " store size is " + StringUtils.humanReadableInt(storeSize)); + } } + return checkSplit(); } /* @@ -975,11 +978,11 @@ public class HStore implements HConstants { *

    * 1) Wait for active scanners to exit
    * 2) Acquiring the write-lock
-   * 3) Figuring out what MapFiles are going to be replaced
-   * 4) Moving the new compacted MapFile into place
-   * 5) Unloading all the replaced MapFiles.
-   * 6) Deleting all the old MapFile files.
-   * 7) Loading the new TreeMap.
+   * 3) Moving the new compacted MapFile into place
+   * 4) Unloading all the replaced MapFiles and close.
+   * 5) Deleting all the replaced MapFile files.
+   * 6) Loading the new TreeMap.
+   * 7) Compute new store size
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * 
@@ -1027,46 +1030,53 @@ public class HStore implements HConstants { // 4. and 5. Unload all the replaced MapFiles, close and delete. - List toDelete = new ArrayList(); - for (Map.Entry e: this.storefiles.entrySet()) { - if (!compactedFiles.contains(e.getValue())) { - continue; - } - Long key = e.getKey(); - MapFile.Reader reader = this.readers.remove(key); - if (reader != null) { - reader.close(); - } - toDelete.add(key); - } - - try { - for (Long key: toDelete) { - HStoreFile hsf = this.storefiles.remove(key); - hsf.delete(); + synchronized (storefiles) { + List toDelete = new ArrayList(); + for (Map.Entry e: this.storefiles.entrySet()) { + if (!compactedFiles.contains(e.getValue())) { + continue; + } + Long key = e.getKey(); + MapFile.Reader reader = this.readers.remove(key); + if (reader != null) { + reader.close(); + } + toDelete.add(key); } - // 6. Loading the new TreeMap. - Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); - this.readers.put(orderVal, - // Use a block cache (if configured) for this reader since - // it is the only one. - finalCompactedFile.getReader(this.fs, this.bloomFilter, - family.isBlockCacheEnabled())); - this.storefiles.put(orderVal, finalCompactedFile); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files for " + this.storeName + - ". Compacted file is " + finalCompactedFile.toString() + - ". Files replaced are " + compactedFiles.toString() + - " some of which may have been already removed", e); + try { + for (Long key: toDelete) { + HStoreFile hsf = this.storefiles.remove(key); + hsf.delete(); + } + + // 6. Loading the new TreeMap. + Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); + this.readers.put(orderVal, + // Use a block cache (if configured) for this reader since + // it is the only one. + finalCompactedFile.getReader(this.fs, this.bloomFilter, + family.isBlockCacheEnabled())); + this.storefiles.put(orderVal, finalCompactedFile); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files for " + this.storeName + + ". Compacted file is " + finalCompactedFile.toString() + + ". Files replaced are " + compactedFiles.toString() + + " some of which may have been already removed", e); + } + // 7. Compute new store size + storeSize = 0L; + for (HStoreFile hsf: storefiles.values()) { + storeSize += hsf.length(); + } } } finally { - // 7. Releasing the write-lock + // 8. Releasing the write-lock this.lock.writeLock().unlock(); } } finally { - // 8. Allow new scanners to proceed. + // 9. Allow new scanners to proceed. newScannerLock.writeLock().unlock(); } } @@ -1304,7 +1314,7 @@ public class HStore implements HConstants { do{ // if the row matches, we might want this one. - if(rowMatches(origin, readkey)){ + if (rowMatches(origin, readkey)) { // if the cell matches, then we definitely want this key. if (cellMatches(origin, readkey)) { // store the key if it isn't deleted or superceeded by what's @@ -1323,11 +1333,11 @@ public class HStore implements HConstants { // timestamps, so move to the next key continue; } - } else{ + } else { // the row doesn't match, so we've gone too far. break; } - }while(map.next(readkey, readval)); // advance to the next key + } while (map.next(readkey, readval)); // advance to the next key } } @@ -1572,71 +1582,77 @@ public class HStore implements HConstants { } /** - * Gets size for the store. 
+ * Determines if HStore can be split * - * @param midKey Gets set to the middle key of the largest splitable store - * file or its set to empty if largest is not splitable. - * @return Sizes for the store and the passed midKey is - * set to midKey of largest splitable. Otherwise, its set to empty - * to indicate we couldn't find a midkey to split on + * @return midKey if store can be split, null otherwise */ - HStoreSize size(Text midKey) { - long maxSize = 0L; - long aggregateSize = 0L; - // Not splitable if we find a reference store file present in the store. - boolean splitable = true; + Text checkSplit() { if (this.storefiles.size() <= 0) { - return new HStoreSize(0, 0, splitable); + return null; + } + if (storeSize < this.desiredMaxFileSize) { + return null; } - this.lock.readLock().lock(); try { + // Not splitable if we find a reference store file present in the store. + boolean splitable = true; + long maxSize = 0L; Long mapIndex = Long.valueOf(0L); // Iterate through all the MapFiles - for (Map.Entry e: storefiles.entrySet()) { - HStoreFile curHSF = e.getValue(); - long size = curHSF.length(); - aggregateSize += size; - if (maxSize == 0L || size > maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); - } - if (splitable) { - splitable = !curHSF.isReference(); + synchronized (storefiles) { + for (Map.Entry e: storefiles.entrySet()) { + HStoreFile curHSF = e.getValue(); + long size = curHSF.length(); + if (size > maxSize) { + // This is the largest one so far + maxSize = size; + mapIndex = e.getKey(); + } + if (splitable) { + splitable = !curHSF.isReference(); + } } } - if (splitable) { - MapFile.Reader r = this.readers.get(mapIndex); - // seek back to the beginning of mapfile - r.reset(); - // get the first and last keys - HStoreKey firstKey = new HStoreKey(); - HStoreKey lastKey = new HStoreKey(); - Writable value = new ImmutableBytesWritable(); - r.next(firstKey, value); - r.finalKey(lastKey); - // get the midkey - HStoreKey mk = (HStoreKey)r.midKey(); - if (mk != null) { - // if the midkey is the same as the first and last keys, then we cannot - // (ever) split this region. - if (mk.getRow().equals(firstKey.getRow()) && - mk.getRow().equals(lastKey.getRow())) { - return new HStoreSize(aggregateSize, maxSize, false); - } - // Otherwise, set midKey - midKey.set(mk.getRow()); + if (!splitable) { + return null; + } + MapFile.Reader r = this.readers.get(mapIndex); + + // seek back to the beginning of mapfile + r.reset(); + + // get the first and last keys + HStoreKey firstKey = new HStoreKey(); + HStoreKey lastKey = new HStoreKey(); + Writable value = new ImmutableBytesWritable(); + r.next(firstKey, value); + r.finalKey(lastKey); + + // get the midkey + HStoreKey mk = (HStoreKey)r.midKey(); + if (mk != null) { + // if the midkey is the same as the first and last keys, then we cannot + // (ever) split this region. 
+ if (mk.getRow().equals(firstKey.getRow()) && + mk.getRow().equals(lastKey.getRow())) { + return null; } + return mk.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeName, e); } finally { this.lock.readLock().unlock(); } - return new HStoreSize(aggregateSize, maxSize, splitable); + return null; } + /** @return aggregate size of HStore */ + public long getSize() { + return storeSize; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -1665,7 +1681,7 @@ public class HStore implements HConstants { /** {@inheritDoc} */ @Override public String toString() { - return this.storeName; + return this.storeName.toString(); } /* diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java b/src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java deleted file mode 100644 index f45f3c3089c..00000000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java +++ /dev/null @@ -1,33 +0,0 @@ - -package org.apache.hadoop.hbase.regionserver; - -/* - * Data structure to hold result of a look at store file sizes. - */ -public class HStoreSize { - final long aggregate; - final long largest; - boolean splitable; - - HStoreSize(final long a, final long l, final boolean s) { - this.aggregate = a; - this.largest = l; - this.splitable = s; - } - - public long getAggregate() { - return this.aggregate; - } - - public long getLargest() { - return this.largest; - } - - public boolean isSplitable() { - return this.splitable; - } - - public void setSplitable(final boolean s) { - this.splitable = s; - } -} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java index 7f8a36f2963..beacc6c0d83 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.List; import java.util.ArrayList; import java.util.Set; -import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -63,7 +62,7 @@ class Memcache { snapshot = Collections.synchronizedSortedMap(new TreeMap()); } - + /** * Creates a snapshot of the current Memcache */ @@ -196,12 +195,9 @@ class Memcache { /** * @param row * @param timestamp - * @return the key that matches row exactly, or the one that - * immediately preceeds it. */ void getRowKeyAtOrBefore(final Text row, - SortedMap candidateKeys) - throws IOException { + SortedMap candidateKeys) { this.lock.readLock().lock(); try { diff --git a/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java b/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java deleted file mode 100644 index 4c808d3b19f..00000000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright 2008 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.Delayed; - -/** Queue entry passed to flusher, compactor and splitter threads */ -class QueueEntry implements Delayed { - private final HRegion region; - private long expirationTime; - - QueueEntry(HRegion region, long expirationTime) { - this.region = region; - this.expirationTime = expirationTime; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object o) { - QueueEntry other = (QueueEntry) o; - return this.hashCode() == other.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return this.region.getRegionInfo().hashCode(); - } - - /** {@inheritDoc} */ - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; - } - - /** @return the region */ - public HRegion getRegion() { - return region; - } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } -} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/util/MetaUtils.java b/src/java/org/apache/hadoop/hbase/util/MetaUtils.java index 251a961c385..637086c0e4e 100644 --- a/src/java/org/apache/hadoop/hbase/util/MetaUtils.java +++ b/src/java/org/apache/hadoop/hbase/util/MetaUtils.java @@ -35,12 +35,13 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.regionserver.HLog; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HScannerInterface; import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.client.HTable; /** @@ -316,13 +317,16 @@ public class MetaUtils { throws IOException { HTable t = new HTable(c, HConstants.META_TABLE_NAME); Cell cell = t.get(row, HConstants.COL_REGIONINFO); + if (cell == null) { + throw new IOException("no information for row " + row); + } // Throws exception if null. 
HRegionInfo info = Writables.getHRegionInfo(cell); - long id = t.startUpdate(row); + BatchUpdate b = new BatchUpdate(row); info.setOffline(onlineOffline); - t.put(id, HConstants.COL_REGIONINFO, Writables.getBytes(info)); - t.delete(id, HConstants.COL_SERVER); - t.delete(id, HConstants.COL_STARTCODE); - t.commit(id); + b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info)); + b.delete(HConstants.COL_SERVER); + b.delete(HConstants.COL_STARTCODE); + t.commit(b); } } diff --git a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java index bb8cf975447..58000373ade 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java @@ -44,7 +44,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { protected MiniDFSCluster dfsCluster; protected int regionServers; protected boolean startDfs; - + + /** default constructor */ public HBaseClusterTestCase() { this(1); } @@ -53,6 +54,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { * Start a MiniHBaseCluster with regionServers region servers in-process to * start with. Also, start a MiniDfsCluster before starting the hbase cluster. * The configuration used will be edited so that this works correctly. + * @param regionServers number of region servers to start. */ public HBaseClusterTestCase(int regionServers) { this(regionServers, true); @@ -65,6 +67,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { * configured in hbase-site.xml and is already started, or you have started a * MiniDFSCluster on your own and edited the configuration in memory. (You * can modify the config used by overriding the preHBaseClusterSetup method.) + * @param regionServers number of region servers to start. + * @param startDfs set to true if MiniDFS should be started */ public HBaseClusterTestCase(int regionServers, boolean startDfs) { super(); @@ -81,9 +85,11 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { /** * Actually start the MiniHBase instance. 
*/ + @SuppressWarnings("unused") protected void HBaseClusterSetup() throws Exception { // start the mini cluster this.cluster = new MiniHBaseCluster(conf, regionServers); + // opening the META table ensures that cluster is running HTable meta = new HTable(conf, new Text(".META.")); } diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 590938cd657..d55462162e5 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -102,11 +102,8 @@ public abstract class HBaseTestCase extends TestCase { localfs = (conf.get("fs.default.name", "file:///").compareTo("file::///") == 0); - try { + if (fs == null) { this.fs = FileSystem.get(conf); - } catch (IOException e) { - LOG.fatal("error getting file system", e); - throw e; } try { if (localfs) { @@ -509,54 +506,88 @@ public abstract class HBaseTestCase extends TestCase { */ public static class HTableIncommon implements Incommon { final HTable table; + private BatchUpdate batch; + + private void checkBatch() { + if (batch == null) { + throw new IllegalStateException("No batch update in progress."); + } + } + /** * @param table */ public HTableIncommon(final HTable table) { super(); this.table = table; + this.batch = null; } /** {@inheritDoc} */ - public void abort(long lockid) { - this.table.abort(lockid); + public void abort(@SuppressWarnings("unused") long lockid) { + if (this.batch != null) { + this.batch = null; + } } /** {@inheritDoc} */ - public void commit(long lockid) throws IOException { - this.table.commit(lockid); + public void commit(@SuppressWarnings("unused") long lockid) + throws IOException { + checkBatch(); + this.table.commit(batch); + this.batch = null; } + /** {@inheritDoc} */ - public void commit(long lockid, final long ts) throws IOException { - this.table.commit(lockid, ts); + public void commit(@SuppressWarnings("unused") long lockid, final long ts) + throws IOException { + checkBatch(); + this.batch.setTimestamp(ts); + this.table.commit(batch); + this.batch = null; } + /** {@inheritDoc} */ - public void put(long lockid, Text column, byte[] val) { - this.table.put(lockid, column, val); + public void put(@SuppressWarnings("unused") long lockid, Text column, + byte[] val) { + checkBatch(); + this.batch.put(column, val); } + /** {@inheritDoc} */ - public void delete(long lockid, Text column) { - this.table.delete(lockid, column); + public void delete(@SuppressWarnings("unused") long lockid, Text column) { + checkBatch(); + this.batch.delete(column); } + /** {@inheritDoc} */ public void deleteAll(Text row, Text column, long ts) throws IOException { this.table.deleteAll(row, column, ts); } + /** {@inheritDoc} */ public long startUpdate(Text row) { - return this.table.startUpdate(row); + if (this.batch != null) { + throw new IllegalStateException("Batch update already in progress."); + } + this.batch = new BatchUpdate(row); + return 0L; } + /** {@inheritDoc} */ public HScannerInterface getScanner(Text [] columns, Text firstRow, long ts) throws IOException { return this.table.obtainScanner(columns, firstRow, ts, null); } + /** {@inheritDoc} */ public Cell get(Text row, Text column) throws IOException { return this.table.get(row, column); } + /** {@inheritDoc} */ public Cell[] get(Text row, Text column, int versions) throws IOException { return this.table.get(row, column, versions); } + /** {@inheritDoc} */ public Cell[] get(Text row, Text column, long ts, int versions) throws IOException { @@ -576,8 
+607,10 @@ public abstract class HBaseTestCase extends TestCase { fail(column.toString() + " at timestamp " + timestamp + "\" was expected to be \"" + value + " but was null"); } - assertEquals(column.toString() + " at timestamp " - + timestamp, value, new String(cell_value.getValue())); + if (cell_value != null) { + assertEquals(column.toString() + " at timestamp " + + timestamp, value, new String(cell_value.getValue())); + } } } } diff --git a/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java b/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java index f057aa4a78a..71521115c0f 100644 --- a/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java +++ b/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java @@ -24,6 +24,8 @@ import java.util.Iterator; import java.util.Set; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate; * Test HBase Master and Region servers, client API */ public class TestHBaseCluster extends HBaseClusterTestCase { + private static final Log LOG = LogFactory.getLog(TestHBaseCluster.class); private HTableDescriptor desc; private HBaseAdmin admin; @@ -104,7 +107,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase { (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); table.commit(b); } - System.out.println("Write " + NUM_VALS + " rows. Elapsed time: " + LOG.info("Write " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Read them back in @@ -134,7 +137,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase { teststr.compareTo(bodystr) == 0); } - System.out.println("Read " + NUM_VALS + " rows. Elapsed time: " + LOG.info("Read " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); } @@ -175,7 +178,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase { anchorFetched++; } else { - System.out.println(col); + LOG.info(col); } } curVals.clear(); @@ -184,7 +187,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase { assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched); assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched); - System.out.println("Scanned " + NUM_VALS + LOG.info("Scanned " + NUM_VALS + " rows. 
Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java index c2559f9932e..56319a2a859 100644 --- a/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java +++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MultiRegionTable; -import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -76,9 +76,9 @@ public class TestTableIndex extends MultiRegionTable { private HTableDescriptor desc; + private JobConf jobConf = null; - private Path dir; - + /** default constructor */ public TestTableIndex() { // Enable DEBUG-level MR logging. Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG); @@ -105,7 +105,6 @@ public class TestTableIndex extends MultiRegionTable { // Create a table. HBaseAdmin admin = new HBaseAdmin(conf); admin.createTable(desc); - // Populate a table into multiple regions makeMultiRegionTable(conf, cluster, dfsCluster.getFileSystem(), TABLE_NAME, INPUT_COLUMN); @@ -116,6 +115,14 @@ public class TestTableIndex extends MultiRegionTable { assertTrue(startKeys.length > 1); } + /** {@inheritDoc} */ + @Override + public void tearDown() throws Exception { + if (jobConf != null) { + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + /** * Test HBase map/reduce * @@ -135,7 +142,7 @@ public class TestTableIndex extends MultiRegionTable { conf.set("hbase.index.conf", createIndexConfContent()); try { - JobConf jobConf = new JobConf(conf, TestTableIndex.class); + jobConf = new JobConf(conf, TestTableIndex.class); jobConf.setJobName("index column contents"); jobConf.setNumMapTasks(2); // number of indexes to partition into diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java index cbee64752ee..59f148519f2 100644 --- a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java +++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -20,13 +20,14 @@ package org.apache.hadoop.hbase.mapred; import java.io.IOException; +import java.io.File; import java.io.UnsupportedEncodingException; import java.util.Map; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MultiRegionTable; -import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.hbase.io.BatchUpdate; import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.MapWritable; @@ -66,8 +66,6 @@ public class TestTableMapReduce extends MultiRegionTable { TEXT_OUTPUT_COLUMN }; - private Path dir; - private static byte[][] values = null; static { @@ -193,8 +191,9 @@ public class TestTableMapReduce extends MultiRegionTable { @SuppressWarnings("deprecation") MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + JobConf jobConf = null; try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf = new JobConf(conf, TestTableMapReduce.class); jobConf.setJobName("process column contents"); jobConf.setNumMapTasks(1); jobConf.setNumReduceTasks(1); @@ -215,6 +214,9 @@ public class TestTableMapReduce extends MultiRegionTable { verify(SINGLE_REGION_TABLE_NAME); } finally { mrCluster.shutdown(); + if (jobConf != null) { + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } } } @@ -244,8 +246,9 @@ public class TestTableMapReduce extends MultiRegionTable { @SuppressWarnings("deprecation") MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + JobConf jobConf = null; try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf = new JobConf(conf, TestTableMapReduce.class); jobConf.setJobName("process column contents"); jobConf.setNumMapTasks(2); jobConf.setNumReduceTasks(1); @@ -262,6 +265,9 @@ public class TestTableMapReduce extends MultiRegionTable { verify(MULTI_REGION_TABLE_NAME); } finally { mrCluster.shutdown(); + if (jobConf != null) { + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } } } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java index f941e334e5b..9345dd9b219 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -93,7 +93,6 @@ public class TestCompaction extends HBaseTestCase { */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } @@ -106,35 +105,8 @@ public class TestCompaction extends HBaseTestCase { r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); // Assert that I can get > 5 versions (Should be at least 5 in there). assertTrue(cellValues.length >= 5); - // Try to run compaction concurrent with a thread flush just to see that - // we can. - final HRegion region = this.r; - Thread t1 = new Thread() { - @Override - public void run() { - try { - region.flushcache(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(region.compactIfNeeded()); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); + r.flushcache(); + r.compactStores(); // Now assert that there are 4 versions of a record only: thats the // 3 versions that should be in the compacted store and then the one more // we added when we flushed. But could be 3 only if the flush happened @@ -170,7 +142,8 @@ public class TestCompaction extends HBaseTestCase { // compacted store and the flush above when we added deletes. Add more // content to be certain. 
createSmallerStoreFile(this.r); - assertTrue(r.compactIfNeeded()); + r.flushcache(); + r.compactStores(); // Assert that the first row is still deleted. cellValues = r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(cellValues); diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java b/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java index dbb9eb64cc5..d2754bebe2a 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -44,6 +45,13 @@ public class TestHMemcache extends TestCase { private static final String COLUMN_FAMILY = "column"; + private static final int FIRST_ROW = 1; + private static final int NUM_VALS = 1000; + private static final Text CONTENTS_BASIC = new Text("contents:basic"); + private static final String CONTENTSTR = "contentstr"; + private static final String ANCHORNUM = "anchor:anchornum-"; + private static final String ANCHORSTR = "anchorstr"; + /** {@inheritDoc} */ @Override public void setUp() throws Exception { @@ -51,6 +59,50 @@ public class TestHMemcache extends TestCase { this.hmemcache = new Memcache(); } + /** + * @throws UnsupportedEncodingException + */ + public void testMemcache() throws UnsupportedEncodingException { + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { + Text row = new Text("row_" + k); + HStoreKey key = + new HStoreKey(row, CONTENTS_BASIC, System.currentTimeMillis()); + hmemcache.add(key, (CONTENTSTR + k).getBytes(HConstants.UTF8_ENCODING)); + + key = + new HStoreKey(row, new Text(ANCHORNUM + k), System.currentTimeMillis()); + hmemcache.add(key, (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); + } + + // Read them back + + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { + List results; + Text row = new Text("row_" + k); + HStoreKey key = new HStoreKey(row, CONTENTS_BASIC, Long.MAX_VALUE); + results = hmemcache.get(key, 1); + assertNotNull("no data for " + key.toString(), results); + assertEquals(1, results.size()); + String bodystr = new String(results.get(0).getValue(), + HConstants.UTF8_ENCODING); + String teststr = CONTENTSTR + k; + assertTrue("Incorrect value for key: (" + key.toString() + + "), expected: '" + teststr + "' got: '" + + bodystr + "'", teststr.compareTo(bodystr) == 0); + + key = new HStoreKey(row, new Text(ANCHORNUM + k), Long.MAX_VALUE); + results = hmemcache.get(key, 1); + assertNotNull("no data for " + key.toString(), results); + assertEquals(1, results.size()); + bodystr = new String(results.get(0).getValue(), + HConstants.UTF8_ENCODING); + teststr = ANCHORSTR + k; + assertTrue("Incorrect value for key: (" + key.toString() + + "), expected: '" + teststr + "' got: '" + bodystr + "'", + teststr.compareTo(bodystr) == 0); + } + } + private Text getRowName(final int index) { return new Text("row" + Integer.toString(index)); } @@ -175,8 +227,8 @@ public class TestHMemcache extends TestCase { } } - /** For HBASE-528 **/ - public void testGetRowKeyAtOrBefore() throws IOException { + /** For HBASE-528 */ + public void testGetRowKeyAtOrBefore() { // set up some test data Text t10 = new Text("010"); Text t20 = new Text("020"); diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java 
b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 40e998e8a20..7baaafba0c4 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -19,7 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -57,7 +56,7 @@ implements RegionUnavailableListener { */ public void testHRegion() throws IOException { try { - setup(); + init(); locks(); badPuts(); basic(); @@ -65,14 +64,7 @@ implements RegionUnavailableListener { batchWrite(); splitAndMerge(); read(); - cleanup(); } finally { - if (r != null) { - r.close(); - } - if (log != null) { - log.closeAndDelete(); - } StaticTestEnvironment.shutdownDfs(cluster); } } @@ -96,22 +88,36 @@ implements RegionUnavailableListener { HRegionIncommon region = null; private static int numInserted = 0; - - // Create directories, start mini cluster, etc. - private void setup() throws IOException { + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + this.conf.set("hbase.hstore.compactionThreshold", "2"); + + if (!StaticTestEnvironment.debugging) { + conf.setLong("hbase.hregion.max.filesize", 65536); + } cluster = new MiniDFSCluster(conf, 2, true, (String[])null); + fs = cluster.getFileSystem(); + // Set the hbase.rootdir to be the home directory in mini dfs. this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem().getHomeDirectory().toString()); + + super.setUp(); + } + // Create directories, start mini cluster, etc. + + private void init() throws IOException { desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); r = createNewHRegion(desc, null, null); log = r.getLog(); region = new HRegionIncommon(r); + LOG.info("setup completed."); } // Test basic functionality. Writes to contents:basic and anchor:anchornum-* @@ -129,7 +135,7 @@ implements RegionUnavailableListener { (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); region.commit(writeid, System.currentTimeMillis()); } - System.out.println("Write " + NUM_VALS + " rows. Elapsed time: " + LOG.info("Write " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Flush cache @@ -138,7 +144,7 @@ implements RegionUnavailableListener { region.flushcache(); - System.out.println("Cache flush elapsed time: " + LOG.info("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Read them back in @@ -165,8 +171,10 @@ implements RegionUnavailableListener { bodystr, teststr); } - System.out.println("Read " + NUM_VALS + " rows. Elapsed time: " + LOG.info("Read " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); + + LOG.info("basic completed."); } private void badPuts() { @@ -198,6 +206,7 @@ implements RegionUnavailableListener { } } assertTrue("Bad family", exceptionThrown); + LOG.info("badPuts completed."); } /** @@ -253,6 +262,7 @@ implements RegionUnavailableListener { } } } + LOG.info("locks completed."); } // Test scanners. Writes contents:firstcol and anchor:secondcol @@ -283,7 +293,7 @@ implements RegionUnavailableListener { numInserted += 2; } - System.out.println("Write " + (vals1.length / 2) + " elapsed time: " + LOG.info("Write " + (vals1.length / 2) + " elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 2. 
Scan from cache @@ -321,7 +331,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (vals1.length / 2) + LOG.info("Scanned " + (vals1.length / 2) + " rows from cache. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -331,7 +341,7 @@ implements RegionUnavailableListener { region.flushcache(); - System.out.println("Cache flush elapsed time: " + LOG.info("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 4. Scan from disk @@ -368,7 +378,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (vals1.length / 2) + LOG.info("Scanned " + (vals1.length / 2) + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -386,7 +396,7 @@ implements RegionUnavailableListener { numInserted += 2; } - System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: " + LOG.info("Write " + (vals1.length / 2) + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 6. Scan from cache and disk @@ -423,7 +433,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + vals1.length + LOG.info("Scanned " + vals1.length + " rows from cache and disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -433,7 +443,7 @@ implements RegionUnavailableListener { region.flushcache(); - System.out.println("Cache flush elapsed time: " + LOG.info("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 8. Scan from disk @@ -468,7 +478,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + vals1.length + LOG.info("Scanned " + vals1.length + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -506,9 +516,11 @@ implements RegionUnavailableListener { } assertEquals("Should have fetched " + (numInserted / 2) + " values, but fetched " + numFetched, (numInserted / 2), numFetched); - System.out.println("Scanned " + (numFetched / 2) + LOG.info("Scanned " + (numFetched / 2) + " rows from disk with specified start point. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); + + LOG.info("scan completed."); } // Do a large number of writes. Disabled if not debugging because it takes a @@ -517,6 +529,7 @@ implements RegionUnavailableListener { private void batchWrite() throws IOException { if(! 
StaticTestEnvironment.debugging) { + LOG.info("batchWrite completed."); return; } @@ -542,7 +555,7 @@ implements RegionUnavailableListener { buf1.toString().getBytes(HConstants.UTF8_ENCODING)); region.commit(writeid, System.currentTimeMillis()); if (k > 0 && k % (N_ROWS / 100) == 0) { - System.out.println("Flushing write #" + k); + LOG.info("Flushing write #" + k); long flushStart = System.currentTimeMillis(); region.flushcache(); @@ -550,51 +563,55 @@ implements RegionUnavailableListener { totalFlush += (flushEnd - flushStart); if (k % (N_ROWS / 10) == 0) { - System.out.print("Rolling log..."); + System.err.print("Rolling log..."); long logStart = System.currentTimeMillis(); log.rollWriter(); long logEnd = System.currentTimeMillis(); totalLog += (logEnd - logStart); - System.out.println(" elapsed time: " + ((logEnd - logStart) / 1000.0)); + LOG.info(" elapsed time: " + ((logEnd - logStart) / 1000.0)); } } } long startCompact = System.currentTimeMillis(); - if(r.compactIfNeeded()) { - totalCompact = System.currentTimeMillis() - startCompact; - System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); - - } else { - System.out.println("No compaction required."); - } + r.compactStores(); + totalCompact = System.currentTimeMillis() - startCompact; + LOG.info("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); long endTime = System.currentTimeMillis(); long totalElapsed = (endTime - startTime); - System.out.println(); - System.out.println("Batch-write complete."); - System.out.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes"); - System.out.println("Total flush-time: " + (totalFlush / 1000.0)); - System.out.println("Total compact-time: " + (totalCompact / 1000.0)); - System.out.println("Total log-time: " + (totalLog / 1000.0)); - System.out.println("Total time elapsed: " + (totalElapsed / 1000.0)); - System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0))); - System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)); - System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0))); - System.out.println(); + LOG.info(""); + LOG.info("Batch-write complete."); + LOG.info("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes"); + LOG.info("Total flush-time: " + (totalFlush / 1000.0)); + LOG.info("Total compact-time: " + (totalCompact / 1000.0)); + LOG.info("Total log-time: " + (totalLog / 1000.0)); + LOG.info("Total time elapsed: " + (totalElapsed / 1000.0)); + LOG.info("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0))); + LOG.info("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)); + LOG.info("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0))); + LOG.info(""); + LOG.info("batchWrite completed."); } // NOTE: This test depends on testBatchWrite succeeding private void splitAndMerge() throws IOException { Path oldRegionPath = r.getRegionDir(); + Text midKey = r.compactStores(); + assertNotNull(midKey); long startTime = System.currentTimeMillis(); - HRegion subregions[] = r.splitRegion(this); + HRegion subregions[] = r.splitRegion(this, midKey); if (subregions != null) { - System.out.println("Split region elapsed time: " + LOG.info("Split region elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); 
assertEquals("Number of subregions", subregions.length, 2); + for (int i = 0; i < subregions.length; i++) { + subregions[i] = openClosedRegion(subregions[i]); + subregions[i].compactStores(); + } + // Now merge it back together Path oldRegion1 = subregions[0].getRegionDir(); @@ -602,12 +619,13 @@ implements RegionUnavailableListener { startTime = System.currentTimeMillis(); r = HRegion.mergeAdjacent(subregions[0], subregions[1]); region = new HRegionIncommon(r); - System.out.println("Merge regions elapsed time: " + LOG.info("Merge regions elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); fs.delete(oldRegion1); fs.delete(oldRegion2); fs.delete(oldRegionPath); } + LOG.info("splitAndMerge completed."); } /** @@ -668,7 +686,7 @@ implements RegionUnavailableListener { anchorFetched++; } else { - System.out.println("UNEXPECTED COLUMN " + col); + LOG.info("UNEXPECTED COLUMN " + col); } } curVals.clear(); @@ -677,7 +695,7 @@ implements RegionUnavailableListener { assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched); assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched); - System.out.println("Scanned " + NUM_VALS + LOG.info("Scanned " + NUM_VALS + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -720,7 +738,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (numFetched / 2) + LOG.info("Scanned " + (numFetched / 2) + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -753,7 +771,7 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + N_ROWS + " values, but fetched " + numFetched, N_ROWS, numFetched); - System.out.println("Scanned " + N_ROWS + LOG.info("Scanned " + N_ROWS + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -785,37 +803,14 @@ implements RegionUnavailableListener { } assertEquals("Inserted " + (NUM_VALS + numInserted/2) + " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched); - System.out.println("Scanned " + fetched + LOG.info("Scanned " + fetched + " rows from disk. 
Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); } finally { s.close(); } + LOG.info("read completed."); } - private static void deleteFile(File f) { - if(f.isDirectory()) { - File[] children = f.listFiles(); - for(int i = 0; i < children.length; i++) { - deleteFile(children[i]); - } - } - f.delete(); - } - - private void cleanup() { - try { - r.close(); - r = null; - log.closeAndDelete(); - log = null; - } catch (IOException e) { - e.printStackTrace(); - } - - // Delete all the DFS files - - deleteFile(new File(System.getProperty("test.build.data"), "dfs")); - } } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java b/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java index 4dd40801379..d65740bc198 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java @@ -24,16 +24,15 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.MultiRegionTable; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HScannerInterface; -import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.hbase.io.Cell; /** @@ -75,6 +74,7 @@ public class TestSplit extends MultiRegionTable { HRegion region = null; try { HTableDescriptor htd = createTableDescriptor(getName()); + htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3)); region = createNewHRegion(htd, null, null); basicSplit(region); } finally { @@ -88,9 +88,9 @@ public class TestSplit extends MultiRegionTable { private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); region.flushcache(); - Text midkey = new Text(); - assertTrue(region.needsSplit(midkey)); - HRegion [] regions = split(region); + Text midkey = region.compactStores(); + assertNotNull(midkey); + HRegion [] regions = split(region, midkey); try { // Need to open the regions. // TODO: Add an 'open' to HRegion... don't do open by constructing @@ -106,17 +106,9 @@ public class TestSplit extends MultiRegionTable { assertScan(regions[0], COLFAMILY_NAME3, new Text(START_KEY)); assertScan(regions[1], COLFAMILY_NAME3, midkey); // Now prove can't split regions that have references. - Text[] midkeys = new Text[regions.length]; for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); // Add so much data to this region, we create a store file that is > - // than - // one of our unsplitable references. - // it will. + // than one of our unsplitable references. it will. for (int j = 0; j < 2; j++) { addContent(regions[i], COLFAMILY_NAME3); } @@ -125,30 +117,23 @@ public class TestSplit extends MultiRegionTable { regions[i].flushcache(); } - // Assert that even if one store file is larger than a reference, the - // region is still deemed unsplitable (Can't split region if references - // presen). 
- for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); - } - + Text[] midkeys = new Text[regions.length]; // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - regions[i].compactStores(); + midkeys[i] = regions[i].compactStores(); } TreeMap sortedMap = new TreeMap(); // Split these two daughter regions so then I'll have 4 regions. Will // split because added data above. for (int i = 0; i < regions.length; i++) { - HRegion[] rs = split(regions[i]); - for (int j = 0; j < rs.length; j++) { - sortedMap.put(rs[j].getRegionName().toString(), - openClosedRegion(rs[j])); + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = split(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(rs[j].getRegionName().toString(), + openClosedRegion(rs[j])); + } } } LOG.info("Made 4 regions"); @@ -219,12 +204,11 @@ public class TestSplit extends MultiRegionTable { } } - private HRegion [] split(final HRegion r) throws IOException { - Text midKey = new Text(); - assertTrue(r.needsSplit(midKey)); + private HRegion [] split(final HRegion r, final Text midKey) + throws IOException { // Assert can get mid key from passed region. assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.splitRegion(null); + HRegion [] regions = r.splitRegion(null, midKey); assertEquals(regions.length, 2); return regions; } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestTimestamp.java b/src/test/org/apache/hadoop/hbase/regionserver/TestTimestamp.java index 54b7fbba9cb..615b98ccc41 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestTimestamp.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestTimestamp.java @@ -19,20 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.TreeMap; import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.HScannerInterface; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HBaseClusterTestCase; import org.apache.hadoop.hbase.TimestampTestBase; import org.apache.commons.logging.Log; @@ -43,7 +35,7 @@ import org.apache.commons.logging.LogFactory; * tests same in presence of deletes. Test cores are written so can be * run against an HRegion and against an HTable: i.e. both local and remote. */ -public class TestTimestamp extends HBaseTestCase { +public class TestTimestamp extends HBaseClusterTestCase { private static final Log LOG = LogFactory.getLog(TestTimestamp.class.getName()); @@ -51,11 +43,6 @@ public class TestTimestamp extends HBaseTestCase { private static final Text COLUMN = new Text(COLUMN_NAME); private static final int VERSIONS = 3; - /** constructor */ - public TestTimestamp() { - super(); - } - /** * Test that delete works according to description in hadoop-1784. 
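For readers tracking the TestHRegion and TestSplit changes above: with needsSplit() removed, the tests now obtain the split point from compactStores() and pass it to splitRegion(). The sketch below is illustrative only and not part of the patch; it assumes an already-open HRegion created the way the tests create one, and it lives in the regionserver package (as the tests do) so the package-visible methods are reachable.

package org.apache.hadoop.hbase.regionserver;  // same package as the tests above

import java.io.IOException;
import org.apache.hadoop.io.Text;

/** Sketch (not part of the patch) of the compact-then-split flow. */
class SplitFlowSketch {
  static HRegion[] splitIfPossible(final HRegion region) throws IOException {
    // compactStores() now returns the midKey of the largest store when that
    // store exceeds the desired max file size and contains no reference files;
    // it returns null when no split should be attempted.
    Text midKey = region.compactStores();
    if (midKey == null) {
      return null;  // nothing to split
    }
    // splitRegion() takes the midKey computed during compaction instead of
    // recomputing it via the removed needsSplit()/largestHStore() path.
    return region.splitRegion(null, midKey);
  }
}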
diff --git a/src/test/org/apache/hadoop/hbase/util/TestMergeTool.java b/src/test/org/apache/hadoop/hbase/util/TestMergeTool.java index bd86b2bee02..e851a2272ff 100644 --- a/src/test/org/apache/hadoop/hbase/util/TestMergeTool.java +++ b/src/test/org/apache/hadoop/hbase/util/TestMergeTool.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; /** Test stand alone merge tool that can merge arbitrary regions */ @@ -54,6 +55,7 @@ public class TestMergeTool extends HBaseTestCase { /** {@inheritDoc} */ @Override public void setUp() throws Exception { + this.conf.set("hbase.hstore.compactionThreshold", "2"); // Create table description @@ -237,7 +239,9 @@ public class TestMergeTool extends HBaseTestCase { for (int i = 0; i < 3 ; i++) { for (int j = 0; j < rows[i].length; j++) { - byte[] bytes = merged.get(rows[i][j], COLUMN_NAME).getValue(); + Cell cell = merged.get(rows[i][j], COLUMN_NAME); + assertNotNull(cell); + byte[] bytes = cell.getValue(); assertNotNull(bytes); Text value = new Text(bytes); assertTrue(value.equals(rows[i][j]));
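A note on the client-side changes visible in MetaUtils, HBaseTestCase and the MapReduce tests: row mutations move from the lockid-based startUpdate()/put()/delete()/commit(lockid) sequence to BatchUpdate, which stages all edits for a row and commits them in one call. The following is a minimal sketch of that idiom, not part of the patch, using only calls that appear in the diffs above; the table handle, row and column names are placeholders.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;

/** Sketch (not part of the patch) of the BatchUpdate idiom used above. */
class BatchUpdateSketch {
  static void updateRow(final HTable table, final Text row) throws IOException {
    BatchUpdate b = new BatchUpdate(row);               // one batch per row
    b.put(new Text("info:col"), "value".getBytes());    // stage a put (placeholder column/value)
    b.delete(new Text("info:old"));                     // stage a delete in the same batch (placeholder column)
    b.setTimestamp(System.currentTimeMillis());         // optional explicit timestamp
    table.commit(b);                                    // single commit replaces commit(lockid)
  }
}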