diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java index 78ceb9766de..190ed9d1d1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java @@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.util.HasThread; import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Delayed; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; @@ -54,10 +56,10 @@ import java.io.IOException; @InterfaceAudience.Private public class Leases extends HasThread { private static final Log LOG = LogFactory.getLog(Leases.class.getName()); - private final int leaseCheckFrequency; - private final DelayQueue leaseQueue = new DelayQueue(); - protected final Map leases = new HashMap(); - private volatile boolean stopRequested = false; + private final Map leases = new ConcurrentHashMap(); + + protected final int leaseCheckFrequency; + protected volatile boolean stopRequested = false; /** * Creates a lease monitor @@ -71,14 +73,22 @@ public class Leases extends HasThread { } /** - * @see java.lang.Thread#run() + * @see Thread#run() */ @Override public void run() { - while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) { - Lease lease = null; + long toWait = leaseCheckFrequency; + Lease nextLease = null; + long nextLeaseDelay = Long.MAX_VALUE; + + while (!stopRequested || (stopRequested && !leases.isEmpty()) ) { + try { - lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS); + if (nextLease != null) { + toWait = nextLease.getDelay(TimeUnit.MILLISECONDS); + } + toWait = Math.min(leaseCheckFrequency, toWait); + Thread.sleep(toWait); } catch (InterruptedException e) { continue; } catch (ConcurrentModificationException e) { @@ -87,18 +97,28 @@ public class Leases extends HasThread { LOG.fatal("Unexpected exception killed leases thread", e); break; } - if (lease == null) { - continue; - } - // A lease expired. Run the expired code before removing from queue - // since its presence in queue is used to see if lease exists still. - if (lease.getListener() == null) { - LOG.error("lease listener is null for lease " + lease.getLeaseName()); - } else { - lease.getListener().leaseExpired(); - } - synchronized (leaseQueue) { - leases.remove(lease.getLeaseName()); + + nextLease = null; + nextLeaseDelay = Long.MAX_VALUE; + for (Iterator> it = leases.entrySet().iterator(); it.hasNext();) { + Map.Entry entry = it.next(); + Lease lease = entry.getValue(); + long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); + if ( thisLeaseDelay > 0) { + if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { + nextLease = lease; + nextLeaseDelay = thisLeaseDelay; + } + } else { + // A lease expired. Run the expired code before removing from map + // since its presence in map is used to see if lease exists still. + if (lease.getListener() == null) { + LOG.error("lease listener is null for lease " + lease.getLeaseName()); + } else { + lease.getListener().leaseExpired(); + } + it.remove(); + } } } close(); @@ -122,17 +142,13 @@ public class Leases extends HasThread { public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); this.stopRequested = true; - synchronized (leaseQueue) { - leaseQueue.clear(); - leases.clear(); - leaseQueue.notifyAll(); - } + leases.clear(); LOG.info(Thread.currentThread().getName() + " closed leases"); } /** * Obtain a lease. - * + * * @param leaseName name of the lease * @param leaseTimeoutPeriod length of the lease in milliseconds * @param listener listener that will process lease expirations @@ -153,13 +169,54 @@ public class Leases extends HasThread { return; } lease.resetExpirationTime(); - synchronized (leaseQueue) { - if (leases.containsKey(lease.getLeaseName())) { - throw new LeaseStillHeldException(lease.getLeaseName()); - } - leases.put(lease.getLeaseName(), lease); - leaseQueue.add(lease); + if (leases.containsKey(lease.getLeaseName())) { + throw new LeaseStillHeldException(lease.getLeaseName()); } + leases.put(lease.getLeaseName(), lease); + } + + /** + * Renew a lease + * + * @param leaseName name of lease + * @throws LeaseException + */ + public void renewLease(final String leaseName) throws LeaseException { + Lease lease = leases.get(leaseName); + // We need to check to see if the remove is successful as the poll in the run() + // method could have completed between the get and the remove which will result + // in a corrupt leaseQueue. + if (lease == null ) { + throw new LeaseException("lease '" + leaseName + + "' does not exist or has already expired"); + } + lease.resetExpirationTime(); + } + + /** + * Client explicitly cancels a lease. + * @param leaseName name of lease + * @throws org.apache.hadoop.hbase.regionserver.LeaseException + */ + public void cancelLease(final String leaseName) throws LeaseException { + removeLease(leaseName); + } + + /** + * Remove named lease. + * Lease is removed from the list of leases and removed from the delay queue. + * Lease can be resinserted using {@link #addLease(Lease)} + * + * @param leaseName name of lease + * @throws org.apache.hadoop.hbase.regionserver.LeaseException + * @return Removed lease + */ + Lease removeLease(final String leaseName) throws LeaseException { + Lease lease = leases.remove(leaseName); + if (lease == null) { + throw new LeaseException("lease '" + leaseName + "' does not exist"); + } + return lease; } /** @@ -183,57 +240,6 @@ public class Leases extends HasThread { } } - /** - * Renew a lease - * - * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException - */ - public void renewLease(final String leaseName) throws LeaseException { - synchronized (leaseQueue) { - Lease lease = leases.get(leaseName); - // We need to check to see if the remove is successful as the poll in the run() - // method could have completed between the get and the remove which will result - // in a corrupt leaseQueue. - if (lease == null || !leaseQueue.remove(lease)) { - throw new LeaseException("lease '" + leaseName + - "' does not exist or has already expired"); - } - lease.resetExpirationTime(); - leaseQueue.add(lease); - } - } - - /** - * Client explicitly cancels a lease. - * @param leaseName name of lease - * @throws LeaseException - */ - public void cancelLease(final String leaseName) throws LeaseException { - removeLease(leaseName); - } - - /** - * Remove named lease. - * Lease is removed from the list of leases and removed from the delay queue. - * Lease can be resinserted using {@link #addLease(Lease)} - * - * @param leaseName name of lease - * @throws LeaseException - * @return Removed lease - */ - Lease removeLease(final String leaseName) throws LeaseException { - Lease lease = null; - synchronized (leaseQueue) { - lease = leases.remove(leaseName); - if (lease == null) { - throw new LeaseException("lease '" + leaseName + "' does not exist"); - } - leaseQueue.remove(lease); - } - return lease; - } - /** This class tracks a single Lease. */ static class Lease implements Delayed { private final String leaseName;