diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 63bff063d3f..1cb5a11e596 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -23,14 +23,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.NavigableSet; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -90,21 +88,11 @@ public class LeaseManager { private long lastHolderUpdateTime; private String internalLeaseHolder; + // // Used for handling lock-leases // Mapping: leaseHolder -> Lease - private final SortedMap leases = new TreeMap<>(); - // Set of: Lease - private final NavigableSet sortedLeases = new TreeSet<>( - new Comparator() { - @Override - public int compare(Lease o1, Lease o2) { - if (o1.getLastUpdate() != o2.getLastUpdate()) { - return Long.signum(o1.getLastUpdate() - o2.getLastUpdate()); - } else { - return o1.holder.compareTo(o2.holder); - } - } - }); + // + private final HashMap leases = new HashMap<>(); // INodeID -> Lease private final TreeMap leasesById = new TreeMap<>(); @@ -338,7 +326,7 @@ public class LeaseManager { /** @return the number of leases currently in the system */ @VisibleForTesting public synchronized int countLease() { - return sortedLeases.size(); + return leases.size(); } /** @return the number of paths contained in all leases */ @@ -354,7 +342,6 @@ public class LeaseManager { if (lease == null) { lease = new Lease(holder); leases.put(holder, lease); - sortedLeases.add(lease); } else { renewLease(lease); } @@ -380,9 +367,8 @@ public class LeaseManager { } if (!lease.hasFiles()) { - leases.remove(lease.holder); - if (!sortedLeases.remove(lease)) { - LOG.error("{} not found in sortedLeases", lease); + if (leases.remove(lease.holder) == null) { + LOG.error("{} not found", lease); } } } @@ -401,7 +387,6 @@ public class LeaseManager { } synchronized void removeAllLeases() { - sortedLeases.clear(); leasesById.clear(); leases.clear(); } @@ -424,11 +409,10 @@ public class LeaseManager { synchronized void renewLease(String holder) { renewLease(getLease(holder)); } + synchronized void renewLease(Lease lease) { if (lease != null) { - sortedLeases.remove(lease); lease.renew(); - sortedLeases.add(lease); } } @@ -452,10 +436,10 @@ public class LeaseManager { private final String holder; private long lastUpdate; private final HashSet files = new HashSet<>(); - + /** Only LeaseManager object can create a lease */ - private Lease(String holder) { - this.holder = holder; + private Lease(String h) { + this.holder = h; renew(); } /** Only LeaseManager object can renew a lease */ @@ -468,6 +452,10 @@ public class LeaseManager { return monotonicNow() - lastUpdate > hardLimit; } + public boolean expiredHardLimit(long now) { + return now - lastUpdate > hardLimit; + } + /** @return true if the Soft Limit Timer has expired */ public boolean expiredSoftLimit() { return monotonicNow() - lastUpdate > softLimit; @@ -490,7 +478,7 @@ public class LeaseManager { public int hashCode() { return holder.hashCode(); } - + private Collection getFiles() { return Collections.unmodifiableCollection(files); } @@ -509,6 +497,17 @@ public class LeaseManager { this.softLimit = softLimit; this.hardLimit = hardLimit; } + + private synchronized Collection getExpiredCandidateLeases() { + final long now = Time.monotonicNow(); + Collection expired = new HashSet<>(); + for (Lease lease : leases.values()) { + if (lease.expiredHardLimit(now)) { + expired.add(lease); + } + } + return expired; + } /****************************************************** * Monitor checks for leases that have expired, @@ -523,10 +522,19 @@ public class LeaseManager { for(; shouldRunMonitor && fsnamesystem.isRunning(); ) { boolean needSync = false; try { + // sleep now to avoid infinite loop if an exception was thrown. + Thread.sleep(fsnamesystem.getLeaseRecheckIntervalMs()); + + // pre-filter the leases w/o the fsn lock. + Collection candidates = getExpiredCandidateLeases(); + if (candidates.isEmpty()) { + continue; + } + fsnamesystem.writeLockInterruptibly(); try { if (!fsnamesystem.isInSafeMode()) { - needSync = checkLeases(); + needSync = checkLeases(candidates); } } finally { fsnamesystem.writeUnlock("leaseManager"); @@ -535,8 +543,6 @@ public class LeaseManager { fsnamesystem.getEditLog().logSync(); } } - - Thread.sleep(fsnamesystem.getLeaseRecheckIntervalMs()); } catch(InterruptedException ie) { LOG.debug("{} is interrupted", name, ie); } catch(Throwable e) { @@ -551,17 +557,22 @@ public class LeaseManager { */ @VisibleForTesting synchronized boolean checkLeases() { + return checkLeases(getExpiredCandidateLeases()); + } + + private synchronized boolean checkLeases(Collection leasesToCheck) { boolean needSync = false; assert fsnamesystem.hasWriteLock(); long start = monotonicNow(); - - while(!sortedLeases.isEmpty() && - sortedLeases.first().expiredHardLimit() - && !isMaxLockHoldToReleaseLease(start)) { - Lease leaseToCheck = sortedLeases.first(); + for (Lease leaseToCheck : leasesToCheck) { + if (isMaxLockHoldToReleaseLease(start)) { + break; + } + if (!leaseToCheck.expiredHardLimit(Time.monotonicNow())) { + continue; + } LOG.info("{} has expired hard limit", leaseToCheck); - final List removing = new ArrayList<>(); // need to create a copy of the oldest lease files, because // internalReleaseLease() removes files corresponding to empty files, @@ -623,7 +634,6 @@ public class LeaseManager { removeLease(leaseToCheck, id); } } - return needSync; } @@ -638,7 +648,6 @@ public class LeaseManager { public synchronized String toString() { return getClass().getSimpleName() + "= {" + "\n leases=" + leases - + "\n sortedLeases=" + sortedLeases + "\n leasesById=" + leasesById + "\n}"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java index 7e0f64b12d3..f0b4ffccd70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java @@ -21,13 +21,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; -import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.Semaphore; import org.apache.hadoop.fs.Options; @@ -55,7 +53,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; -import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.Node; @@ -68,7 +65,7 @@ import org.junit.Test; import org.junit.rules.Timeout; import org.mockito.Mockito; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LEASE_HARDLIMIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY; import static org.junit.Assert.assertNotEquals; @@ -386,6 +383,10 @@ public class TestDeleteRace { // Disable permissions so that another user can recover the lease. config.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); config.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + long leaseRecheck = 1000; + conf.setLong(DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY, leaseRecheck); + conf.setLong(DFS_LEASE_HARDLIMIT_KEY, leaseRecheck/1000); + FSDataOutputStream stm = null; try { cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build(); @@ -410,30 +411,8 @@ public class TestDeleteRace { // the streamer. AppendTestUtil.write(stm, 0, BLOCK_SIZE); - // Mock a scenario that the lease reached hard limit. - final LeaseManager lm = (LeaseManager) Whitebox - .getInternalState(cluster.getNameNode().getNamesystem(), - "leaseManager"); - final TreeSet leases = - (TreeSet) Whitebox.getInternalState(lm, "sortedLeases"); - final TreeSet spyLeases = new TreeSet<>(new Comparator() { - @Override - public int compare(Lease o1, Lease o2) { - return Long.signum(o1.getLastUpdate() - o2.getLastUpdate()); - } - }); - while (!leases.isEmpty()) { - final Lease lease = leases.first(); - final Lease spyLease = Mockito.spy(lease); - Mockito.doReturn(true).when(spyLease).expiredHardLimit(); - spyLeases.add(spyLease); - leases.remove(lease); - } - Whitebox.setInternalState(lm, "sortedLeases", spyLeases); - // wait for lease manager's background 'Monitor' class to check leases. - Thread.sleep(2 * conf.getLong(DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY, - DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT)); + Thread.sleep(2 * leaseRecheck); LOG.info("Now check we can restart"); cluster.restartNameNodes();