diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8fdddd7faeb..4c0949872f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3421,17 +3421,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, " internalReleaseLease: Committed blocks are minimally" + " replicated, lease removed, file" + src + " closed."); return true; // closed! - } else if (penultimateBlockMinStorage && lastBlock.getNumBytes() == 0) { - // HDFS-14498 - this is a file with a final block of zero bytes and was - // likely left in this state by a client which exited unexpectedly - pendingFile.removeLastBlock(lastBlock); - finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId(), false); - NameNode.stateChangeLog.warn("BLOCK*" + - " internalReleaseLease: Committed last block is zero bytes with" + - " insufficient replicas. Final block removed, lease removed, file " - + src + " closed."); - return true; } // Cannot close file right now, since some blocks // are not yet minimally replicated. @@ -3439,10 +3428,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // if there are no valid replicas on data-nodes. String message = "DIR* NameSystem.internalReleaseLease: " + "Failed to release lease for file " + src + - ". Committed blocks are waiting to be minimally replicated." + - " Try again later."; + ". Committed blocks are waiting to be minimally replicated."; NameNode.stateChangeLog.warn(message); - throw new AlreadyBeingCreatedException(message); + if (!penultimateBlockMinStorage) { + throw new AlreadyBeingCreatedException(message); + } + // Intentionally fall through to UNDER_RECOVERY so BLOCK_RECOVERY is + // attempted case UNDER_CONSTRUCTION: case UNDER_RECOVERY: BlockUnderConstructionFeature uc = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java index e96968d2800..c044a4c05bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java @@ -17,36 +17,46 @@ */ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.EnumSet; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.DataChecksum; import org.junit.After; import org.junit.Test; @@ -337,7 +347,13 @@ public class TestLeaseRecovery { String file = "/test/f1"; Path filePath = new Path(file); - createCommittedNotCompleteFile(client, file); + createCommittedNotCompleteFile(client, file, null, 1); + + INodeFile inode = cluster.getNamesystem().getFSDirectory() + .getINode(filePath.toString()).asFile(); + assertTrue(inode.isUnderConstruction()); + assertEquals(1, inode.numBlocks()); + assertNotNull(inode.getLastBlock()); // Ensure a different client cannot append the file try { @@ -347,9 +363,18 @@ public class TestLeaseRecovery { assertTrue(e.getMessage().contains("file lease is currently owned")); } - // Ensure the lease can be recovered on the first try - boolean recovered = client.recoverLease(file); - assertEquals(true, recovered); + // Lease will not be recovered on the first try + assertEquals(false, client.recoverLease(file)); + for (int i=0; i < 10 && !client.recoverLease(file); i++) { + Thread.sleep(1000); + } + assertTrue(client.recoverLease(file)); + + inode = cluster.getNamesystem().getFSDirectory() + .getINode(filePath.toString()).asFile(); + assertTrue(!inode.isUnderConstruction()); + assertEquals(0, inode.numBlocks()); + assertNull(inode.getLastBlock()); // Ensure the recovered file can now be written FSDataOutputStream append = dfs.append(filePath); @@ -381,7 +406,7 @@ public class TestLeaseRecovery { new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); String file = "/test/f1"; - createCommittedNotCompleteFile(client, file); + createCommittedNotCompleteFile(client, file, null, 1); waitLeaseRecovery(cluster); GenericTestUtils.waitFor(() -> { @@ -401,23 +426,167 @@ public class TestLeaseRecovery { } } - private void createCommittedNotCompleteFile(DFSClient client, String file) - throws IOException { + @Test + public void testAbortedRecovery() throws Exception { + Configuration conf = new Configuration(); + DFSClient client = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + client = + new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); + final String file = "/test/f1"; + + HdfsFileStatus stat = client.getNamenode() + .create(file, new FsPermission("777"), client.clientName, + new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), + true, (short) 1, 1024 * 1024 * 128L, + new CryptoProtocolVersion[0], null); + + assertNotNull(NameNodeAdapter.getLeaseHolderForPath( + cluster.getNameNode(), file)); + + // Add a block to the file + ExtendedBlock block = client.getNamenode().addBlock( + file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(), + new String[0], null).getBlock(); + + // update the pipeline to get a new genstamp. + ExtendedBlock updatedBlock = client.getNamenode() + .updateBlockForPipeline(block, client.clientName) + .getBlock(); + // fake that some data was maybe written. commit block sync will + // reconcile. + updatedBlock.setNumBytes(1234); + + // get the stored block and make it look like the DN sent a RBW IBR. + BlockManager bm = cluster.getNamesystem().getBlockManager(); + BlockInfo storedBlock = bm.getStoredBlock(block.getLocalBlock()); + BlockUnderConstructionFeature uc = + storedBlock.getUnderConstructionFeature(); + uc.setExpectedLocations(updatedBlock.getLocalBlock(), + uc.getExpectedStorageLocations(), BlockType.CONTIGUOUS); + + // complete the file w/o updatePipeline to simulate client failure. + client.getNamenode().complete(file, client.clientName, block, + stat.getFileId()); + + assertNotNull(NameNodeAdapter.getLeaseHolderForPath( + cluster.getNameNode(), file)); + + cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + String holder = NameNodeAdapter + .getLeaseHolderForPath(cluster.getNameNode(), file); + return holder == null; + } + }, 100, 20000); + // nothing was actually written so the block should be dropped. + assertTrue(storedBlock.isDeleted()); + } finally { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + if (client != null) { + client.close(); + } + } + } + + @Test + public void testLeaseManagerRecoversCommittedLastBlockWithContent() + throws Exception { + Configuration conf = new Configuration(); + DFSClient client = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + client = + new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); + String file = "/test/f2"; + + byte[] bytesToWrite = new byte[1]; + bytesToWrite[0] = 123; + createCommittedNotCompleteFile(client, file, bytesToWrite, 3); + + waitLeaseRecovery(cluster); + + DistributedFileSystem hdfs = cluster.getFileSystem(); + + // Now the least has been recovered, attempt to append the file and then + // ensure the earlier written and newly written data can be read back. + FSDataOutputStream op = null; + try { + op = hdfs.append(new Path(file)); + op.write(23); + } finally { + if (op != null) { + op.close(); + } + } + + FSDataInputStream stream = null; + try { + stream = cluster.getFileSystem().open(new Path(file)); + assertEquals(123, stream.readByte()); + assertEquals(23, stream.readByte()); + } finally { + stream.close(); + } + + // Finally check there are no leases for the file and hence the file is + // closed. + GenericTestUtils.waitFor(() -> { + String holder = NameNodeAdapter + .getLeaseHolderForPath(cluster.getNameNode(), file); + return holder == null; + }, 100, 10000); + + } finally { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + if (client != null) { + client.close(); + } + } + } + + private void createCommittedNotCompleteFile(DFSClient client, String file, + byte[] bytesToWrite, int repFactor) throws IOException { HdfsFileStatus stat = client.getNamenode() - .create(file, new FsPermission("777"), "test client", + .create(file, new FsPermission("777"), client.clientName, new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, 1024 * 1024 * 128L, new CryptoProtocolVersion[0], null); // Add a block to the file LocatedBlock blk = client.getNamenode() - .addBlock(file, "test client", null, + .addBlock(file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(), new String[0], null); - // Without writing anything to the file, or setting up the DN pipeline - // attempt to close the file. This will fail (return false) as the NN will + ExtendedBlock finalBlock = blk.getBlock(); + if (bytesToWrite != null) { + // Here we create a output stream and then abort it so the block gets + // created on the datanode, but we never send the message to tell the DN + // to complete the block. This simulates the client crashing after it + // wrote the data, but before the file gets closed. + DFSOutputStream s = new DFSOutputStream(client, file, stat, + EnumSet.of(CreateFlag.CREATE), null, + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512), + null, true); + s.start(); + s.write(bytesToWrite); + s.hflush(); + finalBlock = s.getBlock(); + s.abort(); + } + // Attempt to close the file. This will fail (return false) as the NN will // be expecting the registered block to be reported from the DNs via IBR, - // but that will never happen, as the pipeline was never established + // but that will never happen, as we either did not write it, or we aborted + // the stream preventing the "close block" message to be sent to the DN. boolean closed = client.getNamenode().complete( - file, "test client", blk.getBlock(), stat.getFileId()); + file, client.clientName, finalBlock, stat.getFileId()); assertEquals(false, closed); }