HBASE-1816 Master rewrite; should have removed safe-mode from regionserver-side too -- Needed to remove the wait up in top of flush and compactions threads-- used to wait on safe mode... was stuck there...

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@830844 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-10-29 04:42:15 +00:00
parent c7df216c11
commit b58a646039
4 changed files with 7 additions and 75 deletions

View File

@ -70,13 +70,6 @@ class CompactSplitThread extends Thread implements HConstants {
@Override @Override
public void run() { public void run() {
while (!this.server.isStopRequested()) {
try {
Thread.sleep(this.frequency);
} catch (InterruptedException ex) {
continue;
}
}
int count = 0; int count = 0;
while (!this.server.isStopRequested()) { while (!this.server.isStopRequested()) {
HRegion r = null; HRegion r = null;

View File

@ -203,9 +203,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
protected volatile HLog hlog; protected volatile HLog hlog;
LogRoller hlogRoller; LogRoller hlogRoller;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
// flag set after we're done setting up server threads (used for testing) // flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline; protected volatile boolean isOnline;
@ -858,48 +855,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
return this.fsOk; return this.fsOk;
} }
/**
* Thread that gradually ups compaction limit.
*/
private class CompactionLimitThread extends Thread {
protected CompactionLimitThread() {}
@Override
public void run() {
// Slowly increase per-cycle compaction limit, finally setting it to
// unlimited (-1)
int compactionCheckInterval =
conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
20 * 1000);
final int limitSteps[] = {
1, 1, 1, 1,
2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
-1
};
for (int i = 0; i < limitSteps.length; i++) {
// Just log changes.
if (compactSplitThread.getLimit() != limitSteps[i] &&
LOG.isDebugEnabled()) {
LOG.debug("setting compaction limit to " + limitSteps[i]);
}
compactSplitThread.setLimit(limitSteps[i]);
try {
Thread.sleep(compactionCheckInterval);
} catch (InterruptedException ex) {
// unlimit compactions before exiting
compactSplitThread.setLimit(-1);
if (LOG.isDebugEnabled()) {
LOG.debug(this.getName() + " exiting on interrupt");
}
return;
}
}
LOG.info("compactions no longer limited");
}
}
/* /*
* Thread to shutdown the region server in an orderly manner. This thread * Thread to shutdown the region server in an orderly manner. This thread
* is registered as a shutdown hook in the HRegionServer constructor and is * is registered as a shutdown hook in the HRegionServer constructor and is
@ -1130,10 +1085,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
} }
} }
this.compactionLimitThread = new CompactionLimitThread();
Threads.setDaemonThreadRunning(this.compactionLimitThread, n +
".compactionLimitThread", handler);
// Start Server. This service is like leases in that it internally runs // Start Server. This service is like leases in that it internally runs
// a thread. // a thread.
this.server.start(); this.server.start();
@ -2083,7 +2034,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
* @return true if a stop has been requested. * @return true if a stop has been requested.
*/ */
public boolean isStopRequested() { public boolean isStopRequested() {
return stopRequested.get(); return this.stopRequested.get();
} }
/** /**

View File

@ -133,16 +133,9 @@ class MemStoreFlusher extends Thread implements FlushRequester {
@Override @Override
public void run() { public void run() {
while (!this.server.isStopRequested()) { while (!this.server.isStopRequested()) {
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException ex) {
continue;
}
}
while (!server.isStopRequested()) {
HRegion r = null; HRegion r = null;
try { try {
r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); r = this.flushQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
if (r == null) { if (r == null) {
continue; continue;
} }
@ -162,8 +155,8 @@ class MemStoreFlusher extends Thread implements FlushRequester {
} }
} }
} }
regionsInQueue.clear(); this.regionsInQueue.clear();
flushQueue.clear(); this.flushQueue.clear();
LOG.info(getName() + " exiting"); LOG.info(getName() + " exiting");
} }

View File

@ -99,7 +99,6 @@ public class TestLogRolling extends HBaseClusterTestCase {
private void startAndWriteData() throws Exception { private void startAndWriteData() throws Exception {
// When the META table can be opened, the region servers are running // When the META table can be opened, the region servers are running
new HTable(conf, HConstants.META_TABLE_NAME); new HTable(conf, HConstants.META_TABLE_NAME);
this.server = cluster.getRegionThreads().get(0).getRegionServer(); this.server = cluster.getRegionThreads().get(0).getRegionServer();
this.log = server.getLog(); this.log = server.getLog();
@ -109,15 +108,12 @@ public class TestLogRolling extends HBaseClusterTestCase {
HBaseAdmin admin = new HBaseAdmin(conf); HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc); admin.createTable(desc);
HTable table = new HTable(conf, tableName); HTable table = new HTable(conf, tableName);
for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
put.add(HConstants.CATALOG_FAMILY, null, value); put.add(HConstants.CATALOG_FAMILY, null, value);
table.put(put); table.put(put);
if (i % 32 == 0) { if (i % 32 == 0) {
// After every 32 writes sleep to let the log roller run // After every 32 writes sleep to let the log roller run
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -133,7 +129,7 @@ public class TestLogRolling extends HBaseClusterTestCase {
* @throws Exception * @throws Exception
*/ */
public void testLogRolling() throws Exception { public void testLogRolling() throws Exception {
tableName = getName(); this.tableName = getName();
try { try {
startAndWriteData(); startAndWriteData();
LOG.info("after writing there are " + log.getNumLogFiles() + " log files"); LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
@ -158,5 +154,4 @@ public class TestLogRolling extends HBaseClusterTestCase {
throw e; throw e;
} }
} }
} }