HBASE-2295 Row locks may deadlock with themselves
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@921098 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bfd6812b40
commit
c9d8e720bb
@ -58,10 +58,13 @@ package org.apache.hadoop.hbase.regionserver;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
@ -119,8 +122,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
// Members
|
// Members
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private final Map<Integer, byte []> locksToRows =
|
private final Set<byte[]> lockedRows =
|
||||||
new ConcurrentHashMap<Integer, byte []>();
|
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||||
|
private final Map<Integer, byte []> lockIds =
|
||||||
|
new HashMap<Integer, byte []>();
|
||||||
|
private int lockIdGenerator = 1;
|
||||||
|
static private Random rand = new Random();
|
||||||
|
|
||||||
protected final Map<byte [], Store> stores =
|
protected final Map<byte [], Store> stores =
|
||||||
new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
|
new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
|
||||||
|
|
||||||
@ -1583,18 +1591,34 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
if (this.closed.get()) {
|
if (this.closed.get()) {
|
||||||
throw new NotServingRegionException("Region " + this + " closed");
|
throw new NotServingRegionException("Region " + this + " closed");
|
||||||
}
|
}
|
||||||
Integer key = Bytes.mapKey(row);
|
synchronized (lockedRows) {
|
||||||
synchronized (locksToRows) {
|
while (lockedRows.contains(row)) {
|
||||||
while (locksToRows.containsKey(key)) {
|
|
||||||
try {
|
try {
|
||||||
locksToRows.wait();
|
lockedRows.wait();
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
// Empty
|
// Empty
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
locksToRows.put(key, row);
|
// generate a new lockid. Attempt to insert the new [lockid, row].
|
||||||
locksToRows.notifyAll();
|
// if this lockid already exists in the map then revert and retry
|
||||||
return key;
|
// We could have first done a lockIds.get, and if it does not exist only
|
||||||
|
// then do a lockIds.put, but the hope is that the lockIds.put will
|
||||||
|
// mostly return null the first time itself because there won't be
|
||||||
|
// too many lockId collisions.
|
||||||
|
byte [] prev = null;
|
||||||
|
Integer lockId = null;
|
||||||
|
do {
|
||||||
|
lockId = new Integer(lockIdGenerator++);
|
||||||
|
prev = lockIds.put(lockId, row);
|
||||||
|
if (prev != null) {
|
||||||
|
lockIds.put(lockId, prev); // revert old value
|
||||||
|
lockIdGenerator = rand.nextInt(); // generate new start point
|
||||||
|
}
|
||||||
|
} while (prev != null);
|
||||||
|
|
||||||
|
lockedRows.add(row);
|
||||||
|
lockedRows.notifyAll();
|
||||||
|
return lockId;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
splitsAndClosesLock.readLock().unlock();
|
splitsAndClosesLock.readLock().unlock();
|
||||||
@ -1607,7 +1631,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
* @return Row that goes with <code>lockid</code>
|
* @return Row that goes with <code>lockid</code>
|
||||||
*/
|
*/
|
||||||
byte [] getRowFromLock(final Integer lockid) {
|
byte [] getRowFromLock(final Integer lockid) {
|
||||||
return locksToRows.get(lockid);
|
synchronized (lockedRows) {
|
||||||
|
return lockIds.get(lockid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1615,9 +1641,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
* @param lockid The lock ID to release.
|
* @param lockid The lock ID to release.
|
||||||
*/
|
*/
|
||||||
void releaseRowLock(final Integer lockid) {
|
void releaseRowLock(final Integer lockid) {
|
||||||
synchronized (locksToRows) {
|
synchronized (lockedRows) {
|
||||||
locksToRows.remove(lockid);
|
byte[] row = lockIds.remove(lockid);
|
||||||
locksToRows.notifyAll();
|
lockedRows.remove(row);
|
||||||
|
lockedRows.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1627,8 +1654,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
* @return boolean
|
* @return boolean
|
||||||
*/
|
*/
|
||||||
private boolean isRowLocked(final Integer lockid) {
|
private boolean isRowLocked(final Integer lockid) {
|
||||||
synchronized (locksToRows) {
|
synchronized (lockedRows) {
|
||||||
if(locksToRows.containsKey(lockid)) {
|
if (lockIds.get(lockid) != null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -1656,11 +1683,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void waitOnRowLocks() {
|
private void waitOnRowLocks() {
|
||||||
synchronized (locksToRows) {
|
synchronized (lockedRows) {
|
||||||
while (this.locksToRows.size() > 0) {
|
while (!this.lockedRows.isEmpty()) {
|
||||||
LOG.debug("waiting for " + this.locksToRows.size() + " row locks");
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Waiting on " + this.lockedRows.size() + " row locks");
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
this.locksToRows.wait();
|
this.lockedRows.wait();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// Catch. Let while test determine loop-end.
|
// Catch. Let while test determine loop-end.
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user