s: servedRegions.values()) {
for (HRegionInfo i: s) {
- regionsToDelete.add(i.getRegionName());
+ synchronized (serversToServerInfo) {
+ regionsToDelete.add(i.getRegionName());
+ }
}
}
@@ -3005,6 +3055,10 @@ HMasterRegionInterface {
synchronized (serversToServerInfo) {
info = serversToServerInfo.remove(server);
if (info != null) {
+ HServerAddress root = rootRegionLocation.get();
+ if (root != null && root.equals(info.getServerAddress())) {
+ unassignRootRegion();
+ }
String serverName = info.getServerAddress().toString();
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
@@ -3021,9 +3075,9 @@ HMasterRegionInterface {
// NOTE: If the server was serving the root region, we cannot reassign it
// here because the new server will start serving the root region before
- // the PendingServerShutdown operation has a chance to split the log file.
+ // the ProcessServerShutdown operation has a chance to split the log file.
if (info != null) {
- shutdownQueue.put(new PendingServerShutdown(info));
+ shutdownQueue.put(new ProcessServerShutdown(info));
}
}
}
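When the dead server was the one hosting the root region, the master now drops its cached root location (unassignRootRegion()) instead of reassigning root immediately, which is what the NOTE above is about: root must stay unassigned until ProcessServerShutdown has split the dead server's log. A minimal, self-contained sketch of that bookkeeping, assuming the location is held in an AtomicReference as the rootRegionLocation.get() call suggests (the class and method names below are illustrative, not from this patch):

    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative only: tracks a single "root" location and clears it when
    // the server holding it dies, mirroring the unassignRootRegion() call above.
    class RootLocationTracker {
      private final AtomicReference<String> rootLocation = new AtomicReference<String>();

      void setRootLocation(String serverAddress) {
        rootLocation.set(serverAddress);
      }

      // Called when a server is declared dead; returns true if root must be
      // reassigned later, i.e. after the dead server's log has been split.
      boolean serverDied(String deadServerAddress) {
        String root = rootLocation.get();   // may be null if root is already unassigned
        if (root != null && root.equals(deadServerAddress)) {
          rootLocation.set(null);           // nobody serves root until log split completes
          return true;
        }
        return false;
      }
    }
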
diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java
index a0cf28b688e..a941899228a 100644
--- a/src/java/org/apache/hadoop/hbase/HMerge.java
+++ b/src/java/org/apache/hadoop/hbase/HMerge.java
@@ -108,7 +108,8 @@ class HMerge implements HConstants {
this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
fs.mkdirs(basedir);
- this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
+ this.hlog =
+ new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf, null);
}
void process() throws IOException {
@@ -150,11 +151,11 @@ class HMerge implements HConstants {
for(int i = 0; i < regions.length - 1; i++) {
if(currentRegion == null) {
currentRegion =
- new HRegion(dir, hlog, fs, conf, regions[i], null);
+ new HRegion(dir, hlog, fs, conf, regions[i], null, null);
currentSize = currentRegion.largestHStore(midKey).getAggregate();
}
nextRegion =
- new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
+ new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
nextSize = nextRegion.largestHStore(midKey).getAggregate();
@@ -327,7 +328,7 @@ class HMerge implements HConstants {
// Scan root region to find all the meta regions
HRegion root =
- new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null);
+ new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
HInternalScannerInterface rootScanner =
root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
@@ -363,7 +364,7 @@ class HMerge implements HConstants {
HRegion newRegion) throws IOException {
HRegion root =
- new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null);
+ new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null);
Text[] regionsToDelete = {
oldRegion1,
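The HMerge changes are mechanical: HLog now takes a LogRollListener and HRegion a CacheFlushListener, and an offline merge has no region-server threads to notify, so null is passed. That is safe because callers guard the callback, as the HRegion hunk below does with flushListener. A tiny sketch of the optional-listener pattern, with made-up names rather than the HBase types:

    // Illustrative optional-callback pattern; names are made up, not HBase API.
    interface Listener {
      void notifyEvent();
    }

    class Component {
      private final Listener listener;   // may be null for offline/standalone use

      Component(Listener listener) {
        this.listener = listener;
      }

      void doWork() {
        // ... do the actual work ...
        if (listener != null) {          // a null listener simply means "nobody to tell"
          listener.notifyEvent();
        }
      }
    }
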
diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java
index f4ff1a482f7..a6f978e11b7 100644
--- a/src/java/org/apache/hadoop/hbase/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/HRegion.java
@@ -90,7 +90,6 @@ public class HRegion implements HConstants {
static final Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
- private volatile long noFlushCount = 0;
/**
* Merge two HRegions. They must be available on the current
@@ -159,7 +158,7 @@ public class HRegion implements HConstants {
// Done
// Construction moves the merge files into place under region.
HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
- newRegionDir);
+ newRegionDir, null);
// Get rid of merges directory
@@ -221,9 +220,10 @@ public class HRegion implements HConstants {
volatile WriteState writestate = new WriteState();
final int memcacheFlushSize;
+ private volatile long lastFlushTime;
+ final CacheFlushListener flushListener;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
- protected final int optionalFlushCount;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
private final long desiredMaxFileSize;
@@ -251,11 +251,13 @@ public class HRegion implements HConstants {
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
+ * @param listener an object that implements CacheFlushListener or null
*
* @throws IOException
*/
public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles) throws IOException {
+ HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+ throws IOException {
this.rootDir = rootDir;
this.log = log;
@@ -265,8 +267,6 @@ public class HRegion implements HConstants {
this.encodedRegionName =
HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.optionalFlushCount =
- conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -314,6 +314,7 @@ public class HRegion implements HConstants {
// By default, we flush the cache when 16M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*16);
+ this.flushListener = listener;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
@@ -323,6 +324,7 @@ public class HRegion implements HConstants {
// HRegion is ready to go!
this.writestate.compacting = false;
+ this.lastFlushTime = System.currentTimeMillis();
LOG.info("region " + this.regionInfo.getRegionName() + " available");
}
@@ -485,6 +487,11 @@ public class HRegion implements HConstants {
return this.fs;
}
+ /** @return the last time the region was flushed */
+ public long getLastFlushTime() {
+ return this.lastFlushTime;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@@ -598,8 +605,10 @@ public class HRegion implements HConstants {
// Done!
// Opening the region copies the splits files from the splits directory
// under each region.
- HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
- HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
+ HRegion regionA =
+ new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+ HRegion regionB =
+ new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
// Cleanup
boolean deleted = fs.delete(splits); // Get rid of splits directory
@@ -751,54 +760,30 @@ public class HRegion implements HConstants {
}
/**
- * Flush the cache if necessary. This is called periodically to minimize the
- * amount of log processing needed upon startup.
+ * Flush the cache.
*
- * The returned Vector is a list of all the files used by the component
- * HStores. It is a list of HStoreFile objects. If the returned value is
- * NULL, then the flush could not be executed, because the HRegion is busy
- * doing something else storage-intensive. The caller should check back
- * later.
+ * When this method is called the cache will be flushed unless:
+ *
+ * - the cache is empty
+ * - the region is closed
+ * - a flush is already in progress
+ * - writes are disabled
+ *
*
* This method may block for some time, so it should not be called from a
* time-sensitive thread.
*
- * @param disableFutureWrites indicates that the caller intends to
- * close() the HRegion shortly, so the HRegion should not take on any new and
- * potentially long-lasting disk operations. This flush() should be the final
- * pre-close() disk operation.
+ * @return true if cache was flushed
+ *
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- void flushcache() throws IOException {
+ boolean flushcache() throws IOException {
lock.readLock().lock(); // Prevent splits and closes
try {
if (this.closed.get()) {
- return;
- }
- boolean needFlush = false;
- long memcacheSize = this.memcacheSize.get();
- if(memcacheSize > this.memcacheFlushSize) {
- needFlush = true;
- } else if (memcacheSize > 0) {
- if (this.noFlushCount >= this.optionalFlushCount) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- needFlush = true;
- } else {
- // Only increment if something in the cache.
- // Gets zero'd when a flushcache is called.
- this.noFlushCount++;
- }
- }
- if (!needFlush) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cache flush not needed for region " +
- regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
- ", cache flush threshold=" + this.memcacheFlushSize);
- }
- return;
+ return false;
}
synchronized (writestate) {
if ((!writestate.flushing) && writestate.writesEnabled) {
@@ -811,16 +796,15 @@ public class HRegion implements HConstants {
writestate.flushing + ", writesEnabled=" +
writestate.writesEnabled);
}
- return;
+ return false;
}
}
- this.noFlushCount = 0;
long startTime = -1;
synchronized (updateLock) {// Stop updates while we snapshot the memcaches
startTime = snapshotMemcaches();
}
try {
- internalFlushcache(startTime);
+ return internalFlushcache(startTime);
} finally {
synchronized (writestate) {
writestate.flushing = false;
@@ -835,7 +819,7 @@ public class HRegion implements HConstants {
/*
* It is assumed that updates are blocked for the duration of this method
*/
- long snapshotMemcaches() {
+ private long snapshotMemcaches() {
if (this.memcacheSize.get() == 0) {
return -1;
}
@@ -883,17 +867,24 @@ public class HRegion implements HConstants {
* routes.
*
*
* This method may block for some time.
+ *
+ * @param startTime the time the cache was snapshotted or -1 if a flush is
+ * not needed
+ *
+ * @return true if the cache was flushed
+ *
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- void internalFlushcache(long startTime) throws IOException {
+ private boolean internalFlushcache(long startTime) throws IOException {
if (startTime == -1) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
- "there was nothing to do");
+ LOG.debug("Not flushing cache for region " +
+ regionInfo.getRegionName() +
+ ": snapshotMemcaches() determined that there was nothing to do");
}
- return;
+ return false;
}
// We pass the log to the HMemcache, so we can lock down both
@@ -914,7 +905,6 @@ public class HRegion implements HConstants {
// Otherwise, the snapshot content while backed up in the hlog, it will not
// be part of the current running servers state.
- long logCacheFlushId = sequenceId;
try {
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
@@ -938,7 +928,7 @@ public class HRegion implements HConstants {
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), logCacheFlushId);
+ regionInfo.getTableDesc().getName(), sequenceId);
// D. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@@ -948,8 +938,10 @@ public class HRegion implements HConstants {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.getRegionName() + " in " +
- (System.currentTimeMillis() - startTime) + "ms");
+ (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
+ sequenceId);
}
+ return true;
}
//////////////////////////////////////////////////////////////////////////////
@@ -1309,13 +1301,18 @@ public class HRegion implements HConstants {
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), updatesByColumn);
+ long memcacheSize = 0;
for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
byte[] val = e.getValue();
- this.memcacheSize.addAndGet(key.getSize() +
+ memcacheSize = this.memcacheSize.addAndGet(key.getSize() +
(val == null ? 0 : val.length));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
+ if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) {
+ // Request a cache flush
+ this.flushListener.flushRequested(this);
+ }
}
}
@@ -1582,8 +1579,8 @@ public class HRegion implements HConstants {
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(rootDir,
- new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
- fs, conf, info, initialFiles);
+ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+ fs, conf, info, initialFiles, null);
}
/**
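The interesting HRegion change is in update(): the running memcache size returned by addAndGet() is compared against memcacheFlushSize, and when it crosses the threshold the region asks its CacheFlushListener for a flush instead of waiting for a periodic checker. A self-contained sketch of that trigger, with illustrative names (WriteBuffer/FlushListener are not the HBase types):

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the threshold-triggered flush request seen in the update() hunk:
    // use the value returned by addAndGet so the check and the increment see the
    // same number, then hand the flush off to a listener.
    class WriteBuffer {
      interface FlushListener { void flushRequested(WriteBuffer b); }

      private final AtomicLong size = new AtomicLong(0);
      private final long flushThreshold;
      private final FlushListener listener;   // may be null

      WriteBuffer(long flushThreshold, FlushListener listener) {
        this.flushThreshold = flushThreshold;
        this.listener = listener;
      }

      void add(int bytes) {
        long newSize = size.addAndGet(bytes);
        if (listener != null && newSize > flushThreshold) {
          listener.flushRequested(this);     // asynchronous: a flusher thread does the work
        }
      }
    }
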
diff --git a/src/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/java/org/apache/hadoop/hbase/HRegionInfo.java
index 2a33a00f282..78ff42fa5b7 100644
--- a/src/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/src/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -78,7 +78,18 @@ public class HRegionInfo implements WritableComparable {
private boolean split;
private Text startKey;
private HTableDescriptor tableDesc;
-
+ private int hashCode;
+
+ private void setHashCode() {
+ int result = this.regionName.hashCode();
+ result ^= Long.valueOf(this.regionId).hashCode();
+ result ^= this.startKey.hashCode();
+ result ^= this.endKey.hashCode();
+ result ^= Boolean.valueOf(this.offLine).hashCode();
+ result ^= this.tableDesc.hashCode();
+ this.hashCode = result;
+ }
+
/** Used to construct the HRegionInfo for the root and first meta regions */
private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
this.regionId = regionId;
@@ -89,6 +100,7 @@ public class HRegionInfo implements WritableComparable {
DELIMITER + regionId);
this.split = false;
this.startKey = new Text();
+ setHashCode();
}
/** Default constructor - creates empty object */
@@ -100,6 +112,7 @@ public class HRegionInfo implements WritableComparable {
this.split = false;
this.startKey = new Text();
this.tableDesc = new HTableDescriptor();
+ this.hashCode = 0;
}
/**
@@ -152,6 +165,7 @@ public class HRegionInfo implements WritableComparable {
}
this.tableDesc = tableDesc;
+ setHashCode();
}
/** @return the endKey */
@@ -232,13 +246,7 @@ public class HRegionInfo implements WritableComparable {
*/
@Override
public int hashCode() {
- int result = this.regionName.hashCode();
- result ^= Long.valueOf(this.regionId).hashCode();
- result ^= this.startKey.hashCode();
- result ^= this.endKey.hashCode();
- result ^= Boolean.valueOf(this.offLine).hashCode();
- result ^= this.tableDesc.hashCode();
- return result;
+ return this.hashCode;
}
//
@@ -256,6 +264,7 @@ public class HRegionInfo implements WritableComparable {
out.writeBoolean(split);
startKey.write(out);
tableDesc.write(out);
+ out.writeInt(hashCode);
}
/**
@@ -269,6 +278,7 @@ public class HRegionInfo implements WritableComparable {
this.split = in.readBoolean();
this.startKey.readFields(in);
this.tableDesc.readFields(in);
+ this.hashCode = in.readInt();
}
//
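HRegionInfo now computes its hash once in setHashCode() and hashCode() simply returns the cached field; the value is also written and read alongside the other fields. The pattern is safe only while the contributing fields never change after construction. A self-contained sketch of the same caching, using an illustrative class rather than HRegionInfo itself:

    // Illustrative cached-hashCode pattern, as in the HRegionInfo hunks above.
    class RegionKey {
      private final String name;
      private final long id;
      private int hashCode;            // computed once, returned by hashCode()

      RegionKey(String name, long id) {
        this.name = name;
        this.id = id;
        setHashCode();
      }

      private void setHashCode() {
        int result = name.hashCode();
        result ^= Long.valueOf(id).hashCode();
        this.hashCode = result;
      }

      @Override
      public int hashCode() {
        return this.hashCode;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof RegionKey)) {
          return false;
        }
        RegionKey other = (RegionKey) o;
        return id == other.id && name.equals(other.name);
      }
    }
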
diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java
index 63eaaab444d..d06281cd394 100644
--- a/src/java/org/apache/hadoop/hbase/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java
@@ -26,15 +26,19 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,27 +129,79 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/** region server process name */
public static final String REGIONSERVER = "regionserver";
+ /** Queue entry passed to flusher, compactor and splitter threads */
+ class QueueEntry implements Delayed {
+ private final HRegion region;
+ private long expirationTime;
+
+ QueueEntry(HRegion region, long expirationTime) {
+ this.region = region;
+ this.expirationTime = expirationTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ QueueEntry other = (QueueEntry) o;
+ return this.hashCode() == other.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ return this.region.getRegionInfo().hashCode();
+ }
+
+ /** {@inheritDoc} */
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(this.expirationTime - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ public int compareTo(Delayed o) {
+ long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+ o.getDelay(TimeUnit.MILLISECONDS);
+
+ int value = 0;
+ if (delta > 0) {
+ value = 1;
+
+ } else if (delta < 0) {
+ value = -1;
+ }
+ return value;
+ }
+
+ /** @return the region */
+ public HRegion getRegion() {
+ return region;
+ }
+
+ /** @param expirationTime the expirationTime to set */
+ public void setExpirationTime(long expirationTime) {
+ this.expirationTime = expirationTime;
+ }
+ }
+
// Check to see if regions should be split
- private final Thread splitOrCompactCheckerThread;
+ final Splitter splitter;
// Needed at shutdown. On way out, if can get this lock then we are not in
// middle of a split or compaction: i.e. splits/compactions cannot be
// interrupted.
- protected final Integer splitOrCompactLock = new Integer(0);
+ final Integer splitterLock = new Integer(0);
- /*
- * Runs periodically to determine if regions need to be compacted or split
- */
- class SplitOrCompactChecker extends Chore
- implements RegionUnavailableListener {
+ /** Split regions on request */
+ class Splitter extends Thread implements RegionUnavailableListener {
+ private final BlockingQueue<QueueEntry> splitQueue =
+ new LinkedBlockingQueue<QueueEntry>();
+
private HTable root = null;
private HTable meta = null;
- /**
- * @param stop
- */
- public SplitOrCompactChecker(final AtomicBoolean stop) {
- super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
- 30 * 1000), stop);
+ /** constructor */
+ public Splitter() {
+ super();
}
/** {@inheritDoc} */
@@ -178,35 +234,50 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/**
- * Scan for splits or compactions to run. Run any we find.
+ * Perform region splits if necessary
*/
@Override
- protected void chore() {
- // Don't interrupt us while we're working
- synchronized (splitOrCompactLock) {
- checkForSplitsOrCompactions();
- }
- }
-
- private void checkForSplitsOrCompactions() {
- // Grab a list of regions to check
- List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
- for(HRegion cur: nonClosedRegionsToCheck) {
+ public void run() {
+ while (!stopRequested.get()) {
+ QueueEntry e = null;
try {
- if (cur.compactIfNeeded()) {
- // After compaction, it probably needs splitting. May also need
- // splitting just because one of the memcache flushes was big.
- split(cur);
- }
+ e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ if (e == null) {
+ continue;
+ }
+ synchronized (splitterLock) { // Don't interrupt us while we're working
+ try {
+ split(e.getRegion());
+
+ } catch (IOException ex) {
+ LOG.error("Split failed for region " +
+ e.getRegion().getRegionName(),
+ RemoteExceptionHandler.checkIOException(ex));
+ if (!checkFileSystem()) {
+ break;
+ }
- } catch(IOException e) {
- //TODO: What happens if this fails? Are we toast?
- LOG.error("Split or compaction failed", e);
- if (!checkFileSystem()) {
- break;
+ } catch (Exception ex) {
+ LOG.error("Split failed on region " +
+ e.getRegion().getRegionName(), ex);
+ if (!checkFileSystem()) {
+ break;
+ }
}
}
}
+ LOG.info(getName() + " exiting");
+ }
+
+ /**
+ * @param e entry indicating which region needs to be split
+ */
+ public void splitRequested(QueueEntry e) {
+ splitQueue.add(e);
}
private void split(final HRegion region) throws IOException {
@@ -271,99 +342,239 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
- // Cache flushing
- private final Thread cacheFlusherThread;
+ // Compactions
+ final Compactor compactor;
// Needed during shutdown so we send an interrupt after completion of a
- // flush, not in the midst.
- protected final Integer cacheFlusherLock = new Integer(0);
-
- /* Runs periodically to flush memcache.
- */
- class Flusher extends Chore {
- /**
- * @param period
- * @param stop
- */
- public Flusher(final int period, final AtomicBoolean stop) {
- super(period, stop);
+ // compaction, not in the midst.
+ final Integer compactionLock = new Integer(0);
+
+ /** Compact region on request */
+ class Compactor extends Thread {
+ private final BlockingQueue<QueueEntry> compactionQueue =
+ new LinkedBlockingQueue<QueueEntry>();
+
+ /** constructor */
+ public Compactor() {
+ super();
}
+ /** {@inheritDoc} */
@Override
- protected void chore() {
- synchronized(cacheFlusherLock) {
- checkForFlushesToRun();
- }
- }
-
- private void checkForFlushesToRun() {
- // Grab a list of items to flush
- List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck();
- // Flush them, if necessary
- for(HRegion cur: nonClosedRegionsToFlush) {
+ public void run() {
+ while (!stopRequested.get()) {
+ QueueEntry e = null;
try {
- cur.flushcache();
- } catch (DroppedSnapshotException e) {
- // Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
- // is required. Currently the only way to do this is a restart of
- // the server.
- LOG.fatal("Replay of hlog required. Forcing server restart", e);
+ e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ if (e == null) {
+ continue;
+ }
+ try {
+ if (e.getRegion().compactIfNeeded()) {
+ splitter.splitRequested(e);
+ }
+
+ } catch (IOException ex) {
+ LOG.error("Compaction failed for region " +
+ e.getRegion().getRegionName(),
+ RemoteExceptionHandler.checkIOException(ex));
if (!checkFileSystem()) {
break;
}
- HRegionServer.this.stop();
- } catch (IOException iex) {
- LOG.error("Cache flush failed",
- RemoteExceptionHandler.checkIOException(iex));
+
+ } catch (Exception ex) {
+ LOG.error("Compaction failed for region " +
+ e.getRegion().getRegionName(), ex);
if (!checkFileSystem()) {
break;
}
}
}
+ LOG.info(getName() + " exiting");
+ }
+
+ /**
+ * @param e QueueEntry for region to be compacted
+ */
+ public void compactionRequested(QueueEntry e) {
+ compactionQueue.add(e);
+ }
+ }
+
+ // Cache flushing
+ final Flusher cacheFlusher;
+ // Needed during shutdown so we send an interrupt after completion of a
+ // flush, not in the midst.
+ final Integer cacheFlusherLock = new Integer(0);
+
+ /** Flush cache upon request */
+ class Flusher extends Thread implements CacheFlushListener {
+ private final DelayQueue<QueueEntry> flushQueue =
+ new DelayQueue<QueueEntry>();
+
+ private final long optionalFlushPeriod;
+
+ /** constructor */
+ public Flusher() {
+ super();
+ this.optionalFlushPeriod = conf.getLong(
+ "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run() {
+ while (!stopRequested.get()) {
+ QueueEntry e = null;
+ try {
+ e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException ex) {
+ continue;
+
+ } catch (ConcurrentModificationException ex) {
+ continue;
+
+ }
+ synchronized(cacheFlusherLock) { // Don't interrupt while we're working
+ if (e != null) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushing region " + e.getRegion().getRegionName());
+ }
+ if (e.getRegion().flushcache()) {
+ compactor.compactionRequested(e);
+ }
+
+ } catch (DroppedSnapshotException ex) {
+ // Cache flush can fail in a few places. If it fails in a critical
+ // section, we get a DroppedSnapshotException and a replay of hlog
+ // is required. Currently the only way to do this is a restart of
+ // the server.
+ LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+ if (!checkFileSystem()) {
+ break;
+ }
+ HRegionServer.this.stop();
+
+ } catch (IOException ex) {
+ LOG.error("Cache flush failed for region " +
+ e.getRegion().getRegionName(),
+ RemoteExceptionHandler.checkIOException(ex));
+ if (!checkFileSystem()) {
+ break;
+ }
+
+ } catch (Exception ex) {
+ LOG.error("Cache flush failed for region " +
+ e.getRegion().getRegionName(), ex);
+ if (!checkFileSystem()) {
+ break;
+ }
+ }
+ e.setExpirationTime(System.currentTimeMillis() +
+ optionalFlushPeriod);
+ flushQueue.add(e);
+ }
+
+ // Now ensure that all the active regions are in the queue
+
+ Set<HRegion> regions = getRegionsToCheck();
+ for (HRegion r: regions) {
+ e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
+ synchronized (flushQueue) {
+ if (!flushQueue.contains(e)) {
+ flushQueue.add(e);
+ }
+ }
+ }
+
+ // Now make sure that the queue only contains active regions
+
+ synchronized (flushQueue) {
+ for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
+ e = i.next();
+ if (!regions.contains(e.getRegion())) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+ flushQueue.clear();
+ LOG.info(getName() + " exiting");
+ }
+
+ /** {@inheritDoc} */
+ public void flushRequested(HRegion region) {
+ QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+ synchronized (flushQueue) {
+ if (flushQueue.contains(e)) {
+ flushQueue.remove(e);
+ }
+ flushQueue.add(e);
+ }
}
}
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected HLog log;
- private final Thread logRollerThread;
- protected final Integer logRollerLock = new Integer(0);
+ final LogRoller logRoller;
+ final Integer logRollerLock = new Integer(0);
/** Runs periodically to determine if the HLog should be rolled */
- class LogRoller extends Chore {
- private int MAXLOGENTRIES =
- conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
+ class LogRoller extends Thread implements LogRollListener {
+ private volatile boolean rollLog;
- /**
- * @param period
- * @param stop
- */
- public LogRoller(final int period, final AtomicBoolean stop) {
- super(period, stop);
+ /** constructor */
+ public LogRoller() {
+ super();
+ this.rollLog = false;
}
/** {@inheritDoc} */
@Override
- protected void chore() {
- synchronized(logRollerLock) {
- checkForLogRoll();
+ public synchronized void run() {
+ while (!stopRequested.get()) {
+ try {
+ this.wait(threadWakeFrequency);
+
+ } catch (InterruptedException e) {
+ continue;
+ }
+ if (!rollLog) {
+ continue;
+ }
+ synchronized (logRollerLock) {
+ try {
+ LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
+ log.rollWriter();
+
+ } catch (IOException ex) {
+ LOG.error("Log rolling failed",
+ RemoteExceptionHandler.checkIOException(ex));
+ checkFileSystem();
+
+ } catch (Exception ex) {
+ LOG.error("Log rolling failed", ex);
+ checkFileSystem();
+
+ } finally {
+ rollLog = false;
+ }
+ }
}
}
- private void checkForLogRoll() {
- // If the number of log entries is high enough, roll the log. This
- // is a very fast operation, but should not be done too frequently.
- int nEntries = log.getNumEntries();
- if(nEntries > this.MAXLOGENTRIES) {
- try {
- LOG.info("Rolling hlog. Number of entries: " + nEntries);
- log.rollWriter();
- } catch (IOException iex) {
- LOG.error("Log rolling failed",
- RemoteExceptionHandler.checkIOException(iex));
- checkFileSystem();
- }
- }
+ /** {@inheritDoc} */
+ public synchronized void logRollRequested() {
+ rollLog = true;
+ this.notifyAll();
}
}
@@ -396,20 +607,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.serverLeaseTimeout =
conf.getInt("hbase.master.lease.period", 30 * 1000);
- // Cache flushing chore thread.
- this.cacheFlusherThread =
- new Flusher(this.threadWakeFrequency, stopRequested);
+ // Cache flushing thread.
+ this.cacheFlusher = new Flusher();
- // Check regions to see if they need to be split or compacted chore thread
- this.splitOrCompactCheckerThread =
- new SplitOrCompactChecker(this.stopRequested);
+ // Compaction thread
+ this.compactor = new Compactor();
+ // Region split thread
+ this.splitter = new Splitter();
+
+ // Log rolling thread
+ this.logRoller = new LogRoller();
+
// Task thread to process requests from Master
this.worker = new Worker();
this.workerThread = new Thread(worker);
this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
- this.logRollerThread =
- new LogRoller(this.threadWakeFrequency, stopRequested);
// Server to handle client requests
this.server = RPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -557,14 +770,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
- synchronized(logRollerLock) {
- this.logRollerThread.interrupt();
- }
synchronized(cacheFlusherLock) {
- this.cacheFlusherThread.interrupt();
+ this.cacheFlusher.interrupt();
}
- synchronized(splitOrCompactLock) {
- this.splitOrCompactCheckerThread.interrupt();
+ synchronized (compactionLock) {
+ this.compactor.interrupt();
+ }
+ synchronized (splitterLock) {
+ this.splitter.interrupt();
+ }
+ synchronized (logRollerLock) {
+ this.logRoller.interrupt();
}
if (abortRequested) {
@@ -657,7 +873,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
"running at " + this.serverInfo.getServerAddress().toString() +
" because logdir " + logdir.toString() + " exists");
}
- return new HLog(fs, logdir, conf);
+ return new HLog(fs, logdir, conf, logRoller);
}
/*
@@ -680,16 +896,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
- Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher",
+ Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+ handler);
+ Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
- Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread,
- n + ".splitOrCompactChecker", handler);
- Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller",
- handler);
- // Worker is not the same as the above threads in that it does not
- // inherit from Chore. Set an UncaughtExceptionHandler on it in case its
- // the one to see an OOME, etc., first. The handler will set the stop
- // flag.
+ Threads.setDaemonThreadRunning(this.compactor, n + ".compactor",
+ handler);
+ Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@@ -752,9 +965,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
void join() {
join(this.workerThread);
- join(this.logRollerThread);
- join(this.cacheFlusherThread);
- join(this.splitOrCompactCheckerThread);
+ join(this.logRoller);
+ join(this.cacheFlusher);
+ join(this.compactor);
+ join(this.splitter);
}
private void join(final Thread t) {
@@ -925,7 +1139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
HRegion region = onlineRegions.get(regionInfo.getRegionName());
if(region == null) {
region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
- this.log, FileSystem.get(conf), conf, regionInfo, null);
+ this.log, FileSystem.get(conf), conf, regionInfo, null,
+ this.cacheFlusher);
this.lock.writeLock().lock();
try {
this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1226,6 +1441,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
public AtomicInteger getRequestCount() {
return this.requestCount;
}
+
+ /** @return reference to CacheFlushListener */
+ public CacheFlushListener getCacheFlushListener() {
+ return this.cacheFlusher;
+ }
/**
* Protected utility method for safely obtaining an HRegion handle.
@@ -1318,8 +1538,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @return Returns list of non-closed regions hosted on this server. If no
* regions to check, returns an empty list.
*/
- protected List<HRegion> getRegionsToCheck() {
- ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+ protected Set<HRegion> getRegionsToCheck() {
+ HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
//TODO: is this locking necessary?
lock.readLock().lock();
try {
@@ -1328,8 +1548,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
lock.readLock().unlock();
}
// Purge closed regions.
- for (final ListIterator<HRegion> i = regionsToCheck.listIterator();
- i.hasNext();) {
+ for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
HRegion r = i.next();
if (r.isClosed()) {
i.remove();
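The bulk of the HRegionServer change replaces the periodic Chore threads with request-driven worker threads fed by queues. The Flusher in particular uses a DelayQueue of QueueEntry so that, besides explicit flush requests, every region becomes eligible for an "optional" flush once hbase.regionserver.optionalcacheflushinterval has elapsed since its last flush. A self-contained sketch of that DelayQueue mechanic (illustrative names, not the HBase classes):

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Each entry becomes available once its expiration time passes, which is
    // how the periodic "optional" flush is scheduled per region.
    class DelayedTask implements Delayed {
      private final String name;
      private final long expirationTime;   // absolute time in ms

      DelayedTask(String name, long expirationTime) {
        this.name = name;
        this.expirationTime = expirationTime;
      }

      public long getDelay(TimeUnit unit) {
        return unit.convert(expirationTime - System.currentTimeMillis(),
            TimeUnit.MILLISECONDS);
      }

      public int compareTo(Delayed o) {
        long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return delta > 0 ? 1 : delta < 0 ? -1 : 0;
      }

      public String toString() {
        return name;
      }

      public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        queue.add(new DelayedTask("flush region A", System.currentTimeMillis() + 200));
        // poll() with a timeout returns null until the entry's delay has expired,
        // so the loop can do housekeeping while it waits, as the real Flusher does
        // when it re-syncs its queue against the set of live regions.
        DelayedTask t;
        while ((t = queue.poll(50, TimeUnit.MILLISECONDS)) == null) {
          // nothing ready yet
        }
        System.out.println("ready: " + t);
      }
    }
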
diff --git a/src/java/org/apache/hadoop/hbase/LogRollListener.java b/src/java/org/apache/hadoop/hbase/LogRollListener.java
new file mode 100644
index 00000000000..fbec1c853f1
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/LogRollListener.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Mechanism by which the HLog requests a log roll
+ */
+public interface LogRollListener {
+ /** Request that the log be rolled */
+ public void logRollRequested();
+}
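LogRollListener inverts the old arrangement: instead of the region server polling log.getNumEntries() on a timer, the HLog (now constructed with the listener, see the HLog call sites above) requests a roll and the LogRoller thread wakes up to perform it. A self-contained sketch of that wait/notify handshake, with illustrative names standing in for LogRoller and log.rollWriter():

    // The producer just sets a flag and notifies; the worker thread wakes up,
    // does the roll, and clears the flag -- same shape as the new LogRoller.
    class RollWorker extends Thread {
      private volatile boolean rollRequested = false;
      private volatile boolean stopped = false;

      // Equivalent of LogRollListener.logRollRequested()
      public synchronized void rollRequested() {
        rollRequested = true;
        notifyAll();
      }

      public synchronized void shutdown() {
        stopped = true;
        notifyAll();
      }

      @Override
      public synchronized void run() {
        while (!stopped) {
          try {
            wait(10 * 1000);                   // same idea as threadWakeFrequency
          } catch (InterruptedException e) {
            continue;
          }
          if (!rollRequested) {
            continue;
          }
          try {
            System.out.println("rolling log"); // stand-in for log.rollWriter()
          } finally {
            rollRequested = false;
          }
        }
      }
    }
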
diff --git a/src/java/org/apache/hadoop/hbase/util/Sleeper.java b/src/java/org/apache/hadoop/hbase/util/Sleeper.java
index 30c2b1cd8ab..01102685d6d 100644
--- a/src/java/org/apache/hadoop/hbase/util/Sleeper.java
+++ b/src/java/org/apache/hadoop/hbase/util/Sleeper.java
@@ -31,6 +31,10 @@ public class Sleeper {
private final int period;
private AtomicBoolean stop;
+ /**
+ * @param sleep
+ * @param stop
+ */
public Sleeper(final int sleep, final AtomicBoolean stop) {
this.period = sleep;
this.stop = stop;
@@ -40,7 +44,7 @@ public class Sleeper {
* Sleep for period.
*/
public void sleep() {
- sleep(System.currentTimeMillis());
+ sleep(period);
}
/**
diff --git a/src/test/hbase-site.xml b/src/test/hbase-site.xml
index 5a3ae42141b..9e104c64cce 100644
--- a/src/test/hbase-site.xml
+++ b/src/test/hbase-site.xml
@@ -103,8 +103,17 @@
the master will notice a dead region server sooner. The default is 15 seconds.