diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 75128aef28e..3fd822018fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -484,12 +484,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, /** Get a lease and start automatic renewal */ private void beginFileLease(final long inodeId, final DFSOutputStream out) throws IOException { - getLeaseRenewer().put(inodeId, out, this); + synchronized (filesBeingWritten) { + putFileBeingWritten(inodeId, out); + getLeaseRenewer().put(this); + } } /** Stop renewal of lease for the file. */ void endFileLease(final long inodeId) { - getLeaseRenewer().closeFile(inodeId, this); + synchronized (filesBeingWritten) { + removeFileBeingWritten(inodeId); + // remove client from renewer if no files are open + if (filesBeingWritten.isEmpty()) { + getLeaseRenewer().closeClient(this); + } + } } @@ -615,9 +624,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @Override public synchronized void close() throws IOException { if(clientRunning) { + // lease renewal stops when all files are closed closeAllFilesBeingWritten(false); clientRunning = false; - getLeaseRenewer().closeClient(this); // close connections to the namenode closeConnectionToNamenode(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index b58cf16a323..d36c0581c21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -63,4 +63,6 @@ public class DFSClientFaultInjector { } public void sleepBeforeHedgedGet() {} + + public void delayWhenRenewLeaseTimeout() {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java index 6faf133f83a..e33d024e853 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java @@ -30,7 +30,7 @@ import java.util.Map; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.DFSClientFaultInjector; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; @@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory; public class LeaseRenewer { static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class); - static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; + private static long leaseRenewerGraceDefault = 60*1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; /** Get a {@link LeaseRenewer} instance */ @@ -156,9 +156,7 @@ public class LeaseRenewer { final LeaseRenewer stored = renewers.get(r.factorykey); //Since a renewer may expire, the stored renewer can be different. if (r == stored) { - if (!r.clientsRunning()) { - renewers.remove(r.factorykey); - } + renewers.remove(r.factorykey); } } } @@ -201,7 +199,7 @@ public class LeaseRenewer { private LeaseRenewer(Factory.Key factorykey) { this.factorykey = factorykey; - unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); + unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault); if (LOG.isTraceEnabled()) { instantiationTrace = StringUtils.stringifyException( @@ -293,8 +291,7 @@ public class LeaseRenewer { && Time.monotonicNow() - emptyTime > gracePeriod; } - public synchronized void put(final long inodeId, final DFSOutputStream out, - final DFSClient dfsc) { + public synchronized void put(final DFSClient dfsc) { if (dfsc.isClientRunning()) { if (!isRunning() || isRenewerExpired()) { //start a new deamon with a new id. @@ -328,7 +325,6 @@ public class LeaseRenewer { }); daemon.start(); } - dfsc.putFileBeingWritten(inodeId, out); emptyTime = Long.MAX_VALUE; } } @@ -338,28 +334,6 @@ public class LeaseRenewer { emptyTime = time; } - /** Close a file. */ - public void closeFile(final long inodeId, final DFSClient dfsc) { - dfsc.removeFileBeingWritten(inodeId); - - synchronized(this) { - if (dfsc.isFilesBeingWrittenEmpty()) { - dfsclients.remove(dfsc); - } - //update emptyTime if necessary - if (emptyTime == Long.MAX_VALUE) { - for(DFSClient c : dfsclients) { - if (!c.isFilesBeingWrittenEmpty()) { - //found a non-empty file-being-written map - return; - } - } - //discover the first time that all file-being-written maps are empty. - emptyTime = Time.monotonicNow(); - } - } - } - /** Close the given client. */ public synchronized void closeClient(final DFSClient dfsc) { dfsclients.remove(dfsc); @@ -447,14 +421,17 @@ public class LeaseRenewer { } catch (SocketTimeoutException ie) { LOG.warn("Failed to renew lease for " + clientsString() + " for " + (elapsed/1000) + " seconds. Aborting ...", ie); + List dfsclientsCopy; synchronized (this) { - while (!dfsclients.isEmpty()) { - DFSClient dfsClient = dfsclients.get(0); - dfsClient.closeAllFilesBeingWritten(true); - closeClient(dfsClient); - } + DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout(); + dfsclientsCopy = new ArrayList<>(dfsclients); + dfsclients.clear(); //Expire the current LeaseRenewer thread. emptyTime = 0; + Factory.INSTANCE.remove(LeaseRenewer.this); + } + for (DFSClient dfsClient : dfsclientsCopy) { + dfsClient.closeAllFilesBeingWritten(true); } break; } catch (IOException ie) { @@ -511,4 +488,10 @@ public class LeaseRenewer { return b.append("]").toString(); } } + + @VisibleForTesting + public static void setLeaseRenewerGraceDefault( + long leaseRenewerGraceDefault) { + LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java index eb10e965424..f73ea6d2471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java @@ -109,7 +109,7 @@ public class TestLeaseRenewer { // Set up a file so that we start renewing our lease. DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); long fileId = 123L; - renewer.put(fileId, mockStream, MOCK_DFSCLIENT); + renewer.put(MOCK_DFSCLIENT); // Wait for lease to get renewed long failTime = Time.monotonicNow() + 5000; @@ -121,7 +121,7 @@ public class TestLeaseRenewer { Assert.fail("Did not renew lease at all!"); } - renewer.closeFile(fileId, MOCK_DFSCLIENT); + renewer.closeClient(MOCK_DFSCLIENT); } /** @@ -136,11 +136,8 @@ public class TestLeaseRenewer { Mockito.doReturn(false).when(mockClient1).renewLease(); assertSame(renewer, LeaseRenewer.getInstance( FAKE_AUTHORITY, FAKE_UGI_A, mockClient1)); - - // Set up a file so that we start renewing our lease. - DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class); long fileId = 456L; - renewer.put(fileId, mockStream1, mockClient1); + renewer.put(mockClient1); // Second DFSClient does renew lease final DFSClient mockClient2 = createMockClient(); @@ -148,9 +145,7 @@ public class TestLeaseRenewer { assertSame(renewer, LeaseRenewer.getInstance( FAKE_AUTHORITY, FAKE_UGI_A, mockClient2)); - // Set up a file so that we start renewing our lease. - DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class); - renewer.put(fileId, mockStream2, mockClient2); + renewer.put(mockClient2); // Wait for lease to get renewed @@ -171,19 +166,17 @@ public class TestLeaseRenewer { } }, 100, 10000); - renewer.closeFile(fileId, mockClient1); - renewer.closeFile(fileId, mockClient2); + renewer.closeClient(mockClient1); + renewer.closeClient(mockClient2); } @Test public void testThreadName() throws Exception { - DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); - long fileId = 789L; Assert.assertFalse("Renewer not initially running", renewer.isRunning()); // Pretend to open a file - renewer.put(fileId, mockStream, MOCK_DFSCLIENT); + renewer.put(MOCK_DFSCLIENT); Assert.assertTrue("Renewer should have started running", renewer.isRunning()); @@ -193,7 +186,7 @@ public class TestLeaseRenewer { Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName); // Pretend to close the file - renewer.closeFile(fileId, MOCK_DFSCLIENT); + renewer.closeClient(MOCK_DFSCLIENT); renewer.setEmptyTime(Time.monotonicNow()); // Should stop the renewer running within a few seconds diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index cf939431f4f..e9985b9de7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -47,6 +47,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -1236,4 +1237,83 @@ public class TestDFSClientRetries { cluster.shutdown(); } } + + @Test(timeout=120000) + public void testLeaseRenewAndDFSOutputStreamDeadLock() throws Exception { + final CountDownLatch testLatch = new CountDownLatch(1); + DFSClientFaultInjector.set(new DFSClientFaultInjector() { + public void delayWhenRenewLeaseTimeout() { + try { + testLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + String file1 = "/testFile1"; + // Set short retry timeouts so this test runs faster + conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + final NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc()); + + doAnswer(new SleepFixedTimeAnswer(1500, testLatch)).when(spyNN).complete( + anyString(), anyString(), any(ExtendedBlock.class), anyLong()); + DFSClient client = new DFSClient(null, spyNN, conf, null); + // Get hold of the lease renewer instance used by the client + LeaseRenewer leaseRenewer = client.getLeaseRenewer(); + leaseRenewer.setRenewalTime(100); + final OutputStream out1 = client.create(file1, false); + + out1.write(new byte[256]); + + Thread closeThread = new Thread(new Runnable() { + @Override public void run() { + try { + //1. trigger get LeaseRenewer lock + Mockito.doThrow(new SocketTimeoutException()).when(spyNN) + .renewLease(Mockito.anyString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + closeThread.start(); + + //2. trigger get DFSOutputStream lock + out1.close(); + + } finally { + cluster.shutdown(); + } + } + + private static class SleepFixedTimeAnswer implements Answer { + private final int sleepTime; + private final CountDownLatch testLatch; + + SleepFixedTimeAnswer(int sleepTime, CountDownLatch latch) { + this.sleepTime = sleepTime; + this.testLatch = latch; + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + boolean interrupted = false; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + interrupted = true; + } + try { + return invocation.callRealMethod(); + } finally { + testLatch.countDown(); + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 0cff7d491f9..d92a67dfdcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -290,6 +290,7 @@ public class TestDistributedFileSystem { Configuration conf = getTestConfiguration(); final long grace = 1000L; MiniDFSCluster cluster = null; + LeaseRenewer.setLeaseRenewerGraceDefault(grace); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); @@ -302,10 +303,6 @@ public class TestDistributedFileSystem { { final DistributedFileSystem dfs = cluster.getFileSystem(); - Method setMethod = dfs.dfs.getLeaseRenewer().getClass() - .getDeclaredMethod("setGraceSleepPeriod", long.class); - setMethod.setAccessible(true); - setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace); Method checkMethod = dfs.dfs.getLeaseRenewer().getClass() .getDeclaredMethod("isRunning"); checkMethod.setAccessible(true);