HADOOP-1397. Replace custom hbase locking with java.util.concurrent.locks.ReentrantLock

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@540424 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-05-22 05:30:07 +00:00
parent 356f3919e9
commit 47bf7fd8bb
4 changed files with 76 additions and 74 deletions

View File

@ -6,3 +6,5 @@ Trunk (unreleased changes)
1. HADOOP-1384. HBase omnibus patch. (jimk, Vuk Ercegovac, and Michael Stack) 1. HADOOP-1384. HBase omnibus patch. (jimk, Vuk Ercegovac, and Michael Stack)
2. HADOOP-1402. Fix javadoc warnings in hbase contrib. (Michael Stack) 2. HADOOP-1402. Fix javadoc warnings in hbase contrib. (Michael Stack)
3. HADOOP-1404. HBase command-line shutdown failing (Michael Stack) 3. HADOOP-1404. HBase command-line shutdown failing (Michael Stack)
4. HADOOP-1397. Replace custom hbase locking with
java.util.concurrent.locks.ReentrantLock (Michael Stack)

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.*;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* HRegion stores data for a certain region of a table. It stores all columns * HRegion stores data for a certain region of a table. It stores all columns
@ -284,7 +285,7 @@ public class HRegion implements HConstants {
int maxUnflushedEntries = 0; int maxUnflushedEntries = 0;
int compactionThreshold = 0; int compactionThreshold = 0;
HLocking lock = null; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
@ -322,8 +323,6 @@ public class HRegion implements HConstants {
this.writestate.writesOngoing = true; this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true; this.writestate.writesEnabled = true;
this.writestate.closed = false; this.writestate.closed = false;
this.lock = new HLocking();
// Declare the regionName. This is a unique string for the region, used to // Declare the regionName. This is a unique string for the region, used to
// build a unique filename. // build a unique filename.
@ -401,7 +400,7 @@ public class HRegion implements HConstants {
* time-sensitive thread. * time-sensitive thread.
*/ */
public Vector<HStoreFile> close() throws IOException { public Vector<HStoreFile> close() throws IOException {
lock.obtainWriteLock(); lock.writeLock().lock();
try { try {
boolean shouldClose = false; boolean shouldClose = false;
synchronized(writestate) { synchronized(writestate) {
@ -441,7 +440,7 @@ public class HRegion implements HConstants {
} }
} }
} finally { } finally {
lock.releaseWriteLock(); lock.writeLock().unlock();
} }
} }
@ -617,7 +616,7 @@ public class HRegion implements HConstants {
* @return - true if the region should be split * @return - true if the region should be split
*/ */
public boolean needsSplit(Text midKey) { public boolean needsSplit(Text midKey) {
lock.obtainReadLock(); lock.readLock().lock();
try { try {
Text key = new Text(); Text key = new Text();
@ -635,7 +634,7 @@ public class HRegion implements HConstants {
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2))); return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
} }
@ -644,7 +643,7 @@ public class HRegion implements HConstants {
*/ */
public boolean needsCompaction() { public boolean needsCompaction() {
boolean needsCompaction = false; boolean needsCompaction = false;
lock.obtainReadLock(); lock.readLock().lock();
try { try {
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) { for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) { if(i.next().getNMaps() > compactionThreshold) {
@ -653,7 +652,7 @@ public class HRegion implements HConstants {
} }
} }
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
return needsCompaction; return needsCompaction;
} }
@ -673,7 +672,7 @@ public class HRegion implements HConstants {
*/ */
public boolean compactStores() throws IOException { public boolean compactStores() throws IOException {
boolean shouldCompact = false; boolean shouldCompact = false;
lock.obtainReadLock(); lock.readLock().lock();
try { try {
synchronized(writestate) { synchronized(writestate) {
if((! writestate.writesOngoing) if((! writestate.writesOngoing)
@ -686,7 +685,7 @@ public class HRegion implements HConstants {
} }
} }
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
if(! shouldCompact) { if(! shouldCompact) {
@ -694,7 +693,7 @@ public class HRegion implements HConstants {
return false; return false;
} else { } else {
lock.obtainWriteLock(); lock.writeLock().lock();
try { try {
LOG.info("starting compaction on region " + this.regionInfo.regionName); LOG.info("starting compaction on region " + this.regionInfo.regionName);
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) { for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
@ -710,7 +709,7 @@ public class HRegion implements HConstants {
recentCommits = 0; recentCommits = 0;
writestate.notifyAll(); writestate.notifyAll();
} }
lock.releaseWriteLock(); lock.writeLock().unlock();
} }
} }
} }
@ -931,7 +930,7 @@ public class HRegion implements HConstants {
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
lock.obtainReadLock(); lock.readLock().lock();
try { try {
// Check the memcache // Check the memcache
@ -951,7 +950,7 @@ public class HRegion implements HConstants {
return targetStore.get(key, numVersions); return targetStore.get(key, numVersions);
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
} }
@ -968,7 +967,7 @@ public class HRegion implements HConstants {
public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException { public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis()); HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
lock.obtainReadLock(); lock.readLock().lock();
try { try {
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key); TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) { for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
@ -979,7 +978,7 @@ public class HRegion implements HConstants {
return memResult; return memResult;
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
} }
@ -988,7 +987,7 @@ public class HRegion implements HConstants {
* columns. This Iterator must be closed by the caller. * columns. This Iterator must be closed by the caller.
*/ */
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
lock.obtainReadLock(); lock.readLock().lock();
try { try {
TreeSet<Text> families = new TreeSet<Text>(); TreeSet<Text> families = new TreeSet<Text>();
for(int i = 0; i < cols.length; i++) { for(int i = 0; i < cols.length; i++) {
@ -1004,7 +1003,7 @@ public class HRegion implements HConstants {
return new HScanner(cols, firstRow, memcache, storelist); return new HScanner(cols, firstRow, memcache, storelist);
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
} }
@ -1027,12 +1026,11 @@ public class HRegion implements HConstants {
// We obtain a per-row lock, so other clients will // We obtain a per-row lock, so other clients will
// block while one client performs an update. // block while one client performs an update.
lock.obtainReadLock(); lock.readLock().lock();
try { try {
return obtainLock(row); return obtainLock(row);
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.*;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/******************************************************************************* /*******************************************************************************
* HRegionServer makes a set of HRegions available to clients. It checks in with * HRegionServer makes a set of HRegions available to clients. It checks in with
@ -50,7 +51,7 @@ public class HRegionServer
private Configuration conf; private Configuration conf;
private Random rand; private Random rand;
private TreeMap<Text, HRegion> regions; // region name -> HRegion private TreeMap<Text, HRegion> regions; // region name -> HRegion
private HLocking lock; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Vector<HMsg> outboundMsgs; private Vector<HMsg> outboundMsgs;
private long threadWakeFrequency; private long threadWakeFrequency;
@ -71,9 +72,12 @@ public class HRegionServer
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text) * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
*/ */
public void regionIsUnavailable(Text regionName) { public void regionIsUnavailable(Text regionName) {
lock.obtainWriteLock(); lock.writeLock().lock();
regions.remove(regionName); try {
lock.releaseWriteLock(); regions.remove(regionName);
} finally {
lock.writeLock().unlock();
}
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -88,11 +92,11 @@ public class HRegionServer
// Grab a list of regions to check // Grab a list of regions to check
Vector<HRegion> regionsToCheck = new Vector<HRegion>(); Vector<HRegion> regionsToCheck = new Vector<HRegion>();
lock.obtainReadLock(); lock.readLock().lock();
try { try {
regionsToCheck.addAll(regions.values()); regionsToCheck.addAll(regions.values());
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
try { try {
@ -163,10 +167,13 @@ public class HRegionServer
// Finally, start serving the new regions // Finally, start serving the new regions
lock.obtainWriteLock(); lock.writeLock().lock();
regions.put(newRegions[0].getRegionName(), newRegions[0]); try {
regions.put(newRegions[1].getRegionName(), newRegions[1]); regions.put(newRegions[0].getRegionName(), newRegions[0]);
lock.releaseWriteLock(); regions.put(newRegions[1].getRegionName(), newRegions[1]);
} finally {
lock.writeLock().unlock();
}
} }
} }
} }
@ -214,12 +221,11 @@ public class HRegionServer
// Grab a list of items to flush // Grab a list of items to flush
Vector<HRegion> toFlush = new Vector<HRegion>(); Vector<HRegion> toFlush = new Vector<HRegion>();
lock.obtainReadLock(); lock.readLock().lock();
try { try {
toFlush.addAll(regions.values()); toFlush.addAll(regions.values());
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
} }
// Flush them, if necessary // Flush them, if necessary
@ -340,7 +346,6 @@ public class HRegionServer
this.conf = conf; this.conf = conf;
this.rand = new Random(); this.rand = new Random();
this.regions = new TreeMap<Text, HRegion>(); this.regions = new TreeMap<Text, HRegion>();
this.lock = new HLocking();
this.outboundMsgs = new Vector<HMsg>(); this.outboundMsgs = new Vector<HMsg>();
this.scanners = this.scanners =
Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>()); Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
@ -752,27 +757,26 @@ public class HRegionServer
} }
private void openRegion(HRegionInfo regionInfo) throws IOException { private void openRegion(HRegionInfo regionInfo) throws IOException {
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
try { try {
HRegion region = HRegion region =
new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
regions.put(region.getRegionName(), region); regions.put(region.getRegionName(), region);
reportOpen(region); reportOpen(region);
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
} }
private void closeRegion(HRegionInfo info, boolean reportWhenCompleted) private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
throws IOException { throws IOException {
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
HRegion region = null; HRegion region = null;
try { try {
region = regions.remove(info.regionName); region = regions.remove(info.regionName);
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
if(region != null) { if(region != null) {
@ -785,13 +789,12 @@ public class HRegionServer
} }
private void closeAndDeleteRegion(HRegionInfo info) throws IOException { private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
HRegion region = null; HRegion region = null;
try { try {
region = regions.remove(info.regionName); region = regions.remove(info.regionName);
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
if(region != null) { if(region != null) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -809,13 +812,12 @@ public class HRegionServer
/** Called either when the master tells us to restart or from stop() */ /** Called either when the master tells us to restart or from stop() */
private void closeAllRegions() { private void closeAllRegions() {
Vector<HRegion> regionsToClose = new Vector<HRegion>(); Vector<HRegion> regionsToClose = new Vector<HRegion>();
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
try { try {
regionsToClose.addAll(regions.values()); regionsToClose.addAll(regions.values());
regions.clear(); regions.clear();
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) { for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) {
HRegion region = it.next(); HRegion region = it.next();
@ -842,7 +844,7 @@ public class HRegionServer
****************************************************************************/ ****************************************************************************/
/* /*
private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException { private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
locking.obtainWriteLock(); locking.writeLock().lock();
try { try {
HRegion srcA = regions.remove(regionNameA); HRegion srcA = regions.remove(regionNameA);
HRegion srcB = regions.remove(regionNameB); HRegion srcB = regions.remove(regionNameB);
@ -854,7 +856,7 @@ public class HRegionServer
reportOpen(newRegion); reportOpen(newRegion);
} finally { } finally {
locking.releaseWriteLock(); locking.writeLock().unlock();
} }
} }
*/ */
@ -1016,13 +1018,12 @@ public class HRegionServer
/** Private utility method for safely obtaining an HRegion handle. */ /** Private utility method for safely obtaining an HRegion handle. */
private HRegion getRegion(Text regionName) throws NotServingRegionException { private HRegion getRegion(Text regionName) throws NotServingRegionException {
this.lock.obtainReadLock(); this.lock.readLock().lock();
HRegion region = null; HRegion region = null;
try { try {
region = regions.get(regionName); region = regions.get(regionName);
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
if(region == null) { if(region == null) {

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -63,7 +64,7 @@ public class HStore {
Integer compactLock = new Integer(0); Integer compactLock = new Integer(0);
Integer flushLock = new Integer(0); Integer flushLock = new Integer(0);
HLocking lock = new HLocking(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>(); TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>(); TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@ -238,7 +239,7 @@ public class HStore {
/** Turn off all the MapFile readers */ /** Turn off all the MapFile readers */
public void close() throws IOException { public void close() throws IOException {
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily); LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
try { try {
@ -252,7 +253,7 @@ public class HStore {
LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily); LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
} }
@ -324,7 +325,7 @@ public class HStore {
// C. Finally, make the new MapFile available. // C. Finally, make the new MapFile available.
if(addToAvailableMaps) { if(addToAvailableMaps) {
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
try { try {
maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf)); maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@ -335,7 +336,7 @@ public class HStore {
} }
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
} }
return getAllMapFiles(); return getAllMapFiles();
@ -343,12 +344,12 @@ public class HStore {
} }
public Vector<HStoreFile> getAllMapFiles() { public Vector<HStoreFile> getAllMapFiles() {
this.lock.obtainReadLock(); this.lock.readLock().lock();
try { try {
return new Vector<HStoreFile>(mapFiles.values()); return new Vector<HStoreFile>(mapFiles.values());
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
} }
@ -390,12 +391,12 @@ public class HStore {
// Grab a list of files to compact. // Grab a list of files to compact.
Vector<HStoreFile> toCompactFiles = null; Vector<HStoreFile> toCompactFiles = null;
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
try { try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values()); toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
} finally { } finally {
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@ -630,7 +631,7 @@ public class HStore {
// 1. Acquiring the write-lock // 1. Acquiring the write-lock
this.lock.obtainWriteLock(); this.lock.writeLock().lock();
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
try { try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE); Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
@ -748,7 +749,7 @@ public class HStore {
// 7. Releasing the write-lock // 7. Releasing the write-lock
this.lock.releaseWriteLock(); this.lock.writeLock().unlock();
} }
} }
@ -764,7 +765,7 @@ public class HStore {
* The returned object should map column names to byte arrays (byte[]). * The returned object should map column names to byte arrays (byte[]).
*/ */
public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException { public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
this.lock.obtainReadLock(); this.lock.readLock().lock();
try { try {
MapFile.Reader[] maparray MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]); = maps.values().toArray(new MapFile.Reader[maps.size()]);
@ -793,7 +794,7 @@ public class HStore {
} }
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
} }
@ -809,7 +810,7 @@ public class HStore {
} }
Vector<BytesWritable> results = new Vector<BytesWritable>(); Vector<BytesWritable> results = new Vector<BytesWritable>();
this.lock.obtainReadLock(); this.lock.readLock().lock();
try { try {
MapFile.Reader[] maparray MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]); = maps.values().toArray(new MapFile.Reader[maps.size()]);
@ -850,7 +851,7 @@ public class HStore {
} }
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
} }
@ -866,7 +867,7 @@ public class HStore {
return maxSize; return maxSize;
} }
this.lock.obtainReadLock(); this.lock.readLock().lock();
try { try {
long mapIndex = 0L; long mapIndex = 0L;
@ -893,7 +894,7 @@ public class HStore {
LOG.warn(e); LOG.warn(e);
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
return maxSize; return maxSize;
} }
@ -902,12 +903,12 @@ public class HStore {
* @return Returns the number of map files currently in use * @return Returns the number of map files currently in use
*/ */
public int getNMaps() { public int getNMaps() {
this.lock.obtainReadLock(); this.lock.readLock().lock();
try { try {
return maps.size(); return maps.size();
} finally { } finally {
this.lock.releaseReadLock(); this.lock.readLock().unlock();
} }
} }
@ -949,7 +950,7 @@ public class HStore {
super(timestamp, targetCols); super(timestamp, targetCols);
lock.obtainReadLock(); lock.readLock().lock();
try { try {
this.readers = new MapFile.Reader[mapFiles.size()]; this.readers = new MapFile.Reader[mapFiles.size()];
@ -1064,7 +1065,7 @@ public class HStore {
} }
} finally { } finally {
lock.releaseReadLock(); lock.readLock().unlock();
scannerClosed = true; scannerClosed = true;
} }
} }