HBASE-9144 Leases class has contention that's not needed

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1513974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
eclark 2013-08-14 17:41:29 +00:00
parent 88de40d738
commit 2e74ef45c4
1 changed files with 89 additions and 83 deletions

View File

@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.util.HasThread;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@ -54,10 +56,10 @@ import java.io.IOException;
@InterfaceAudience.Private
public class Leases extends HasThread {
private static final Log LOG = LogFactory.getLog(Leases.class.getName());
private final int leaseCheckFrequency;
private final DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
protected final Map<String, Lease> leases = new HashMap<String, Lease>();
private volatile boolean stopRequested = false;
private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
protected final int leaseCheckFrequency;
protected volatile boolean stopRequested = false;
/**
* Creates a lease monitor
@ -71,14 +73,22 @@ public class Leases extends HasThread {
}
/**
* @see java.lang.Thread#run()
* @see Thread#run()
*/
@Override
public void run() {
while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
Lease lease = null;
long toWait = leaseCheckFrequency;
Lease nextLease = null;
long nextLeaseDelay = Long.MAX_VALUE;
while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
try {
lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
if (nextLease != null) {
toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
}
toWait = Math.min(leaseCheckFrequency, toWait);
Thread.sleep(toWait);
} catch (InterruptedException e) {
continue;
} catch (ConcurrentModificationException e) {
@ -87,18 +97,28 @@ public class Leases extends HasThread {
LOG.fatal("Unexpected exception killed leases thread", e);
break;
}
if (lease == null) {
continue;
}
// A lease expired. Run the expired code before removing from queue
// since its presence in queue is used to see if lease exists still.
if (lease.getListener() == null) {
LOG.error("lease listener is null for lease " + lease.getLeaseName());
} else {
lease.getListener().leaseExpired();
}
synchronized (leaseQueue) {
leases.remove(lease.getLeaseName());
nextLease = null;
nextLeaseDelay = Long.MAX_VALUE;
for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, Lease> entry = it.next();
Lease lease = entry.getValue();
long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
if ( thisLeaseDelay > 0) {
if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
nextLease = lease;
nextLeaseDelay = thisLeaseDelay;
}
} else {
// A lease expired. Run the expired code before removing from map
// since its presence in map is used to see if lease exists still.
if (lease.getListener() == null) {
LOG.error("lease listener is null for lease " + lease.getLeaseName());
} else {
lease.getListener().leaseExpired();
}
it.remove();
}
}
}
close();
@ -122,17 +142,13 @@ public class Leases extends HasThread {
public void close() {
LOG.info(Thread.currentThread().getName() + " closing leases");
this.stopRequested = true;
synchronized (leaseQueue) {
leaseQueue.clear();
leases.clear();
leaseQueue.notifyAll();
}
leases.clear();
LOG.info(Thread.currentThread().getName() + " closed leases");
}
/**
* Obtain a lease.
*
*
* @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations
@ -153,13 +169,54 @@ public class Leases extends HasThread {
return;
}
lease.resetExpirationTime();
synchronized (leaseQueue) {
if (leases.containsKey(lease.getLeaseName())) {
throw new LeaseStillHeldException(lease.getLeaseName());
}
leases.put(lease.getLeaseName(), lease);
leaseQueue.add(lease);
if (leases.containsKey(lease.getLeaseName())) {
throw new LeaseStillHeldException(lease.getLeaseName());
}
leases.put(lease.getLeaseName(), lease);
}
/**
* Renew a lease
*
* @param leaseName name of lease
* @throws LeaseException
*/
public void renewLease(final String leaseName) throws LeaseException {
Lease lease = leases.get(leaseName);
// We need to check to see if the remove is successful as the poll in the run()
// method could have completed between the get and the remove which will result
// in a corrupt leaseQueue.
if (lease == null ) {
throw new LeaseException("lease '" + leaseName +
"' does not exist or has already expired");
}
lease.resetExpirationTime();
}
/**
* Client explicitly cancels a lease.
* @param leaseName name of lease
* @throws org.apache.hadoop.hbase.regionserver.LeaseException
*/
public void cancelLease(final String leaseName) throws LeaseException {
removeLease(leaseName);
}
/**
* Remove named lease.
* Lease is removed from the list of leases and removed from the delay queue.
* Lease can be resinserted using {@link #addLease(Lease)}
*
* @param leaseName name of lease
* @throws org.apache.hadoop.hbase.regionserver.LeaseException
* @return Removed lease
*/
Lease removeLease(final String leaseName) throws LeaseException {
Lease lease = leases.remove(leaseName);
if (lease == null) {
throw new LeaseException("lease '" + leaseName + "' does not exist");
}
return lease;
}
/**
@ -183,57 +240,6 @@ public class Leases extends HasThread {
}
}
/**
* Renew a lease
*
* @param leaseName name of lease
* @throws org.apache.hadoop.hbase.regionserver.LeaseException
*/
public void renewLease(final String leaseName) throws LeaseException {
synchronized (leaseQueue) {
Lease lease = leases.get(leaseName);
// We need to check to see if the remove is successful as the poll in the run()
// method could have completed between the get and the remove which will result
// in a corrupt leaseQueue.
if (lease == null || !leaseQueue.remove(lease)) {
throw new LeaseException("lease '" + leaseName +
"' does not exist or has already expired");
}
lease.resetExpirationTime();
leaseQueue.add(lease);
}
}
/**
* Client explicitly cancels a lease.
* @param leaseName name of lease
* @throws LeaseException
*/
public void cancelLease(final String leaseName) throws LeaseException {
removeLease(leaseName);
}
/**
* Remove named lease.
* Lease is removed from the list of leases and removed from the delay queue.
* Lease can be resinserted using {@link #addLease(Lease)}
*
* @param leaseName name of lease
* @throws LeaseException
* @return Removed lease
*/
Lease removeLease(final String leaseName) throws LeaseException {
Lease lease = null;
synchronized (leaseQueue) {
lease = leases.remove(leaseName);
if (lease == null) {
throw new LeaseException("lease '" + leaseName + "' does not exist");
}
leaseQueue.remove(lease);
}
return lease;
}
/** This class tracks a single Lease. */
static class Lease implements Delayed {
private final String leaseName;