diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d22f592b686..d90ec700c27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -13,6 +13,9 @@ Release 2.7.2 - UNRELEASED HADOOP-5323. Trash documentation should describe its directory structure and configurations. (Weiwei Yang via ozawa) + HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write + files rather than the entire DFSClient. (mingma) + OPTIMIZATIONS HDFS-8722. Optimize datanode writes for small writes and flushes (kihwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 87e34cdc211..e6638738565 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -916,23 +916,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, void closeConnectionToNamenode() { RPC.stopProxy(namenode); } - - /** Abort and release resources held. Ignore all errors. */ - void abort() { - clientRunning = false; - closeAllFilesBeingWritten(true); - try { - // remove reference to this client and stop the renewer, - // if there is no more clients under the renewer. - getLeaseRenewer().closeClient(this); - } catch (IOException ioe) { - LOG.info("Exception occurred while aborting the client " + ioe); - } - closeConnectionToNamenode(); - } /** Close/abort all files being written. */ - private void closeAllFilesBeingWritten(final boolean abort) { + public void closeAllFilesBeingWritten(final boolean abort) { for(;;) { final long inodeId; final DFSOutputStream out; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java index e76750196e2..b60f7e7693d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java @@ -211,6 +211,12 @@ class LeaseRenewer { return renewal; } + /** Used for testing only. */ + @VisibleForTesting + public synchronized void setRenewalTime(final long renewal) { + this.renewal = renewal; + } + /** Add a client. */ private synchronized void addClient(final DFSClient dfsc) { for(DFSClient c : dfsclients) { @@ -450,8 +456,12 @@ class LeaseRenewer { + (elapsed/1000) + " seconds. Aborting ...", ie); synchronized (this) { while (!dfsclients.isEmpty()) { - dfsclients.get(0).abort(); + DFSClient dfsClient = dfsclients.get(0); + dfsClient.closeAllFilesBeingWritten(true); + closeClient(dfsClient); } + //Expire the current LeaseRenewer thread. + emptyTime = 0; } break; } catch (IOException ie) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 382ad4809ef..0a39cb582e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.when; import java.io.FileNotFoundException; @@ -63,6 +64,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.LeaseRenewer; import org.apache.hadoop.hdfs.client.HdfsUtils; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -355,7 +357,59 @@ public class TestDFSClientRetries { cluster.shutdown(); } } - + + /** + * Test DFSClient can continue to function after renewLease RPC + * receives SocketTimeoutException. + */ + @Test + public void testLeaseRenewSocketTimeout() throws Exception + { + String file1 = "/testFile1"; + String file2 = "/testFile2"; + // Set short retry timeouts so this test runs faster + conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); + conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc()); + Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease( + Mockito.anyString()); + DFSClient client = new DFSClient(null, spyNN, conf, null); + // Get hold of the lease renewer instance used by the client + LeaseRenewer leaseRenewer = client.getLeaseRenewer(); + leaseRenewer.setRenewalTime(100); + OutputStream out1 = client.create(file1, false); + + Mockito.verify(spyNN, timeout(10000).times(1)).renewLease( + Mockito.anyString()); + verifyEmptyLease(leaseRenewer); + try { + out1.write(new byte[256]); + fail("existing output stream should be aborted"); + } catch (IOException e) { + } + + // Verify DFSClient can do read operation after renewLease aborted. + client.exists(file2); + // Verify DFSClient can do write operation after renewLease no longer + // throws SocketTimeoutException. + Mockito.doNothing().when(spyNN).renewLease( + Mockito.anyString()); + leaseRenewer = client.getLeaseRenewer(); + leaseRenewer.setRenewalTime(100); + OutputStream out2 = client.create(file2, false); + Mockito.verify(spyNN, timeout(10000).times(2)).renewLease( + Mockito.anyString()); + out2.write(new byte[256]); + out2.close(); + verifyEmptyLease(leaseRenewer); + } finally { + cluster.shutdown(); + } + } + /** * Test that getAdditionalBlock() and close() are idempotent. This allows * a client to safely retry a call and still produce a correct @@ -670,7 +724,15 @@ public class TestDFSClientRetries { } return ret; } - + + private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception { + int sleepCount = 0; + while (!leaseRenewer.isEmpty() && sleepCount++ < 20) { + Thread.sleep(500); + } + assertTrue("Lease should be empty.", leaseRenewer.isEmpty()); + } + class DFSClientReader implements Runnable { DFSClient client;