HDFS-15725. Lease Recovery never completes for a committed block which the DNs never finalize. Contributed by Stephen O'Donnell
(cherry picked from commit 52adda89c4)
This commit is contained in:
parent
392f69471d
commit
e7df47c45c
|
@ -3421,17 +3421,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
" internalReleaseLease: Committed blocks are minimally" +
|
||||
" replicated, lease removed, file" + src + " closed.");
|
||||
return true; // closed!
|
||||
} else if (penultimateBlockMinStorage && lastBlock.getNumBytes() == 0) {
|
||||
// HDFS-14498 - this is a file with a final block of zero bytes and was
|
||||
// likely left in this state by a client which exited unexpectedly
|
||||
pendingFile.removeLastBlock(lastBlock);
|
||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||
iip.getLatestSnapshotId(), false);
|
||||
NameNode.stateChangeLog.warn("BLOCK*" +
|
||||
" internalReleaseLease: Committed last block is zero bytes with" +
|
||||
" insufficient replicas. Final block removed, lease removed, file "
|
||||
+ src + " closed.");
|
||||
return true;
|
||||
}
|
||||
// Cannot close file right now, since some blocks
|
||||
// are not yet minimally replicated.
|
||||
|
@ -3439,10 +3428,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
// if there are no valid replicas on data-nodes.
|
||||
String message = "DIR* NameSystem.internalReleaseLease: " +
|
||||
"Failed to release lease for file " + src +
|
||||
". Committed blocks are waiting to be minimally replicated." +
|
||||
" Try again later.";
|
||||
". Committed blocks are waiting to be minimally replicated.";
|
||||
NameNode.stateChangeLog.warn(message);
|
||||
throw new AlreadyBeingCreatedException(message);
|
||||
if (!penultimateBlockMinStorage) {
|
||||
throw new AlreadyBeingCreatedException(message);
|
||||
}
|
||||
// Intentionally fall through to UNDER_RECOVERY so BLOCK_RECOVERY is
|
||||
// attempted
|
||||
case UNDER_CONSTRUCTION:
|
||||
case UNDER_RECOVERY:
|
||||
BlockUnderConstructionFeature uc =
|
||||
|
|
|
@ -17,36 +17,46 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -337,7 +347,13 @@ public class TestLeaseRecovery {
|
|||
String file = "/test/f1";
|
||||
Path filePath = new Path(file);
|
||||
|
||||
createCommittedNotCompleteFile(client, file);
|
||||
createCommittedNotCompleteFile(client, file, null, 1);
|
||||
|
||||
INodeFile inode = cluster.getNamesystem().getFSDirectory()
|
||||
.getINode(filePath.toString()).asFile();
|
||||
assertTrue(inode.isUnderConstruction());
|
||||
assertEquals(1, inode.numBlocks());
|
||||
assertNotNull(inode.getLastBlock());
|
||||
|
||||
// Ensure a different client cannot append the file
|
||||
try {
|
||||
|
@ -347,9 +363,18 @@ public class TestLeaseRecovery {
|
|||
assertTrue(e.getMessage().contains("file lease is currently owned"));
|
||||
}
|
||||
|
||||
// Ensure the lease can be recovered on the first try
|
||||
boolean recovered = client.recoverLease(file);
|
||||
assertEquals(true, recovered);
|
||||
// Lease will not be recovered on the first try
|
||||
assertEquals(false, client.recoverLease(file));
|
||||
for (int i=0; i < 10 && !client.recoverLease(file); i++) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertTrue(client.recoverLease(file));
|
||||
|
||||
inode = cluster.getNamesystem().getFSDirectory()
|
||||
.getINode(filePath.toString()).asFile();
|
||||
assertTrue(!inode.isUnderConstruction());
|
||||
assertEquals(0, inode.numBlocks());
|
||||
assertNull(inode.getLastBlock());
|
||||
|
||||
// Ensure the recovered file can now be written
|
||||
FSDataOutputStream append = dfs.append(filePath);
|
||||
|
@ -381,7 +406,7 @@ public class TestLeaseRecovery {
|
|||
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||
String file = "/test/f1";
|
||||
|
||||
createCommittedNotCompleteFile(client, file);
|
||||
createCommittedNotCompleteFile(client, file, null, 1);
|
||||
waitLeaseRecovery(cluster);
|
||||
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
|
@ -401,23 +426,167 @@ public class TestLeaseRecovery {
|
|||
}
|
||||
}
|
||||
|
||||
private void createCommittedNotCompleteFile(DFSClient client, String file)
|
||||
throws IOException {
|
||||
@Test
|
||||
public void testAbortedRecovery() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
DFSClient client = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
client =
|
||||
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||
final String file = "/test/f1";
|
||||
|
||||
HdfsFileStatus stat = client.getNamenode()
|
||||
.create(file, new FsPermission("777"), client.clientName,
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||
true, (short) 1, 1024 * 1024 * 128L,
|
||||
new CryptoProtocolVersion[0], null);
|
||||
|
||||
assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
|
||||
cluster.getNameNode(), file));
|
||||
|
||||
// Add a block to the file
|
||||
ExtendedBlock block = client.getNamenode().addBlock(
|
||||
file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(),
|
||||
new String[0], null).getBlock();
|
||||
|
||||
// update the pipeline to get a new genstamp.
|
||||
ExtendedBlock updatedBlock = client.getNamenode()
|
||||
.updateBlockForPipeline(block, client.clientName)
|
||||
.getBlock();
|
||||
// fake that some data was maybe written. commit block sync will
|
||||
// reconcile.
|
||||
updatedBlock.setNumBytes(1234);
|
||||
|
||||
// get the stored block and make it look like the DN sent a RBW IBR.
|
||||
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||
BlockInfo storedBlock = bm.getStoredBlock(block.getLocalBlock());
|
||||
BlockUnderConstructionFeature uc =
|
||||
storedBlock.getUnderConstructionFeature();
|
||||
uc.setExpectedLocations(updatedBlock.getLocalBlock(),
|
||||
uc.getExpectedStorageLocations(), BlockType.CONTIGUOUS);
|
||||
|
||||
// complete the file w/o updatePipeline to simulate client failure.
|
||||
client.getNamenode().complete(file, client.clientName, block,
|
||||
stat.getFileId());
|
||||
|
||||
assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
|
||||
cluster.getNameNode(), file));
|
||||
|
||||
cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
String holder = NameNodeAdapter
|
||||
.getLeaseHolderForPath(cluster.getNameNode(), file);
|
||||
return holder == null;
|
||||
}
|
||||
}, 100, 20000);
|
||||
// nothing was actually written so the block should be dropped.
|
||||
assertTrue(storedBlock.isDeleted());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeaseManagerRecoversCommittedLastBlockWithContent()
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
DFSClient client = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
client =
|
||||
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||
String file = "/test/f2";
|
||||
|
||||
byte[] bytesToWrite = new byte[1];
|
||||
bytesToWrite[0] = 123;
|
||||
createCommittedNotCompleteFile(client, file, bytesToWrite, 3);
|
||||
|
||||
waitLeaseRecovery(cluster);
|
||||
|
||||
DistributedFileSystem hdfs = cluster.getFileSystem();
|
||||
|
||||
// Now the least has been recovered, attempt to append the file and then
|
||||
// ensure the earlier written and newly written data can be read back.
|
||||
FSDataOutputStream op = null;
|
||||
try {
|
||||
op = hdfs.append(new Path(file));
|
||||
op.write(23);
|
||||
} finally {
|
||||
if (op != null) {
|
||||
op.close();
|
||||
}
|
||||
}
|
||||
|
||||
FSDataInputStream stream = null;
|
||||
try {
|
||||
stream = cluster.getFileSystem().open(new Path(file));
|
||||
assertEquals(123, stream.readByte());
|
||||
assertEquals(23, stream.readByte());
|
||||
} finally {
|
||||
stream.close();
|
||||
}
|
||||
|
||||
// Finally check there are no leases for the file and hence the file is
|
||||
// closed.
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
String holder = NameNodeAdapter
|
||||
.getLeaseHolderForPath(cluster.getNameNode(), file);
|
||||
return holder == null;
|
||||
}, 100, 10000);
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createCommittedNotCompleteFile(DFSClient client, String file,
|
||||
byte[] bytesToWrite, int repFactor) throws IOException {
|
||||
HdfsFileStatus stat = client.getNamenode()
|
||||
.create(file, new FsPermission("777"), "test client",
|
||||
.create(file, new FsPermission("777"), client.clientName,
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||
true, (short) 1, 1024 * 1024 * 128L,
|
||||
new CryptoProtocolVersion[0], null);
|
||||
// Add a block to the file
|
||||
LocatedBlock blk = client.getNamenode()
|
||||
.addBlock(file, "test client", null,
|
||||
.addBlock(file, client.clientName, null,
|
||||
new DatanodeInfo[0], stat.getFileId(), new String[0], null);
|
||||
// Without writing anything to the file, or setting up the DN pipeline
|
||||
// attempt to close the file. This will fail (return false) as the NN will
|
||||
ExtendedBlock finalBlock = blk.getBlock();
|
||||
if (bytesToWrite != null) {
|
||||
// Here we create a output stream and then abort it so the block gets
|
||||
// created on the datanode, but we never send the message to tell the DN
|
||||
// to complete the block. This simulates the client crashing after it
|
||||
// wrote the data, but before the file gets closed.
|
||||
DFSOutputStream s = new DFSOutputStream(client, file, stat,
|
||||
EnumSet.of(CreateFlag.CREATE), null,
|
||||
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512),
|
||||
null, true);
|
||||
s.start();
|
||||
s.write(bytesToWrite);
|
||||
s.hflush();
|
||||
finalBlock = s.getBlock();
|
||||
s.abort();
|
||||
}
|
||||
// Attempt to close the file. This will fail (return false) as the NN will
|
||||
// be expecting the registered block to be reported from the DNs via IBR,
|
||||
// but that will never happen, as the pipeline was never established
|
||||
// but that will never happen, as we either did not write it, or we aborted
|
||||
// the stream preventing the "close block" message to be sent to the DN.
|
||||
boolean closed = client.getNamenode().complete(
|
||||
file, "test client", blk.getBlock(), stat.getFileId());
|
||||
file, client.clientName, finalBlock, stat.getFileId());
|
||||
assertEquals(false, closed);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue