HDFS-6825. Edit log corruption due to delayed block removal. Contributed by Yongjun Zhang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618684 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
110cf46a07
commit
bd616552cc
|
@ -511,6 +511,9 @@ Release 2.6.0 - UNRELEASED
|
|||
|
||||
HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu and Colin Patrick McCabe via umamahesh)
|
||||
|
||||
HDFS-6825. Edit log corruption due to delayed block removal.
|
||||
(Yongjun Zhang via wang)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -373,6 +373,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
|
|||
sb.append("{blockUCState=").append(blockUCState)
|
||||
.append(", primaryNodeIndex=").append(primaryNodeIndex)
|
||||
.append(", replicas=[");
|
||||
if (replicas != null) {
|
||||
Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
|
||||
if (iter.hasNext()) {
|
||||
iter.next().appendStringTo(sb);
|
||||
|
@ -381,6 +382,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
|
|||
iter.next().appendStringTo(sb);
|
||||
}
|
||||
}
|
||||
}
|
||||
sb.append("]}");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4300,7 +4300,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throw new IOException("Block (=" + lastblock + ") not found");
|
||||
}
|
||||
}
|
||||
INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
|
||||
//
|
||||
// The implementation of delete operation (see @deleteInternal method)
|
||||
// first removes the file paths from namespace, and delays the removal
|
||||
// of blocks to later time for better performance. When
|
||||
// commitBlockSynchronization (this method) is called in between, the
|
||||
// blockCollection of storedBlock could have been assigned to null by
|
||||
// the delete operation, throw IOException here instead of NPE; if the
|
||||
// file path is already removed from namespace by the delete operation,
|
||||
// throw FileNotFoundException here, so not to proceed to the end of
|
||||
// this method to add a CloseOp to the edit log for an already deleted
|
||||
// file (See HDFS-6825).
|
||||
//
|
||||
BlockCollection blockCollection = storedBlock.getBlockCollection();
|
||||
if (blockCollection == null) {
|
||||
throw new IOException("The blockCollection of " + storedBlock
|
||||
+ " is null, likely because the file owning this block was"
|
||||
+ " deleted and the block removal is delayed");
|
||||
}
|
||||
INodeFile iFile = ((INode)blockCollection).asFile();
|
||||
if (isFileDeleted(iFile)) {
|
||||
throw new FileNotFoundException("File not found: "
|
||||
+ iFile.getFullPathName() + ", likely due to delayed block"
|
||||
+ " removal");
|
||||
}
|
||||
if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unexpected block (=" + lastblock
|
||||
|
@ -6297,9 +6320,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
private boolean isFileDeleted(INodeFile file) {
|
||||
// Not in the inodeMap or in the snapshot but marked deleted.
|
||||
if (dir.getInode(file.getId()) == null ||
|
||||
file.getParent() == null || (file.isWithSnapshot() &&
|
||||
file.getFileWithSnapshotFeature().isCurrentFileDeleted())) {
|
||||
if (dir.getInode(file.getId()) == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// look at the path hierarchy to see if one parent is deleted by recursive
|
||||
// deletion
|
||||
INode tmpChild = file;
|
||||
INodeDirectory tmpParent = file.getParent();
|
||||
while (true) {
|
||||
if (tmpParent == null ||
|
||||
tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
|
||||
return true;
|
||||
}
|
||||
if (tmpParent.isRoot()) {
|
||||
break;
|
||||
}
|
||||
tmpChild = tmpParent;
|
||||
tmpParent = tmpParent.getParent();
|
||||
}
|
||||
|
||||
if (file.isWithSnapshot() &&
|
||||
file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -157,7 +157,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
|
|||
return quota;
|
||||
}
|
||||
|
||||
private int searchChildren(byte[] name) {
|
||||
int searchChildren(byte[] name) {
|
||||
return children == null? -1: Collections.binarySearch(children, name);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,9 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
|
@ -1300,4 +1303,33 @@ public class DFSTestUtil {
|
|||
sockDir.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the node which is expected to run the recovery of the
|
||||
* given block, which is known to be under construction inside the
|
||||
* given NameNOde.
|
||||
*/
|
||||
public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
|
||||
ExtendedBlock blk) {
|
||||
BlockManager bm0 = nn.getNamesystem().getBlockManager();
|
||||
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
|
||||
assertTrue("Block " + blk + " should be under construction, " +
|
||||
"got: " + storedBlock,
|
||||
storedBlock instanceof BlockInfoUnderConstruction);
|
||||
BlockInfoUnderConstruction ucBlock =
|
||||
(BlockInfoUnderConstruction)storedBlock;
|
||||
// We expect that the replica with the most recent heart beat will be
|
||||
// the one to be in charge of the synchronization / recovery protocol.
|
||||
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
|
||||
DatanodeStorageInfo expectedPrimary = storages[0];
|
||||
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
|
||||
for (int i = 1; i < storages.length; i++) {
|
||||
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
|
||||
if (lastUpdate > mostRecentLastUpdate) {
|
||||
expectedPrimary = storages[i];
|
||||
mostRecentLastUpdate = lastUpdate;
|
||||
}
|
||||
}
|
||||
return expectedPrimary.getDatanodeDescriptor();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,17 @@ public class TestCommitBlockSynchronization {
|
|||
|
||||
FSNamesystem namesystem = new FSNamesystem(conf, image);
|
||||
namesystem.setImageLoaded(true);
|
||||
|
||||
// set file's parent as root and put the file to inodeMap, so
|
||||
// FSNamesystem's isFileDeleted() method will return false on this file
|
||||
if (file.getParent() == null) {
|
||||
INodeDirectory parent = mock(INodeDirectory.class);
|
||||
parent.setLocalName(new byte[0]);
|
||||
parent.addChild(file);
|
||||
file.setParent(parent);
|
||||
}
|
||||
namesystem.dir.getINodeMap().put(file);
|
||||
|
||||
FSNamesystem namesystemSpy = spy(namesystem);
|
||||
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
|
||||
block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -27,19 +29,30 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
|
||||
|
@ -49,6 +62,7 @@ import org.mockito.internal.util.reflection.Whitebox;
|
|||
* whole duration.
|
||||
*/
|
||||
public class TestDeleteRace {
|
||||
private static final int BLOCK_SIZE = 4096;
|
||||
private static final Log LOG = LogFactory.getLog(TestDeleteRace.class);
|
||||
private static final Configuration conf = new HdfsConfiguration();
|
||||
private MiniDFSCluster cluster;
|
||||
|
@ -201,7 +215,126 @@ public class TestDeleteRace {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test race between delete operation and commitBlockSynchronization method.
|
||||
* See HDFS-6825.
|
||||
* @param hasSnapshot
|
||||
* @throws Exception
|
||||
*/
|
||||
private void testDeleteAndCommitBlockSynchronizationRace(boolean hasSnapshot)
|
||||
throws Exception {
|
||||
LOG.info("Start testing, hasSnapshot: " + hasSnapshot);
|
||||
final String testPaths[] = {
|
||||
"/test-file",
|
||||
"/testdir/testdir1/test-file"
|
||||
};
|
||||
final Path rootPath = new Path("/");
|
||||
final Configuration conf = new Configuration();
|
||||
// Disable permissions so that another user can recover the lease.
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
FSDataOutputStream stm = null;
|
||||
Map<DataNode, DatanodeProtocolClientSideTranslatorPB> dnMap =
|
||||
new HashMap<DataNode, DatanodeProtocolClientSideTranslatorPB>();
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
int stId = 0;
|
||||
for (String testPath : testPaths) {
|
||||
LOG.info("test on " + testPath + " snapshot: " + hasSnapshot);
|
||||
Path fPath = new Path(testPath);
|
||||
//find grandest non-root parent
|
||||
Path grandestNonRootParent = fPath;
|
||||
while (!grandestNonRootParent.getParent().equals(rootPath)) {
|
||||
grandestNonRootParent = grandestNonRootParent.getParent();
|
||||
}
|
||||
stm = fs.create(fPath);
|
||||
LOG.info("test on " + testPath + " created " + fPath);
|
||||
|
||||
// write a half block
|
||||
AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
|
||||
stm.hflush();
|
||||
|
||||
if (hasSnapshot) {
|
||||
SnapshotTestHelper.createSnapshot(fs, rootPath,
|
||||
"st" + String.valueOf(stId));
|
||||
++stId;
|
||||
}
|
||||
|
||||
// Look into the block manager on the active node for the block
|
||||
// under construction.
|
||||
NameNode nn = cluster.getNameNode();
|
||||
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, fPath);
|
||||
DatanodeDescriptor expectedPrimary =
|
||||
DFSTestUtil.getExpectedPrimaryNode(nn, blk);
|
||||
LOG.info("Expecting block recovery to be triggered on DN " +
|
||||
expectedPrimary);
|
||||
|
||||
// Find the corresponding DN daemon, and spy on its connection to the
|
||||
// active.
|
||||
DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
|
||||
DatanodeProtocolClientSideTranslatorPB nnSpy = dnMap.get(primaryDN);
|
||||
if (nnSpy == null) {
|
||||
nnSpy = DataNodeTestUtils.spyOnBposToNN(primaryDN, nn);
|
||||
dnMap.put(primaryDN, nnSpy);
|
||||
}
|
||||
|
||||
// Delay the commitBlockSynchronization call
|
||||
DelayAnswer delayer = new DelayAnswer(LOG);
|
||||
Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
|
||||
Mockito.eq(blk),
|
||||
Mockito.anyInt(), // new genstamp
|
||||
Mockito.anyLong(), // new length
|
||||
Mockito.eq(true), // close file
|
||||
Mockito.eq(false), // delete block
|
||||
(DatanodeID[]) Mockito.anyObject(), // new targets
|
||||
(String[]) Mockito.anyObject()); // new target storages
|
||||
|
||||
fs.recoverLease(fPath);
|
||||
|
||||
LOG.info("Waiting for commitBlockSynchronization call from primary");
|
||||
delayer.waitForCall();
|
||||
|
||||
LOG.info("Deleting recursively " + grandestNonRootParent);
|
||||
fs.delete(grandestNonRootParent, true);
|
||||
|
||||
delayer.proceed();
|
||||
LOG.info("Now wait for result");
|
||||
delayer.waitForResult();
|
||||
Throwable t = delayer.getThrown();
|
||||
if (t != null) {
|
||||
LOG.info("Result exception (snapshot: " + hasSnapshot + "): " + t);
|
||||
}
|
||||
} // end of loop each fPath
|
||||
LOG.info("Now check we can restart");
|
||||
cluster.restartNameNodes();
|
||||
LOG.info("Restart finished");
|
||||
} finally {
|
||||
if (stm != null) {
|
||||
IOUtils.closeStream(stm);
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=600000)
|
||||
public void testDeleteAndCommitBlockSynchonizationRaceNoSnapshot()
|
||||
throws Exception {
|
||||
testDeleteAndCommitBlockSynchronizationRace(false);
|
||||
}
|
||||
|
||||
@Test(timeout=600000)
|
||||
public void testDeleteAndCommitBlockSynchronizationRaceHasSnapshot()
|
||||
throws Exception {
|
||||
testDeleteAndCommitBlockSynchronizationRace(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -356,7 +356,8 @@ public class TestPipelinesFailover {
|
|||
|
||||
NameNode nn0 = cluster.getNameNode(0);
|
||||
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
|
||||
DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
|
||||
DatanodeDescriptor expectedPrimary =
|
||||
DFSTestUtil.getExpectedPrimaryNode(nn0, blk);
|
||||
LOG.info("Expecting block recovery to be triggered on DN " +
|
||||
expectedPrimary);
|
||||
|
||||
|
@ -506,37 +507,6 @@ public class TestPipelinesFailover {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @return the node which is expected to run the recovery of the
|
||||
* given block, which is known to be under construction inside the
|
||||
* given NameNOde.
|
||||
*/
|
||||
private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
|
||||
ExtendedBlock blk) {
|
||||
BlockManager bm0 = nn.getNamesystem().getBlockManager();
|
||||
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
|
||||
assertTrue("Block " + blk + " should be under construction, " +
|
||||
"got: " + storedBlock,
|
||||
storedBlock instanceof BlockInfoUnderConstruction);
|
||||
BlockInfoUnderConstruction ucBlock =
|
||||
(BlockInfoUnderConstruction)storedBlock;
|
||||
// We expect that the replica with the most recent heart beat will be
|
||||
// the one to be in charge of the synchronization / recovery protocol.
|
||||
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
|
||||
DatanodeStorageInfo expectedPrimary = storages[0];
|
||||
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
|
||||
for (int i = 1; i < storages.length; i++) {
|
||||
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
|
||||
if (lastUpdate > mostRecentLastUpdate) {
|
||||
expectedPrimary = storages[i];
|
||||
mostRecentLastUpdate = lastUpdate;
|
||||
}
|
||||
}
|
||||
return expectedPrimary.getDatanodeDescriptor();
|
||||
}
|
||||
|
||||
private DistributedFileSystem createFsAsOtherUser(
|
||||
final MiniDFSCluster cluster, final Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue