From dda8e0e328cace9298ea2ed5468608e02693af55 Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Thu, 16 Jul 2015 12:33:57 -0700 Subject: [PATCH] HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write files rather than the entire DFSClient. (mingma) (cherry picked from commit fbd88f1062f3c4b208724d208e3f501eb196dfab) (cherry picked from commit 516bbf1c20547dc513126df0d9f0934bb65c10c7) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (cherry picked from commit fb1bf424bdad20fff7ab390ce75c4bec558e7e6d) --- .../org/apache/hadoop/hdfs/DFSClient.java | 16 +---- .../org/apache/hadoop/hdfs/LeaseRenewer.java | 12 +++- .../hadoop/hdfs/TestDFSClientRetries.java | 66 ++++++++++++++++++- 3 files changed, 76 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ad24a0d1391..20f9d001e7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -903,23 +903,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, void closeConnectionToNamenode() { RPC.stopProxy(namenode); } - - /** Abort and release resources held. Ignore all errors. */ - void abort() { - clientRunning = false; - closeAllFilesBeingWritten(true); - try { - // remove reference to this client and stop the renewer, - // if there is no more clients under the renewer. - getLeaseRenewer().closeClient(this); - } catch (IOException ioe) { - LOG.info("Exception occurred while aborting the client " + ioe); - } - closeConnectionToNamenode(); - } /** Close/abort all files being written. */ - private void closeAllFilesBeingWritten(final boolean abort) { + public void closeAllFilesBeingWritten(final boolean abort) { for(;;) { final long inodeId; final DFSOutputStream out; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java index f8f337c16f7..855b539da8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java @@ -211,6 +211,12 @@ class LeaseRenewer { return renewal; } + /** Used for testing only. */ + @VisibleForTesting + public synchronized void setRenewalTime(final long renewal) { + this.renewal = renewal; + } + /** Add a client. */ private synchronized void addClient(final DFSClient dfsc) { for(DFSClient c : dfsclients) { @@ -450,8 +456,12 @@ class LeaseRenewer { + (elapsed/1000) + " seconds. Aborting ...", ie); synchronized (this) { while (!dfsclients.isEmpty()) { - dfsclients.get(0).abort(); + DFSClient dfsClient = dfsclients.get(0); + dfsClient.closeAllFilesBeingWritten(true); + closeClient(dfsClient); } + //Expire the current LeaseRenewer thread. + emptyTime = 0; } break; } catch (IOException ie) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 382ad4809ef..0a39cb582e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.when; import java.io.FileNotFoundException; @@ -63,6 +64,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.LeaseRenewer; import org.apache.hadoop.hdfs.client.HdfsUtils; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -355,7 +357,59 @@ public class TestDFSClientRetries { cluster.shutdown(); } } - + + /** + * Test DFSClient can continue to function after renewLease RPC + * receives SocketTimeoutException. + */ + @Test + public void testLeaseRenewSocketTimeout() throws Exception + { + String file1 = "/testFile1"; + String file2 = "/testFile2"; + // Set short retry timeouts so this test runs faster + conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); + conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc()); + Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease( + Mockito.anyString()); + DFSClient client = new DFSClient(null, spyNN, conf, null); + // Get hold of the lease renewer instance used by the client + LeaseRenewer leaseRenewer = client.getLeaseRenewer(); + leaseRenewer.setRenewalTime(100); + OutputStream out1 = client.create(file1, false); + + Mockito.verify(spyNN, timeout(10000).times(1)).renewLease( + Mockito.anyString()); + verifyEmptyLease(leaseRenewer); + try { + out1.write(new byte[256]); + fail("existing output stream should be aborted"); + } catch (IOException e) { + } + + // Verify DFSClient can do read operation after renewLease aborted. + client.exists(file2); + // Verify DFSClient can do write operation after renewLease no longer + // throws SocketTimeoutException. + Mockito.doNothing().when(spyNN).renewLease( + Mockito.anyString()); + leaseRenewer = client.getLeaseRenewer(); + leaseRenewer.setRenewalTime(100); + OutputStream out2 = client.create(file2, false); + Mockito.verify(spyNN, timeout(10000).times(2)).renewLease( + Mockito.anyString()); + out2.write(new byte[256]); + out2.close(); + verifyEmptyLease(leaseRenewer); + } finally { + cluster.shutdown(); + } + } + /** * Test that getAdditionalBlock() and close() are idempotent. This allows * a client to safely retry a call and still produce a correct @@ -670,7 +724,15 @@ public class TestDFSClientRetries { } return ret; } - + + private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception { + int sleepCount = 0; + while (!leaseRenewer.isEmpty() && sleepCount++ < 20) { + Thread.sleep(500); + } + assertTrue("Lease should be empty.", leaseRenewer.isEmpty()); + } + class DFSClientReader implements Runnable { DFSClient client;