diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 9e532f2494a..ed3f86b2a6e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -509,7 +509,15 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out) throws IOException { synchronized (filesBeingWritten) { putFileBeingWritten(inodeId, out); - getLeaseRenewer().put(this); + LeaseRenewer renewer = getLeaseRenewer(); + boolean result = renewer.put(this); + if (!result) { + // Existing LeaseRenewer cannot add another Daemon, so remove existing + // and add new one. + LeaseRenewer.remove(renewer); + renewer = getLeaseRenewer(); + renewer.put(this); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java index d108af987cc..6b4c8994544 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -79,6 +80,8 @@ public class LeaseRenewer { private static long leaseRenewerGraceDefault = 60*1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; + private AtomicBoolean isLSRunning = new AtomicBoolean(false); + /** Get a {@link LeaseRenewer} instance */ public static LeaseRenewer getInstance(final String authority, final UserGroupInformation ugi, final DFSClient dfsc) { @@ -87,6 +90,15 @@ public static LeaseRenewer getInstance(final String authority, return r; } + /** + * Remove the given renewer from the Factory. + * Subsequent call will receive new {@link LeaseRenewer} instance. + * @param renewer Instance to be cleared from Factory + */ + public static void remove(LeaseRenewer renewer) { + Factory.INSTANCE.remove(renewer); + } + /** * A factory for sharing {@link LeaseRenewer} objects * among {@link DFSClient} instances @@ -156,6 +168,9 @@ private synchronized void remove(final LeaseRenewer r) { final LeaseRenewer stored = renewers.get(r.factorykey); //Since a renewer may expire, the stored renewer can be different. if (r == stored) { + // Expire LeaseRenewer daemon thread as soon as possible. + r.clearClients(); + r.setEmptyTime(0); renewers.remove(r.factorykey); } } @@ -241,6 +256,10 @@ private synchronized void addClient(final DFSClient dfsc) { } } + private synchronized void clearClients() { + dfsclients.clear(); + } + private synchronized boolean clientsRunning() { for(Iterator i = dfsclients.iterator(); i.hasNext(); ) { if (!i.next().isClientRunning()) { @@ -292,11 +311,18 @@ private synchronized boolean isRenewerExpired() { && Time.monotonicNow() - emptyTime > gracePeriod; } - public synchronized void put(final DFSClient dfsc) { + public synchronized boolean put(final DFSClient dfsc) { if (dfsc.isClientRunning()) { if (!isRunning() || isRenewerExpired()) { - //start a new deamon with a new id. + // Start a new daemon with a new id. final int id = ++currentId; + if (isLSRunning.get()) { + // Not allowed to add multiple daemons into LeaseRenewer, let client + // create new LR and continue to acquire lease. + return false; + } + isLSRunning.getAndSet(true); + daemon = new Daemon(new Runnable() { @Override public void run() { @@ -328,6 +354,7 @@ public String toString() { } emptyTime = Long.MAX_VALUE; } + return true; } @VisibleForTesting @@ -426,9 +453,6 @@ private void run(final int id) throws InterruptedException { synchronized (this) { DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout(); dfsclientsCopy = new ArrayList<>(dfsclients); - dfsclients.clear(); - //Expire the current LeaseRenewer thread. - emptyTime = 0; Factory.INSTANCE.remove(LeaseRenewer.this); } for (DFSClient dfsClient : dfsclientsCopy) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java index 1ffec85e02b..f1a11edeefc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java @@ -31,7 +31,11 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import static org.junit.Assert.assertSame; @@ -168,6 +172,11 @@ public Boolean get() { renewer.closeClient(mockClient1); renewer.closeClient(mockClient2); + renewer.closeClient(MOCK_DFSCLIENT); + + // Make sure renewer is not running due to expiration. + Thread.sleep(FAST_GRACE_PERIOD * 2); + Assert.assertTrue(!renewer.isRunning()); } @Test @@ -197,4 +206,82 @@ public void testThreadName() throws Exception { Assert.assertFalse(renewer.isRunning()); } + /** + * Test for HDFS-14575. In this fix, the LeaseRenewer clears all clients + * and expires immediately via setting empty time to 0 before it's removed + * from factory. Previously, LeaseRenewer#daemon thread might leak. + */ + @Test + public void testDaemonThreadLeak() throws Exception { + Assert.assertFalse("Renewer not initially running", renewer.isRunning()); + + // Pretend to create a file#1, daemon#1 starts + renewer.put(MOCK_DFSCLIENT); + Assert.assertTrue("Renewer should have started running", + renewer.isRunning()); + Pattern daemonThreadNamePattern = Pattern.compile("LeaseRenewer:\\S+"); + Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern)); + + // Pretend to create file#2, daemon#2 starts due to expiration + LeaseRenewer lastRenewer = renewer; + renewer = + LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + Assert.assertEquals(lastRenewer, renewer); + + // Pretend to close file#1 + renewer.closeClient(MOCK_DFSCLIENT); + Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern)); + + // Pretend to be expired + renewer.setEmptyTime(0); + + renewer = + LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); + boolean success = renewer.put(MOCK_DFSCLIENT); + if (!success) { + LeaseRenewer.remove(renewer); + renewer = + LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); + renewer.put(MOCK_DFSCLIENT); + } + + int threadCount = countThreadMatching(daemonThreadNamePattern); + //Sometimes old LR#Daemon gets closed and lead to count 1 (rare scenario) + Assert.assertTrue(1 == threadCount || 2 == threadCount); + + // After grace period, both daemon#1 and renewer#1 will be removed due to + // expiration, then daemon#2 will leak before HDFS-14575. + Thread.sleep(FAST_GRACE_PERIOD * 2); + + // Pretend to close file#2, renewer#2 will be created + lastRenewer = renewer; + renewer = + LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + Assert.assertEquals(lastRenewer, renewer); + renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); + renewer.closeClient(MOCK_DFSCLIENT); + renewer.setEmptyTime(0); + // Make sure LeaseRenewer#daemon threads will terminate after grace period + Thread.sleep(FAST_GRACE_PERIOD * 2); + Assert.assertEquals("LeaseRenewer#daemon thread leaks", 0, + countThreadMatching(daemonThreadNamePattern)); + } + + private static int countThreadMatching(Pattern pattern) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] infos = + threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1); + int count = 0; + for (ThreadInfo info : infos) { + if (info == null) { + continue; + } + if (pattern.matcher(info.getThreadName()).matches()) { + count++; + } + } + return count; + } }