HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix regression introduced by HADOOP-1397.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@541095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-05-23 21:30:25 +00:00
parent 19099c98b1
commit 23f836454d
5 changed files with 180 additions and 78 deletions

View File

@ -14,3 +14,5 @@ Trunk (unreleased changes)
'Performance Evaluation', etc.
7. HADOOP-1420, HADOOP-1423. Findbugs changes, remove reference to removed
class HLocking.
8. HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix
regression introduced by HADOOP-1397.

View File

@ -0,0 +1,101 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.concurrent.atomic.AtomicInteger;
/**
* HLocking is a set of lock primitives that does not rely on a
* particular thread holding the monitor for an object. This is
* especially important when a lock must persist over multiple RPC's
* since there is no guarantee that the same Server thread will handle
* all the RPC's until the lock is released. Not requiring that the locker
* thread is same as unlocking thread is the key distinction between this
* class and {@link java.util.concurrent.locks.ReentrantReadWriteLock}.
*
* <p>For each independent entity that needs locking, create a new HLocking
* instance.
*/
public class HLocking {
private Integer mutex;
// If lockers == 0, the lock is unlocked
// If lockers > 0, locked for read
// If lockers == -1 locked for write
private AtomicInteger lockers;
/** Constructor */
public HLocking() {
this.mutex = new Integer(0);
this.lockers = new AtomicInteger(0);
}
/**
* Caller needs the nonexclusive read-lock
*/
public void obtainReadLock() {
synchronized(mutex) {
while(lockers.get() < 0) {
try {
mutex.wait();
} catch(InterruptedException ie) {
}
}
lockers.incrementAndGet();
mutex.notifyAll();
}
}
/**
* Caller is finished with the nonexclusive read-lock
*/
public void releaseReadLock() {
synchronized(mutex) {
if(lockers.decrementAndGet() < 0) {
throw new IllegalStateException("lockers: " + lockers);
}
mutex.notifyAll();
}
}
/**
* Caller needs the exclusive write-lock
*/
public void obtainWriteLock() {
synchronized(mutex) {
while(!lockers.compareAndSet(0, -1)) {
try {
mutex.wait();
} catch (InterruptedException ie) {
}
}
mutex.notifyAll();
}
}
/**
* Caller is finished with the write lock
*/
public void releaseWriteLock() {
synchronized(mutex) {
if(!lockers.compareAndSet(-1, 0)) {
throw new IllegalStateException("lockers: " + lockers);
}
mutex.notifyAll();
}
}
}

View File

@ -15,14 +15,17 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.io.*;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.*;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/*******************************************************************************
* The HMemcache holds in-memory modifications to the HRegion. This is really a
@ -39,7 +42,7 @@ public class HMemcache {
TreeMap<HStoreKey, BytesWritable> snapshot = null;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final HLocking lock = new HLocking();
public HMemcache() {
super();
@ -70,7 +73,7 @@ public class HMemcache {
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
if(snapshot != null) {
throw new IOException("Snapshot in progress!");
@ -99,7 +102,7 @@ public class HMemcache {
return retval;
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
@ -109,7 +112,7 @@ public class HMemcache {
* Modifying the structure means we need to obtain a writelock.
*/
public void deleteSnapshot() throws IOException {
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
if(snapshot == null) {
@ -135,7 +138,7 @@ public class HMemcache {
}
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
@ -145,14 +148,14 @@ public class HMemcache {
* Operation uses a write lock.
*/
public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
memcache.put(key, es.getValue());
}
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
@ -163,7 +166,7 @@ public class HMemcache {
*/
public BytesWritable[] get(HStoreKey key, int numVersions) {
Vector<BytesWritable> results = new Vector<BytesWritable>();
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
results.addAll(0, result);
@ -180,7 +183,7 @@ public class HMemcache {
return (results.size() == 0)?
null: results.toArray(new BytesWritable[results.size()]);
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -192,7 +195,7 @@ public class HMemcache {
*/
public TreeMap<Text, BytesWritable> getFull(HStoreKey key) {
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
internalGetFull(memcache, key, results);
for(int i = history.size()-1; i >= 0; i--) {
@ -202,7 +205,7 @@ public class HMemcache {
return results;
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -275,7 +278,7 @@ public class HMemcache {
super(timestamp, targetCols);
lock.readLock().lock();
lock.obtainReadLock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
@ -367,7 +370,7 @@ public class HMemcache {
}
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
scannerClosed = true;
}
}

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.conf.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@ -283,7 +282,7 @@ public class HRegion implements HConstants {
int maxUnflushedEntries = 0;
int compactionThreshold = 0;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final HLocking lock = new HLocking();
//////////////////////////////////////////////////////////////////////////////
// Constructor
@ -398,7 +397,7 @@ public class HRegion implements HConstants {
* time-sensitive thread.
*/
public Vector<HStoreFile> close() throws IOException {
lock.writeLock().lock();
lock.obtainWriteLock();
try {
boolean shouldClose = false;
synchronized(writestate) {
@ -438,7 +437,7 @@ public class HRegion implements HConstants {
}
}
} finally {
lock.writeLock().unlock();
lock.releaseWriteLock();
}
}
@ -614,7 +613,7 @@ public class HRegion implements HConstants {
* @return - true if the region should be split
*/
public boolean needsSplit(Text midKey) {
lock.readLock().lock();
lock.obtainReadLock();
try {
Text key = new Text();
@ -632,7 +631,7 @@ public class HRegion implements HConstants {
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
}
@ -641,7 +640,7 @@ public class HRegion implements HConstants {
*/
public boolean needsCompaction() {
boolean needsCompaction = false;
lock.readLock().lock();
lock.obtainReadLock();
try {
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) {
@ -650,7 +649,7 @@ public class HRegion implements HConstants {
}
}
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
return needsCompaction;
}
@ -670,7 +669,7 @@ public class HRegion implements HConstants {
*/
public boolean compactStores() throws IOException {
boolean shouldCompact = false;
lock.readLock().lock();
lock.obtainReadLock();
try {
synchronized(writestate) {
if((! writestate.writesOngoing)
@ -683,32 +682,30 @@ public class HRegion implements HConstants {
}
}
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
if(! shouldCompact) {
LOG.info("not compacting region " + this.regionInfo.regionName);
return false;
} else {
lock.writeLock().lock();
try {
LOG.info("starting compaction on region " + this.regionInfo.regionName);
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.compact();
}
LOG.info("compaction completed on region " + this.regionInfo.regionName);
return true;
} finally {
synchronized(writestate) {
writestate.writesOngoing = false;
recentCommits = 0;
writestate.notifyAll();
}
lock.writeLock().unlock();
return false;
}
lock.obtainWriteLock();
try {
LOG.info("starting compaction on region " + this.regionInfo.regionName);
for (Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
HStore store = it.next();
store.compact();
}
LOG.info("compaction completed on region " + this.regionInfo.regionName);
return true;
} finally {
synchronized (writestate) {
writestate.writesOngoing = false;
recentCommits = 0;
writestate.notifyAll();
}
lock.releaseWriteLock();
}
}
@ -928,7 +925,7 @@ public class HRegion implements HConstants {
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
lock.readLock().lock();
lock.obtainReadLock();
try {
// Check the memcache
@ -948,7 +945,7 @@ public class HRegion implements HConstants {
return targetStore.get(key, numVersions);
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
}
@ -965,7 +962,7 @@ public class HRegion implements HConstants {
public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
lock.readLock().lock();
lock.obtainReadLock();
try {
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
@ -976,7 +973,7 @@ public class HRegion implements HConstants {
return memResult;
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
}
@ -985,7 +982,7 @@ public class HRegion implements HConstants {
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
lock.readLock().lock();
lock.obtainReadLock();
try {
TreeSet<Text> families = new TreeSet<Text>();
for(int i = 0; i < cols.length; i++) {
@ -1001,7 +998,7 @@ public class HRegion implements HConstants {
return new HScanner(cols, firstRow, memcache, storelist);
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
}
@ -1024,11 +1021,11 @@ public class HRegion implements HConstants {
// We obtain a per-row lock, so other clients will
// block while one client performs an update.
lock.readLock().lock();
lock.obtainReadLock();
try {
return obtainLock(row);
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
}
}

View File

@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -64,7 +63,7 @@ public class HStore {
Integer compactLock = 0;
Integer flushLock = 0;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final HLocking lock = new HLocking();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@ -237,7 +236,7 @@ public class HStore {
/** Turn off all the MapFile readers */
public void close() throws IOException {
LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
for (MapFile.Reader map: maps.values()) {
map.close();
@ -247,7 +246,7 @@ public class HStore {
LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
@ -319,7 +318,7 @@ public class HStore {
// C. Finally, make the new MapFile available.
if(addToAvailableMaps) {
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@ -330,7 +329,7 @@ public class HStore {
}
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
return getAllMapFiles();
@ -338,12 +337,12 @@ public class HStore {
}
public Vector<HStoreFile> getAllMapFiles() {
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(mapFiles.values());
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -385,12 +384,12 @@ public class HStore {
// Grab a list of files to compact.
Vector<HStoreFile> toCompactFiles = null;
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
} finally {
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@ -627,7 +626,7 @@ public class HStore {
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
this.lock.writeLock().lock();
this.lock.obtainWriteLock();
try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
if(! fs.exists(doneFile)) {
@ -744,7 +743,7 @@ public class HStore {
// 7. Releasing the write-lock
this.lock.writeLock().unlock();
this.lock.releaseWriteLock();
}
}
@ -760,7 +759,7 @@ public class HStore {
* The returned object should map column names to byte arrays (byte[]).
*/
public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
@ -789,7 +788,7 @@ public class HStore {
}
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -805,7 +804,7 @@ public class HStore {
}
Vector<BytesWritable> results = new Vector<BytesWritable>();
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
@ -846,7 +845,7 @@ public class HStore {
}
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -862,7 +861,7 @@ public class HStore {
return maxSize;
}
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
long mapIndex = 0L;
@ -889,7 +888,7 @@ public class HStore {
LOG.warn(e);
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
return maxSize;
}
@ -898,12 +897,12 @@ public class HStore {
* @return Returns the number of map files currently in use
*/
public int getNMaps() {
this.lock.readLock().lock();
this.lock.obtainReadLock();
try {
return maps.size();
} finally {
this.lock.readLock().unlock();
this.lock.releaseReadLock();
}
}
@ -945,7 +944,7 @@ public class HStore {
super(timestamp, targetCols);
lock.readLock().lock();
lock.obtainReadLock();
try {
this.readers = new MapFile.Reader[mapFiles.size()];
@ -1060,7 +1059,7 @@ public class HStore {
}
} finally {
lock.readLock().unlock();
lock.releaseReadLock();
scannerClosed = true;
}
}