HADOOP-1615 Replacing thread notification-based queue with java.util.concurrent.BlockingQueue in HMaster, HRegionServer

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@557118 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-07-18 02:26:03 +00:00
parent 6a64ae1542
commit bf798cca50
3 changed files with 56 additions and 56 deletions

View File

@ -65,4 +65,5 @@ Trunk (unreleased changes)
41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates 41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates
42. HADOOP-1468 Add HBase batch update to reduce RPC overhead 42. HADOOP-1468 Add HBase batch update to reduce RPC overhead
43. HADOOP-1616 Sporadic TestTable failures 43. HADOOP-1616 Sporadic TestTable failures
44. HADOOP-1615 Replacing thread notification-based queue with
java.util.concurrent.BlockingQueue in HMaster, HRegionServer

View File

@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -37,6 +36,9 @@ import java.util.TimerTask;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -86,7 +88,7 @@ public class HMaster implements HConstants, HMasterInterface,
int numRetries; int numRetries;
long maxRegionOpenTime; long maxRegionOpenTime;
LinkedList<PendingOperation> msgQueue; BlockingQueue<PendingOperation> msgQueue;
private Leases serverLeases; private Leases serverLeases;
private Server server; private Server server;
@ -636,7 +638,7 @@ public class HMaster implements HConstants, HMasterInterface,
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
this.msgQueue = new LinkedList<PendingOperation>(); this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
this.serverLeases = new Leases( this.serverLeases = new Leases(
conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.period", 30 * 1000),
conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
@ -736,18 +738,13 @@ public class HMaster implements HConstants, HMasterInterface,
// Main processing loop // Main processing loop
for (PendingOperation op = null; !closed; ) { for (PendingOperation op = null; !closed; ) {
synchronized(msgQueue) { try {
while(msgQueue.size() == 0 && !closed) { op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
try { } catch (InterruptedException e) {
msgQueue.wait(threadWakeFrequency); // continue
} catch(InterruptedException iex) { }
// continue if (op == null || closed) {
} continue;
}
if(closed) {
continue;
}
op = msgQueue.removeFirst();
} }
try { try {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -765,8 +762,10 @@ public class HMaster implements HConstants, HMasterInterface,
} }
} }
LOG.warn(ex); LOG.warn(ex);
synchronized(msgQueue) { try {
msgQueue.addLast(op); msgQueue.put(op);
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
} }
} }
} }
@ -874,10 +873,11 @@ public class HMaster implements HConstants, HMasterInterface,
// name, then we can timeout the old one right away and register // name, then we can timeout the old one right away and register
// the new one. // the new one.
storedInfo = serversToServerInfo.remove(s); storedInfo = serversToServerInfo.remove(s);
if(storedInfo != null && !closed) { if (storedInfo != null && !closed) {
synchronized(msgQueue) { try {
msgQueue.addLast(new PendingServerShutdown(storedInfo)); msgQueue.put(new PendingServerShutdown(storedInfo));
msgQueue.notifyAll(); } catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
} }
} }
@ -1064,9 +1064,10 @@ public class HMaster implements HConstants, HMasterInterface,
// Queue up an update to note the region location. // Queue up an update to note the region location.
synchronized(msgQueue) { try {
msgQueue.addLast(new PendingOpenReport(info, region)); msgQueue.put(new PendingOpenReport(info, region));
msgQueue.notifyAll(); } catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
} }
} }
break; break;
@ -1097,9 +1098,10 @@ public class HMaster implements HConstants, HMasterInterface,
unassignedRegions.remove(region.regionName); unassignedRegions.remove(region.regionName);
assignAttempts.remove(region.regionName); assignAttempts.remove(region.regionName);
synchronized(msgQueue) { try {
msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion)); msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion));
msgQueue.notifyAll(); } catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
} }
// NOTE: we cannot put the region into unassignedRegions as that // NOTE: we cannot put the region into unassignedRegions as that
@ -2406,9 +2408,10 @@ public class HMaster implements HConstants, HMasterInterface,
HGlobals.rootRegionInfo); HGlobals.rootRegionInfo);
assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
} }
synchronized(msgQueue) { try {
msgQueue.addLast(new PendingServerShutdown(storedInfo)); msgQueue.put(new PendingServerShutdown(storedInfo));
msgQueue.notifyAll(); } catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
} }
} }
} }

View File

@ -24,12 +24,14 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -449,7 +451,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker); this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
// Process requests from Master // Process requests from Master
this.toDo = new LinkedList<ToDoEntry>(); this.toDo = new LinkedBlockingQueue<ToDoEntry>();
this.worker = new Worker(); this.worker = new Worker();
this.workerThread = new Thread(worker); this.workerThread = new Thread(worker);
@ -661,7 +663,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Got default message"); LOG.debug("Got default message");
} }
toDo.addLast(new ToDoEntry(msgs[i])); try {
toDo.put(new ToDoEntry(msgs[i]));
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
} }
} }
@ -828,7 +834,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.msg = msg; this.msg = msg;
} }
} }
LinkedList<ToDoEntry> toDo; BlockingQueue<ToDoEntry> toDo;
private Worker worker; private Worker worker;
private Thread workerThread; private Thread workerThread;
/** Thread that performs long running requests from the master */ /** Thread that performs long running requests from the master */
@ -844,26 +850,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/ */
public void run() { public void run() {
for(ToDoEntry e = null; !stopRequested; ) { for(ToDoEntry e = null; !stopRequested; ) {
synchronized(toDo) { try {
while(toDo.size() == 0 && !stopRequested) { e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
try { } catch (InterruptedException ex) {
if (LOG.isDebugEnabled()) { // continue
LOG.debug("Wait on todo"); }
} if(e == null || stopRequested) {
toDo.wait(threadWakeFrequency); continue;
if (LOG.isDebugEnabled()) {
LOG.debug("Wake on todo");
}
} catch(InterruptedException ex) {
// continue
}
}
if(stopRequested) {
continue;
}
e = toDo.removeFirst();
} }
try { try {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(e.msg.toString()); LOG.debug(e.msg.toString());
@ -900,8 +894,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if(e.tries < numRetries) { if(e.tries < numRetries) {
LOG.warn(ie); LOG.warn(ie);
e.tries++; e.tries++;
synchronized(toDo) { try {
toDo.addLast(e); toDo.put(e);
} catch (InterruptedException ex) {
throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
} }
} else { } else {
LOG.error("unable to process message: " + e.msg.toString(), ie); LOG.error("unable to process message: " + e.msg.toString(), ie);