HBASE-6466 Revert, TestLogRolling failed twice on trunk build

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1437274 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-23 05:28:51 +00:00
parent 538798fc95
commit 29a226e189
4 changed files with 101 additions and 156 deletions

View File

@ -208,30 +208,16 @@ public class Threads {
} }
/** /**
* Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)}, * Get a named {@link ThreadFactory} that just builds daemon threads
* without setting the exception handler. * @param prefix name prefix for all threads created from the factory
* @return a thread factory that creates named, daemon threads
*/ */
public static ThreadFactory newDaemonThreadFactory(final String prefix) { public static ThreadFactory newDaemonThreadFactory(final String prefix) {
return newDaemonThreadFactory(prefix, null);
}
/**
* Get a named {@link ThreadFactory} that just builds daemon threads.
* @param prefix name prefix for all threads created from the factory
* @param handler unhandles exception handler to set for all threads
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
public static ThreadFactory newDaemonThreadFactory(final String prefix,
final UncaughtExceptionHandler handler) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix); final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() { return new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread t = namedFactory.newThread(r); Thread t = namedFactory.newThread(r);
if (handler != null) {
t.setUncaughtExceptionHandler(handler);
}
if (!t.isDaemon()) { if (!t.isDaemon()) {
t.setDaemon(true); t.setDaemon(true);
} }

View File

@ -1531,7 +1531,8 @@ public class HRegionServer implements ClientProtocol,
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
uncaughtExceptionHandler); uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
".compactionChecker", uncaughtExceptionHandler); ".compactionChecker", uncaughtExceptionHandler);
if (this.healthCheckChore != null) { if (this.healthCheckChore != null) {
@ -1789,7 +1790,7 @@ public class HRegionServer implements ClientProtocol,
*/ */
protected void join() { protected void join() {
Threads.shutdown(this.compactionChecker.getThread()); Threads.shutdown(this.compactionChecker.getThread());
this.cacheFlusher.join(); Threads.shutdown(this.cacheFlusher.getThread());
if (this.healthCheckChore != null) { if (this.healthCheckChore != null) {
Threads.shutdown(this.healthCheckChore.getThread()); Threads.shutdown(this.healthCheckChore.getThread());
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
@ -30,10 +29,10 @@ import java.util.SortedMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter; import org.cliffc.high_scale_lib.Counter;
@ -61,7 +59,7 @@ import com.google.common.base.Preconditions;
* @see FlushRequester * @see FlushRequester
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester { class MemStoreFlusher extends HasThread implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
// These two data members go together. Any entry in the one must have // These two data members go together. Any entry in the one must have
// a corresponding entry in the other. // a corresponding entry in the other.
@ -73,8 +71,8 @@ class MemStoreFlusher implements FlushRequester {
private final long threadWakeFrequency; private final long threadWakeFrequency;
private final HRegionServer server; private final HRegionServer server;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantLock lock = new ReentrantLock();
private final Object blockSignal = new Object(); private final Condition flushOccurred = lock.newCondition();
protected final long globalMemStoreLimit; protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark; protected final long globalMemStoreLimitLowMark;
@ -89,9 +87,6 @@ class MemStoreFlusher implements FlushRequester {
private long blockingWaitTime; private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter(); private final Counter updatesBlockedMsHighWater = new Counter();
private FlushHandler[] flushHandlers = null;
private int handlerCount;
/** /**
* @param conf * @param conf
* @param server * @param server
@ -116,7 +111,6 @@ class MemStoreFlusher implements FlushRequester {
conf.getInt("hbase.hstore.blockingStoreFiles", 7); conf.getInt("hbase.hstore.blockingStoreFiles", 7);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000); 90000);
this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
LOG.info("globalMemStoreLimit=" + LOG.info("globalMemStoreLimit=" +
StringUtils.humanReadableInt(this.globalMemStoreLimit) + StringUtils.humanReadableInt(this.globalMemStoreLimit) +
", globalMemStoreLimitLowMark=" + ", globalMemStoreLimitLowMark=" +
@ -219,33 +213,37 @@ class MemStoreFlusher implements FlushRequester {
return true; return true;
} }
private class FlushHandler extends HasThread {
@Override @Override
public void run() { public void run() {
while (!server.isStopped()) { while (!this.server.isStopped()) {
FlushQueueEntry fqe = null; FlushQueueEntry fqe = null;
try { try {
wakeupPending.set(false); // allow someone to wake us up again wakeupPending.set(false); // allow someone to wake us up again
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null || fqe instanceof WakeupFlushThread) { if (fqe == null || fqe instanceof WakeupFlushThread) {
if (isAboveLowWaterMark()) { if (isAboveLowWaterMark()) {
LOG.debug("Flush thread woke up because memory above low water=" LOG.debug("Flush thread woke up because memory above low water=" +
+ StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
if (!flushOneForGlobalPressure()) { if (!flushOneForGlobalPressure()) {
// Wasn't able to flush any region, but we're above low water mark // Wasn't able to flush any region, but we're above low water mark
// This is unlikely to happen, but might happen when closing the // This is unlikely to happen, but might happen when closing the
// entire server - another thread is flushing regions. We'll just // entire server - another thread is flushing regions. We'll just
// sleep a little bit to avoid spinning, and then pretend that // sleep a little bit to avoid spinning, and then pretend that
// we flushed one, so anyone blocked will check again // we flushed one, so anyone blocked will check again
lock.lock();
try {
Thread.sleep(1000); Thread.sleep(1000);
wakeUpIfBlocking(); flushOccurred.signalAll();
} finally {
lock.unlock();
}
} }
// Enqueue another one of these tokens so we'll wake up again // Enqueue another one of these tokens so we'll wake up again
wakeupFlushThread(); wakeupFlushThread();
} }
continue; continue;
} }
FlushRegionEntry fre = (FlushRegionEntry) fqe; FlushRegionEntry fre = (FlushRegionEntry)fqe;
if (!flushRegion(fre)) { if (!flushRegion(fre)) {
break; break;
} }
@ -260,17 +258,18 @@ class MemStoreFlusher implements FlushRequester {
} }
} }
} }
synchronized (regionsInQueue) { this.regionsInQueue.clear();
regionsInQueue.clear(); this.flushQueue.clear();
flushQueue.clear();
}
// Signal anyone waiting, so they see the close flag // Signal anyone waiting, so they see the close flag
wakeUpIfBlocking(); lock.lock();
try {
flushOccurred.signalAll();
} finally {
lock.unlock();
}
LOG.info(getName() + " exiting"); LOG.info(getName() + " exiting");
} }
}
private void wakeupFlushThread() { private void wakeupFlushThread() {
if (wakeupPending.compareAndSet(false, true)) { if (wakeupPending.compareAndSet(false, true)) {
@ -288,10 +287,6 @@ class MemStoreFlusher implements FlushRequester {
continue; continue;
} }
if (region.writestate.flushing || !region.writestate.writesEnabled) {
continue;
}
if (checkStoreFileCount && isTooManyStoreFiles(region)) { if (checkStoreFileCount && isTooManyStoreFiles(region)) {
continue; continue;
} }
@ -337,41 +332,11 @@ class MemStoreFlusher implements FlushRequester {
* Only interrupt once it's done with a run through the work loop. * Only interrupt once it's done with a run through the work loop.
*/ */
void interruptIfNecessary() { void interruptIfNecessary() {
lock.writeLock().lock(); lock.lock();
try { try {
for (FlushHandler flushHander : flushHandlers) { this.interrupt();
if (flushHander != null) flushHander.interrupt();
}
} finally { } finally {
lock.writeLock().unlock(); lock.unlock();
}
}
synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
server.getServerName().toString() + "-MemStoreFlusher", eh);
flushHandlers = new FlushHandler[handlerCount];
for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler();
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
}
}
boolean isAlive() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null && flushHander.isAlive()) {
return true;
}
}
return false;
}
void join() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null) {
Threads.shutdown(flushHander.getThread());
}
} }
} }
@ -400,8 +365,7 @@ class MemStoreFlusher implements FlushRequester {
"store files; delaying flush up to " + this.blockingWaitTime + "ms"); "store files; delaying flush up to " + this.blockingWaitTime + "ms");
if (!this.server.compactSplitThread.requestSplit(region)) { if (!this.server.compactSplitThread.requestSplit(region)) {
try { try {
this.server.compactSplitThread.requestCompaction(region, Thread this.server.compactSplitThread.requestCompaction(region, getName());
.currentThread().getName());
} catch (IOException e) { } catch (IOException e) {
LOG.error( LOG.error(
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@ -440,8 +404,8 @@ class MemStoreFlusher implements FlushRequester {
// emergencyFlush, then item was removed via a flushQueue.poll. // emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe); flushQueue.remove(fqe);
} }
lock.lock();
} }
lock.readLock().lock();
try { try {
boolean shouldCompact = region.flushcache(); boolean shouldCompact = region.flushcache();
// We just want to check the size // We just want to check the size
@ -449,7 +413,7 @@ class MemStoreFlusher implements FlushRequester {
if (shouldSplit) { if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region); this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) { } else if (shouldCompact) {
server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName()); server.compactSplitThread.requestCompaction(region, getName());
} }
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
@ -468,18 +432,15 @@ class MemStoreFlusher implements FlushRequester {
return false; return false;
} }
} finally { } finally {
lock.readLock().unlock(); try {
wakeUpIfBlocking(); flushOccurred.signalAll();
} finally {
lock.unlock();
}
} }
return true; return true;
} }
private void wakeUpIfBlocking() {
synchronized (blockSignal) {
blockSignal.notifyAll();
}
}
private boolean isTooManyStoreFiles(HRegion region) { private boolean isTooManyStoreFiles(HRegion region) {
for (Store hstore : region.stores.values()) { for (Store hstore : region.stores.values()) {
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@ -497,12 +458,12 @@ class MemStoreFlusher implements FlushRequester {
*/ */
public void reclaimMemStoreMemory() { public void reclaimMemStoreMemory() {
if (isAboveHighWaterMark()) { if (isAboveHighWaterMark()) {
long start = System.currentTimeMillis(); lock.lock();
synchronized (this.blockSignal) { try {
boolean blocked = false; boolean blocked = false;
long startTime = 0; long startTime = 0;
while (isAboveHighWaterMark() && !server.isStopped()) { while (isAboveHighWaterMark() && !server.isStopped()) {
if (!blocked) { if(!blocked){
startTime = EnvironmentEdgeManager.currentTimeMillis(); startTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Blocking updates on " + server.toString() + LOG.info("Blocking updates on " + server.toString() +
": the global memstore size " + ": the global memstore size " +
@ -515,12 +476,10 @@ class MemStoreFlusher implements FlushRequester {
try { try {
// we should be able to wait forever, but we've seen a bug where // we should be able to wait forever, but we've seen a bug where
// we miss a notify, so put a 5 second bound on it at least. // we miss a notify, so put a 5 second bound on it at least.
blockSignal.wait(5 * 1000); flushOccurred.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
long took = System.currentTimeMillis() - start;
LOG.warn("Memstore is above high water mark and block " + took + "ms");
} }
if(blocked){ if(blocked){
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@ -529,6 +488,8 @@ class MemStoreFlusher implements FlushRequester {
} }
LOG.info("Unblocking updates for server " + server.toString()); LOG.info("Unblocking updates for server " + server.toString());
} }
} finally {
lock.unlock();
} }
} else if (isAboveLowWaterMark()) { } else if (isAboveLowWaterMark()) {
wakeupFlushThread(); wakeupFlushThread();

View File

@ -42,9 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;