From 0fdd555ace87f9d9b45635e9aa4d0f44d56b8807 Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Thu, 7 Feb 2008 06:35:09 +0000 Subject: [PATCH] HBASE-415 Rewrite leases to use DelayedBlockingQueue instead of polling git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@619288 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + src/java/org/apache/hadoop/hbase/HMaster.java | 31 +- .../apache/hadoop/hbase/HRegionServer.java | 6 +- src/java/org/apache/hadoop/hbase/Leases.java | 384 ++++++------------ 4 files changed, 140 insertions(+), 282 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c21c9040c11..c52fe9491fc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,7 @@ Trunk (unreleased changes) HADOOP-2555 Refactor the HTable#get and HTable#getRow methods to avoid repetition of retry-on-failure logic (thanks to Peter Dolan and Bryan Duxbury) + HBASE-415 Rewrite leases to use DelayedBlockingQueue instead of polling Release 0.16.0 diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index 751628f8968..a42e2208f61 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -46,6 +46,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -372,10 +373,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, Path p = HStoreFile.getMapDir(tabledir, split.getEncodedName(), family.getFamilyName()); - // Look for reference files. Call listPaths with an anonymous + // Look for reference files. Call listStatus with an anonymous // instance of PathFilter. - Path [] ps = fs.listPaths(p, + FileStatus [] ps = fs.listStatus(p, new PathFilter () { public boolean accept(Path path) { return HStore.isReference(path); @@ -1306,8 +1307,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, loadToServers.put(load, servers); if (!closed.get()) { - long serverLabel = getServerLabel(s); - serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); + serverLeases.createLease(s, new ServerExpirer(s)); } return createConfigurationSubset(); @@ -1327,15 +1327,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, return mw; } - private long getServerLabel(final String s) { - return s.hashCode(); - } - /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String serverName = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(serverName); if (msgs.length > 0) { if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { synchronized (serversToServerInfo) { @@ -1348,7 +1343,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, ": MSG_REPORT_EXITING -- cancelling lease"); } - if (cancelLease(serverName, serverLabel)) { + if (cancelLease(serverName)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. LOG.info("Region server " + serverName + @@ -1428,7 +1423,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, } synchronized (serversToServerInfo) { - cancelLease(serverName, serverLabel); + cancelLease(serverName); serversToServerInfo.notifyAll(); } return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; @@ -1439,7 +1434,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - serverLeases.renewLease(serverLabel, serverLabel); + serverLeases.renewLease(serverName); // Refresh the info object and the load information @@ -1476,7 +1471,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, } /** Cancel a server's lease and update its load information */ - private boolean cancelLease(final String serverName, final long serverLabel) { + private boolean cancelLease(final String serverName) { boolean leaseCancelled = false; HServerInfo info = serversToServerInfo.remove(serverName); if (info != null) { @@ -1487,7 +1482,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, unassignRootRegion(); } LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); + serverLeases.cancelLease(serverName); leaseCancelled = true; // update load information @@ -3120,20 +3115,20 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, /* * Data structure used to return results out of the toRowMap method. */ - private class RowMap { + class RowMap { final Text row; final SortedMap map; - private RowMap(final Text r, final SortedMap m) { + RowMap(final Text r, final SortedMap m) { this.row = r; this.map = m; } - private Text getRow() { + Text getRow() { return this.row; } - private SortedMap getMap() { + SortedMap getMap() { return this.map; } } diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index eb6a6082be4..3a2048048cc 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -1395,7 +1395,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } - this.leases.renewLease(scannerId, scannerId); + this.leases.renewLease(scannerName); // Collect values to be returned here HbaseMapWritable values = new HbaseMapWritable(); @@ -1458,7 +1458,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { scanners.put(scannerName, s); } this.leases. - createLease(scannerId, scannerId, new ScannerListener(scannerName)); + createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1482,7 +1482,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { throw new UnknownScannerException(scannerName); } s.close(); - this.leases.cancelLease(scannerId, scannerId); + this.leases.cancelLease(scannerName); } catch (IOException e) { checkFileSystem(); throw e; diff --git a/src/java/org/apache/hadoop/hbase/Leases.java b/src/java/org/apache/hadoop/hbase/Leases.java index 4f3a7580c92..6f5e677c0a6 100644 --- a/src/java/org/apache/hadoop/hbase/Leases.java +++ b/src/java/org/apache/hadoop/hbase/Leases.java @@ -21,9 +21,13 @@ package org.apache.hadoop.hbase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; + +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Delayed; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; /** * Leases @@ -39,19 +43,19 @@ import java.util.concurrent.atomic.AtomicBoolean; * An instance of the Leases class will create a thread to do its dirty work. * You should close() the instance if you want to clean up the thread properly. */ -public class Leases { - protected static final Log LOG = LogFactory.getLog(Leases.class.getName()); +public class Leases extends Thread { + private static final Log LOG = LogFactory.getLog(Leases.class.getName()); + private final int leasePeriod; + private final int leaseCheckFrequency; + private volatile DelayQueue leaseQueue = new DelayQueue(); - protected final int leasePeriod; - protected final int leaseCheckFrequency; - private final Thread leaseMonitorThread; - protected final Map leases = - new HashMap(); - protected final TreeSet sortedLeases = new TreeSet(); - protected AtomicBoolean stop = new AtomicBoolean(false); + protected final Map leases = new HashMap(); + protected final Map listeners = + new HashMap(); + private volatile boolean stopRequested = false; /** - * Creates a lease + * Creates a lease monitor * * @param leasePeriod - length of time (milliseconds) that the lease is valid * @param leaseCheckFrequency - how often the lease should be checked @@ -60,21 +64,39 @@ public class Leases { public Leases(final int leasePeriod, final int leaseCheckFrequency) { this.leasePeriod = leasePeriod; this.leaseCheckFrequency = leaseCheckFrequency; - this.leaseMonitorThread = - new LeaseMonitor(this.leaseCheckFrequency, this.stop); - this.leaseMonitorThread.setDaemon(true); } - /** Starts the lease monitor */ - public void start() { - leaseMonitorThread.start(); - } - - /** - * @param name Set name on the lease checking daemon thread. - */ - public void setName(final String name) { - this.leaseMonitorThread.setName(name); + /** {@inheritDoc} */ + @Override + public void run() { + while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) { + Lease lease = null; + try { + lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException e) { + continue; + + } catch (ConcurrentModificationException e) { + continue; + } + if (lease == null) { + continue; + } + // A lease expired + LeaseListener listener = null; + synchronized (leaseQueue) { + String leaseName = lease.getLeaseName(); + leases.remove(leaseName); + listener = listeners.remove(leaseName); + if (listener == null) { + LOG.error("lease listener is null for lease " + leaseName); + continue; + } + } + listener.leaseExpired(); + } + close(); } /** @@ -85,20 +107,7 @@ public class Leases { * allocation of new leases. */ public void closeAfterLeasesExpire() { - synchronized(this.leases) { - while (this.leases.size() > 0) { - LOG.info(Thread.currentThread().getName() + " " + - Integer.toString(leases.size()) + " lease(s) " + - "outstanding. Waiting for them to expire."); - try { - this.leases.wait(this.leaseCheckFrequency); - } catch (InterruptedException e) { - // continue - } - } - } - // Now call close since no leases outstanding. - close(); + this.stopRequested = true; } /** @@ -107,271 +116,124 @@ public class Leases { */ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); - this.stop.set(true); - while (this.leaseMonitorThread.isAlive()) { - try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); - } catch (InterruptedException iex) { - // Ignore - } - } - synchronized(leases) { - synchronized(sortedLeases) { - leases.clear(); - sortedLeases.clear(); - } + this.stopRequested = true; + synchronized (leaseQueue) { + leaseQueue.clear(); + leases.clear(); + listeners.clear(); + leaseQueue.notifyAll(); } LOG.info(Thread.currentThread().getName() + " closed leases"); } - /* A client obtains a lease... */ - /** * Obtain a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of the lease * @param listener listener that will process lease expirations */ - public void createLease(final long holderId, final long resourceId, - final LeaseListener listener) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - Lease lease = new Lease(holderId, resourceId, listener); - name = lease.getLeaseName(); - if(leases.get(name) != null) { - throw new AssertionError("Impossible state for createLease(): " + - "Lease " + name + " is still held."); - } - leases.put(name, lease); - sortedLeases.add(lease); - } + public void createLease(String leaseName, final LeaseListener listener) { + if (stopRequested) { + return; + } + Lease lease = new Lease(leaseName, System.currentTimeMillis() + leasePeriod); + synchronized (leaseQueue) { + if (leases.containsKey(leaseName)) { + throw new IllegalStateException("lease '" + leaseName + + "' already exists"); + } + leases.put(leaseName, lease); + listeners.put(leaseName, listener); + leaseQueue.add(lease); } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Created lease " + name); -// } } - /* A client renews a lease... */ /** * Renew a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased - * @throws IOException + * @param leaseName name of lease */ - public void renewLease(final long holderId, final long resourceId) - throws IOException { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); - if (lease == null) { - // It's possible that someone tries to renew the lease, but - // it just expired a moment ago. So fail. - throw new IOException("Cannot renew lease that is not held: " + - name); - } - sortedLeases.remove(lease); - lease.renew(); - sortedLeases.add(lease); + public void renewLease(final String leaseName) { + synchronized (leaseQueue) { + Lease lease = leases.get(leaseName); + if (lease == null) { + throw new IllegalArgumentException("lease '" + leaseName + + "' does not exist"); } + leaseQueue.remove(lease); + lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); + leaseQueue.add(lease); } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Renewed lease " + name); -// } } /** * Client explicitly cancels a lease. * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease */ - public void cancelLease(final long holderId, final long resourceId) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); - if (lease == null) { - // It's possible that someone tries to renew the lease, but - // it just expired a moment ago. So just skip it. - return; - } - sortedLeases.remove(lease); - leases.remove(name); + public void cancelLease(final String leaseName) { + synchronized (leaseQueue) { + Lease lease = leases.remove(leaseName); + if (lease == null) { + throw new IllegalArgumentException("lease '" + leaseName + + "' does not exist"); } + leaseQueue.remove(lease); + listeners.remove(leaseName); } } - /** - * LeaseMonitor is a thread that expires Leases that go on too long. - * Its a daemon thread. - */ - class LeaseMonitor extends Chore { - /** - * @param p - * @param s - */ - public LeaseMonitor(int p, AtomicBoolean s) { - super(p, s); - } - - /** {@inheritDoc} */ - @Override - protected void chore() { - synchronized(leases) { - synchronized(sortedLeases) { - Lease top; - while((sortedLeases.size() > 0) - && ((top = sortedLeases.first()) != null)) { - if(top.shouldExpire()) { - leases.remove(top.getLeaseName()); - sortedLeases.remove(top); - top.expired(); - } else { - break; - } - } - } - } - } - } - - /* - * A Lease name. - * More lightweight than String or Text. - */ - @SuppressWarnings("unchecked") - class LeaseName implements Comparable { - private final long holderId; - private final long resourceId; - - LeaseName(final long hid, final long rid) { - this.holderId = hid; - this.resourceId = rid; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object obj) { - LeaseName other = (LeaseName)obj; - return this.holderId == other.holderId && - this.resourceId == other.resourceId; - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - // Copy OR'ing from javadoc for Long#hashCode. - int result = (int)(this.holderId ^ (this.holderId >>> 32)); - result ^= (int)(this.resourceId ^ (this.resourceId >>> 32)); - return result; - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return Long.toString(this.holderId) + "/" + - Long.toString(this.resourceId); - } - - /** {@inheritDoc} */ - public int compareTo(Object obj) { - LeaseName other = (LeaseName)obj; - if (this.holderId < other.holderId) { - return -1; - } - if (this.holderId > other.holderId) { - return 1; - } - // holderIds are equal - if (this.resourceId < other.resourceId) { - return -1; - } - if (this.resourceId > other.resourceId) { - return 1; - } - // Objects are equal - return 0; - } - } - - /** Create a lease id out of the holder and resource ids. */ - protected LeaseName createLeaseName(final long hid, final long rid) { - return new LeaseName(hid, rid); - } - /** This class tracks a single Lease. */ - @SuppressWarnings("unchecked") - private class Lease implements Comparable { - final long holderId; - final long resourceId; - final LeaseListener listener; - long lastUpdate; - private LeaseName leaseId; + private static class Lease implements Delayed { + private final String leaseName; + private long expirationTime; - Lease(final long holderId, final long resourceId, - final LeaseListener listener) { - this.holderId = holderId; - this.resourceId = resourceId; - this.listener = listener; - renew(); + Lease(final String leaseName, long expirationTime) { + this.leaseName = leaseName; + this.expirationTime = expirationTime; } - - synchronized LeaseName getLeaseName() { - if (this.leaseId == null) { - this.leaseId = createLeaseName(holderId, resourceId); - } - return this.leaseId; + + /** @return the lease name */ + public String getLeaseName() { + return leaseName; } - - boolean shouldExpire() { - return (System.currentTimeMillis() - lastUpdate > leasePeriod); - } - - void renew() { - this.lastUpdate = System.currentTimeMillis(); - } - - void expired() { - LOG.info(Thread.currentThread().getName() + " lease expired " + - getLeaseName()); - listener.leaseExpired(); - } - + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { - return compareTo(obj) == 0; + return this.hashCode() == ((Lease) obj).hashCode(); } /** {@inheritDoc} */ @Override public int hashCode() { - int result = this.getLeaseName().hashCode(); - result ^= this.lastUpdate; - return result; + return this.leaseName.hashCode(); } - - ////////////////////////////////////////////////////////////////////////////// - // Comparable - ////////////////////////////////////////////////////////////////////////////// /** {@inheritDoc} */ - public int compareTo(Object o) { - Lease other = (Lease) o; - if(this.lastUpdate < other.lastUpdate) { - return -1; - } else if(this.lastUpdate > other.lastUpdate) { - return 1; - } else { - return this.getLeaseName().compareTo(other.getLeaseName()); + public long getDelay(TimeUnit unit) { + return unit.convert(this.expirationTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** {@inheritDoc} */ + public int compareTo(Delayed o) { + long delta = this.getDelay(TimeUnit.MILLISECONDS) - + o.getDelay(TimeUnit.MILLISECONDS); + + int value = 0; + if (delta > 0) { + value = 1; + + } else if (delta < 0) { + value = -1; } + return value; + } + + /** @param expirationTime the expirationTime to set */ + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; } } } \ No newline at end of file