diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 421a675ef55..703faf84bba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -217,6 +217,9 @@ Release 0.23.2 - UNRELEASED
     HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
     jitendra)
 
+    HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
+    lease soft-limit. (Kihwal Lee via szetszwo)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES
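For context (commentary, not part of the patch): the lease "soft-limit" named in this change log entry is the period after which another client may preempt a writer's unrenewed lease; the hard limit is when the namenode itself recovers it. A minimal sketch of the two limits follows; the one-minute/one-hour values mirror org.apache.hadoop.hdfs.protocol.HdfsConstants, but the demo class and its main() are illustrative only.

// Illustrative only: restates the lease-limit constants this patch compares
// against. The real definitions live in HdfsConstants.
public class LeaseLimitsDemo {
  // Soft limit: after this long without a renewal, another client may
  // take over the lease of a file that is open for writing.
  static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000L;

  // Hard limit: after this long, the namenode may recover the lease itself.
  static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;

  public static void main(String[] args) {
    long elapsed = 90 * 1000L; // e.g. 90 seconds since the last renewal
    if (elapsed > LEASE_SOFTLIMIT_PERIOD) {
      System.out.println("Past the soft limit: retrying the renewal can no"
          + " longer protect the writer, so its files should be closed.");
    }
  }
}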
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 672c55f72ed..1e36522a9a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -18,6 +18,31 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
 import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
@@ -59,7 +84,6 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -83,6 +107,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -105,7 +130,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
@@ -127,6 +151,7 @@ public class DFSClient implements java.io.Closeable {
   private final InetSocketAddress nnAddress;
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
+  volatile long lastLeaseRenewal;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -351,6 +376,12 @@ public class DFSClient implements java.io.Closeable {
   void putFileBeingWritten(final String src, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.put(src, out);
+      // Update the last lease renewal time only when there were no
+      // writes. Once there is one write stream open, the lease renewer
+      // thread keeps it updated well within the lease expiration time.
+      if (lastLeaseRenewal == 0) {
+        updateLastLeaseRenewal();
+      }
     }
   }
 
@@ -358,6 +389,9 @@
   void removeFileBeingWritten(final String src) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.remove(src);
+      if (filesBeingWritten.isEmpty()) {
+        lastLeaseRenewal = 0;
+      }
     }
   }
 
@@ -373,6 +407,19 @@ public class DFSClient implements java.io.Closeable {
     return clientRunning;
   }
 
+  long getLastLeaseRenewal() {
+    return lastLeaseRenewal;
+  }
+
+  void updateLastLeaseRenewal() {
+    synchronized(filesBeingWritten) {
+      if (filesBeingWritten.isEmpty()) {
+        return;
+      }
+      lastLeaseRenewal = System.currentTimeMillis();
+    }
+  }
+
   /**
    * Renew leases.
   * @return true if lease was renewed. May return false if this
@@ -380,8 +427,24 @@
    **/
   boolean renewLease() throws IOException {
     if (clientRunning && !isFilesBeingWrittenEmpty()) {
-      namenode.renewLease(clientName);
-      return true;
+      try {
+        namenode.renewLease(clientName);
+        updateLastLeaseRenewal();
+        return true;
+      } catch (IOException e) {
+        // Abort if the lease has already expired.
+        final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
+        if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
+          LOG.warn("Failed to renew lease for " + clientName + " for "
+              + (elapsed/1000) + " seconds (>= soft-limit ="
+              + (HdfsConstants.LEASE_SOFTLIMIT_PERIOD/1000) + " seconds.) "
+              + "Closing all files being written ...", e);
+          closeAllFilesBeingWritten(true);
+        } else {
+          // Let the lease renewer handle it and retry.
+          throw e;
+        }
+      }
     }
     return false;
   }
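To see the new control flow of renewLease() in isolation: on a failed renewal the client now checks how long it has been since the last successful one. Within the soft limit, the exception is rethrown so LeaseRenewer keeps retrying; past it, the client gives up and closes its writers. The restatement below is hypothetical (RenewalDecision, Action, and decide() are illustrative names, not HDFS API); only the soft-limit comparison mirrors the patch.

// Hypothetical, self-contained restatement of the retry decision added
// to DFSClient.renewLease() above.
public class RenewalDecision {
  // Mirrors HdfsConstants.LEASE_SOFTLIMIT_PERIOD (one minute).
  static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000L;

  enum Action { RETRY, CLOSE_FILES }

  // A renewal RPC just failed; lastRenewal is the last successful renewal.
  static Action decide(long now, long lastRenewal) {
    final long elapsed = now - lastRenewal;
    if (elapsed > LEASE_SOFTLIMIT_PERIOD) {
      // The lease may already belong to another client; the open output
      // streams cannot be trusted any more, so close them.
      return Action.CLOSE_FILES;
    }
    // Still inside the soft limit: rethrow and let LeaseRenewer retry.
    return Action.RETRY;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(decide(now, now - 30 * 1000L)); // RETRY
    System.out.println(decide(now, now - 90 * 1000L)); // CLOSE_FILES
  }
}

Note also the bookkeeping that makes the decision possible: lastLeaseRenewal is seeded when the first output stream is registered and reset to zero when the last one is removed, so a client with no open writers never measures a stale interval.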
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
index 862be0c184d..471f7342e4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
@@ -430,7 +430,8 @@ class LeaseRenewer {
     for(long lastRenewed = System.currentTimeMillis();
         clientsRunning() && !Thread.interrupted();
         Thread.sleep(getSleepPeriod())) {
-      if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
+      final long elapsed = System.currentTimeMillis() - lastRenewed;
+      if (elapsed >= getRenewalTime()) {
         try {
           renew();
           if (LOG.isDebugEnabled()) {
@@ -440,7 +441,7 @@ class LeaseRenewer {
           lastRenewed = System.currentTimeMillis();
         } catch (SocketTimeoutException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (getRenewalTime()/1000) + " seconds. Aborting ...", ie);
+              + (elapsed/1000) + " seconds. Aborting ...", ie);
           synchronized (this) {
             for(DFSClient c : dfsclients) {
               c.abort();
@@ -449,8 +450,7 @@ class LeaseRenewer {
           break;
         } catch (IOException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (getRenewalTime()/1000) + " seconds. Will retry shortly ...",
-              ie);
+              + (elapsed/1000) + " seconds. Will retry shortly ...", ie);
         }
       }
 
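The LeaseRenewer hunks are bookkeeping: the two warnings now report the time actually elapsed since the last successful renewal rather than the configured renewal interval, which had made the old messages misleading. For orientation, here is a compressed, hypothetical sketch of the renewal loop's control flow after this patch; renewOnce() and the constants are illustrative stand-ins for the real renew()/getRenewalTime()/getSleepPeriod() machinery.

import java.io.IOException;
import java.net.SocketTimeoutException;

// Hypothetical sketch of LeaseRenewer.run()'s control flow; not HDFS code.
public class RenewLoopSketch {
  static void renewOnce() throws IOException {
    // Stands in for the renewLease() RPC to the namenode.
  }

  public static void main(String[] args) throws InterruptedException {
    final long renewalTime = 30 * 1000L; // stands in for getRenewalTime()
    long lastRenewed = System.currentTimeMillis();
    while (!Thread.interrupted()) {
      final long elapsed = System.currentTimeMillis() - lastRenewed;
      if (elapsed >= renewalTime) {
        try {
          renewOnce();
          lastRenewed = System.currentTimeMillis();
        } catch (SocketTimeoutException ste) {
          // Namenode unreachable: abort every client and stop renewing.
          break;
        } catch (IOException ioe) {
          // Transient failure: warn with the elapsed time (the fix above)
          // and retry on the next pass. DFSClient.renewLease() is what
          // turns persistent failure into an abort at the soft limit.
        }
      }
      Thread.sleep(1000); // stands in for getSleepPeriod()
    }
  }
}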
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index a90c9d2ef3a..aa41ef0563d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -18,29 +18,113 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doNothing;
 
 public class TestLease {
   static boolean hasLease(MiniDFSCluster cluster, Path src) {
     return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()
         ).getLeaseByPath(src.toString()) != null;
   }
-  
-  final Path dir = new Path("/test/lease/");
+
+  static final String dirString = "/test/lease";
+  final Path dir = new Path(dirString);
+  static final Log LOG = LogFactory.getLog(TestLease.class);
+  Configuration conf = new HdfsConfiguration();
+
+  @Test
+  public void testLeaseAbort() throws Exception {
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    try {
+      cluster.waitActive();
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
+
+      DFSClient dfs = new DFSClient(null, spyNN, conf, null);
+      byte[] buf = new byte[1024];
+
+      FSDataOutputStream c_out = createFsOut(dfs, dirString + "c");
+      c_out.write(buf, 0, 1024);
+      c_out.close();
+
+      DFSInputStream c_in = dfs.open(dirString + "c");
+      FSDataOutputStream d_out = createFsOut(dfs, dirString + "d");
+
+      // Stub the renew method so that every renewal fails.
+      doThrow(new RemoteException(InvalidToken.class.getName(),
+          "Your token is worthless")).when(spyNN).renewLease(anyString());
+
+      // We don't need to wait for the lease renewer thread to act;
+      // call renewLease() manually and make it look like the lease
+      // has already expired.
+      dfs.lastLeaseRenewal = System.currentTimeMillis() - 300000;
+      dfs.renewLease();
+
+      // This write should now fail.
+      try {
+        d_out.write(buf, 0, 1024);
+        d_out.close();
+        Assert.fail("Write did not fail even after the fatal lease renewal failure");
+      } catch (IOException e) {
+        LOG.info("Write failed as expected. ", e);
+      }
+
+      // Unstub so that renewals succeed again.
+      doNothing().when(spyNN).renewLease(anyString());
+
+      // Existing input streams should still work.
+      try {
+        int num = c_in.read(buf, 0, 1);
+        if (num != 1) {
+          Assert.fail("Failed to read 1 byte");
+        }
+        c_in.close();
+      } catch (IOException e) {
+        LOG.error("Read failed with ", e);
+        Assert.fail("Read after lease renewal failure failed");
+      }
+
+      // New file writes should work.
+      try {
+        c_out = createFsOut(dfs, dirString + "c");
+        c_out.write(buf, 0, 1024);
+        c_out.close();
+      } catch (IOException e) {
+        LOG.error("Write failed with ", e);
+        Assert.fail("Write failed");
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
   @Test
   public void testLease() throws Exception {
-    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     try {
       FileSystem fs = cluster.getFileSystem();
@@ -94,6 +178,11 @@ public class TestLease {
     Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
   }
 
+  private FSDataOutputStream createFsOut(DFSClient dfs, String path)
+      throws IOException {
+    return new FSDataOutputStream(dfs.create(path, true), null);
+  }
+
   static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
   static public DFSClient createDFSClientAs(UserGroupInformation ugi,
       final Configuration conf) throws Exception {
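testLeaseAbort() drives the failure path by wrapping the namenode RPC object in a Mockito spy: doThrow() makes every renewal fail, and doNothing() later "unstubs" it so renewals succeed again. The same stub/unstub pattern is shown in isolation below; LeaseRpc and RealLeaseRpc are hypothetical stand-ins for NamenodeProtocols and its implementation.

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.IOException;

// Minimal illustration of the spy/doThrow/doNothing pattern used by
// testLeaseAbort(); LeaseRpc and RealLeaseRpc are hypothetical.
public class SpyPatternExample {
  interface LeaseRpc {
    void renewLease(String clientName) throws IOException;
  }

  static class RealLeaseRpc implements LeaseRpc {
    public void renewLease(String clientName) throws IOException {
      // A real implementation would issue the RPC here.
    }
  }

  public static void main(String[] args) throws IOException {
    LeaseRpc spyRpc = spy(new RealLeaseRpc());

    // Stub: every renewal now fails, as in the test's fault injection.
    doThrow(new IOException("Your token is worthless"))
        .when(spyRpc).renewLease(anyString());
    try {
      spyRpc.renewLease("client-1");
    } catch (IOException expected) {
      System.out.println("Renewal failed as injected: " + expected.getMessage());
    }

    // Unstub: renewals become harmless no-ops and succeed again.
    doNothing().when(spyRpc).renewLease(anyString());
    spyRpc.renewLease("client-1");
  }
}

The do-family syntax matters here: on a spy, a plain when(spyRpc.renewLease(...)) would invoke the real method while setting up the stub, whereas doThrow(...).when(spyRpc) configures the behavior without calling through.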