From bf798cca50a330d9807671c62378245710887e4a Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Wed, 18 Jul 2007 02:26:03 +0000 Subject: [PATCH] HADOOP-1615 Replacing thread notification-based queue with java.util.concurrent.BlockingQueue in HMaster, HRegionServer git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@557118 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 +- src/java/org/apache/hadoop/hbase/HMaster.java | 63 ++++++++++--------- .../apache/hadoop/hbase/HRegionServer.java | 46 +++++++------- 3 files changed, 56 insertions(+), 56 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 69803a1a8e3..783bfe3536d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -65,4 +65,5 @@ Trunk (unreleased changes) 41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates 42. HADOOP-1468 Add HBase batch update to reduce RPC overhead 43. HADOOP-1616 Sporadic TestTable failures - + 44. HADOOP-1615 Replacing thread notification-based queue with + java.util.concurrent.BlockingQueue in HMaster, HRegionServer diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index 26478424324..3c0f441f912 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.Set; @@ -37,6 +36,9 @@ import java.util.TimerTask; import java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +88,7 @@ public class HMaster implements HConstants, HMasterInterface, int numRetries; long maxRegionOpenTime; - LinkedList msgQueue; + BlockingQueue msgQueue; private Leases serverLeases; private Server server; @@ -636,7 +638,7 @@ public class HMaster implements HConstants, HMasterInterface, this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); - this.msgQueue = new LinkedList(); + this.msgQueue = new LinkedBlockingQueue(); this.serverLeases = new Leases( conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); @@ -736,18 +738,13 @@ public class HMaster implements HConstants, HMasterInterface, // Main processing loop for (PendingOperation op = null; !closed; ) { - synchronized(msgQueue) { - while(msgQueue.size() == 0 && !closed) { - try { - msgQueue.wait(threadWakeFrequency); - } catch(InterruptedException iex) { - // continue - } - } - if(closed) { - continue; - } - op = msgQueue.removeFirst(); + try { + op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // continue + } + if (op == null || closed) { + continue; } try { if (LOG.isDebugEnabled()) { @@ -765,8 +762,10 @@ public class HMaster implements HConstants, HMasterInterface, } } LOG.warn(ex); - synchronized(msgQueue) { - msgQueue.addLast(op); + try { + msgQueue.put(op); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } @@ -874,10 +873,11 @@ public class HMaster implements HConstants, HMasterInterface, // name, then we can timeout the old one right away and register // the new one. storedInfo = serversToServerInfo.remove(s); - if(storedInfo != null && !closed) { - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + if (storedInfo != null && !closed) { + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } @@ -1064,9 +1064,10 @@ public class HMaster implements HConstants, HMasterInterface, // Queue up an update to note the region location. - synchronized(msgQueue) { - msgQueue.addLast(new PendingOpenReport(info, region)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingOpenReport(info, region)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } break; @@ -1097,9 +1098,10 @@ public class HMaster implements HConstants, HMasterInterface, unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); - synchronized(msgQueue) { - msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } // NOTE: we cannot put the region into unassignedRegions as that @@ -2406,9 +2408,10 @@ public class HMaster implements HConstants, HMasterInterface, HGlobals.rootRegionInfo); assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); } - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index b9a394c09c9..8d8efaab228 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -24,12 +24,14 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -449,7 +451,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker); // Process requests from Master - this.toDo = new LinkedList(); + this.toDo = new LinkedBlockingQueue(); this.worker = new Worker(); this.workerThread = new Thread(worker); @@ -661,7 +663,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { if (LOG.isDebugEnabled()) { LOG.debug("Got default message"); } - toDo.addLast(new ToDoEntry(msgs[i])); + try { + toDo.put(new ToDoEntry(msgs[i])); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } } } @@ -828,7 +834,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.msg = msg; } } - LinkedList toDo; + BlockingQueue toDo; private Worker worker; private Thread workerThread; /** Thread that performs long running requests from the master */ @@ -844,26 +850,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public void run() { for(ToDoEntry e = null; !stopRequested; ) { - synchronized(toDo) { - while(toDo.size() == 0 && !stopRequested) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Wait on todo"); - } - toDo.wait(threadWakeFrequency); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake on todo"); - } - } catch(InterruptedException ex) { - // continue - } - } - if(stopRequested) { - continue; - } - e = toDo.removeFirst(); + try { + e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + // continue + } + if(e == null || stopRequested) { + continue; } - try { if (LOG.isDebugEnabled()) { LOG.debug(e.msg.toString()); @@ -900,8 +894,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { if(e.tries < numRetries) { LOG.warn(ie); e.tries++; - synchronized(toDo) { - toDo.addLast(e); + try { + toDo.put(e); + } catch (InterruptedException ex) { + throw new RuntimeException("Putting into msgQueue was interrupted.", ex); } } else { LOG.error("unable to process message: " + e.msg.toString(), ie);