HDFS-14941. Potential editlog race condition can cause corrupted file. Contributed by Chen Liang and Konstantin Shvachko.

This commit is contained in:
Chen Liang 2019-11-06 09:56:19 -08:00 committed by Konstantin V Shvachko
parent c59b1b66a5
commit db887f66d8
9 changed files with 285 additions and 5 deletions

View File

@ -45,6 +45,19 @@ public abstract class SequentialNumber {
currentValue.set(value);
}
public boolean setIfGreater(long value) {
while(true) {
long local = currentValue.get();
if(value <= local) {
return false; // swap failed
}
if(currentValue.compareAndSet(local, value)) {
return true; // swap successful
}
// keep trying
}
}
/** Increment and then return the next value. */
public long nextValue() {
return currentValue.incrementAndGet();

View File

@ -41,6 +41,23 @@ public class BlockIdManager {
* The global generation stamp for this file system.
*/
private final GenerationStamp generationStamp = new GenerationStamp();
/**
* Most recent global generation stamp as seen on Active NameNode.
* Used by StandbyNode only.<p/>
* StandbyNode does not update its global {@link #generationStamp} during
* edits tailing. The global generation stamp on StandbyNode is updated
* <ol><li>when the block with the next generation stamp is actually
* received</li>
* <li>during fail-over it is bumped to the last value received from the
* Active NN through edits and stored as
* {@link #impendingGenerationStamp}</li></ol>
* The former helps to avoid a race condition with IBRs during edits tailing.
* The latter guarantees that generation stamps are never reused by new
* Active after fail-over.
* <p/> See HDFS-14941 for more details.
*/
private final GenerationStamp impendingGenerationStamp
= new GenerationStamp();
/**
* The value of the generation stamp when the first switch to sequential
* block IDs was made. Blocks with generation stamps below this value
@ -138,6 +155,35 @@ public class BlockIdManager {
generationStamp.setCurrentValue(stamp);
}
/**
* Set the currently highest gen stamp from active. Used
* by Standby only.
* @param stamp new genstamp
*/
public void setImpendingGenerationStamp(long stamp) {
impendingGenerationStamp.setIfGreater(stamp);
}
/**
* Set the current genstamp to the impending genstamp.
*/
public void applyImpendingGenerationStamp() {
setGenerationStampIfGreater(impendingGenerationStamp.getCurrentValue());
}
@VisibleForTesting
public long getImpendingGenerationStamp() {
return impendingGenerationStamp.getCurrentValue();
}
/**
* Set genstamp only when the given one is higher.
* @param stamp
*/
public void setGenerationStampIfGreater(long stamp) {
generationStamp.setIfGreater(stamp);
}
public long getGenerationStamp() {
return generationStamp.getCurrentValue();
}

View File

@ -3911,7 +3911,9 @@ public class BlockManager implements BlockStatsMXBean {
public BlockInfo addBlockCollection(BlockInfo block,
BlockCollection bc) {
return blocksMap.addBlockCollection(block, bc);
BlockInfo blockInfo = blocksMap.addBlockCollection(block, bc);
blockIdManager.setGenerationStampIfGreater(block.getGenerationStamp());
return blockInfo;
}
BlockCollection getBlockCollection(BlockInfo b) {

View File

@ -698,7 +698,7 @@ class FSDirWriteFileOp {
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private static void saveAllocatedBlock(
static void saveAllocatedBlock(
FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {

View File

@ -826,8 +826,10 @@ public class FSEditLogLoader {
}
case OP_SET_GENSTAMP_V2: {
SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
blockManager.getBlockIdManager().setGenerationStamp(
setGenstampV2Op.genStampV2);
// update the impending gen stamp, but not the actual genstamp,
// see HDFS-14941
blockManager.getBlockIdManager()
.setImpendingGenerationStamp(setGenstampV2Op.genStampV2);
break;
}
case OP_ALLOCATE_BLOCK_ID: {

View File

@ -1788,7 +1788,15 @@ public abstract class FSEditLogOp {
}
}
/** Similar with {@link SetGenstampV1Op} */
/**
* This operation does not actually update gen stamp immediately,
* the new gen stamp is recorded as impending gen stamp.
* The global generation stamp on Standby Node is updated when
* the block with the next generation stamp is actually received.
* We keep logging this operation for backward compatibility.
* The impending gen stamp will take effect when the standby
* transition to become an active.
*/
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;

View File

@ -1198,6 +1198,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
blockManager.getBlockIdManager().applyImpendingGenerationStamp();
// Only need to re-process the queue, If not in SafeMode.
if (!isInSafeMode()) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.spy;
@ -31,12 +32,14 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
@ -193,6 +196,47 @@ public class NameNodeAdapter {
return fsn.getStats();
}
public static long getGenerationStamp(final FSNamesystem fsn)
throws IOException {
return fsn.getBlockManager().getBlockIdManager().getGenerationStamp();
}
public static long getImpendingGenerationStamp(final FSNamesystem fsn) {
return fsn.getBlockManager().getBlockIdManager()
.getImpendingGenerationStamp();
}
public static BlockInfo addBlockNoJournal(final FSNamesystem fsn,
final String src, final DatanodeStorageInfo[] targets)
throws IOException {
fsn.writeLock();
try {
INodeFile file = (INodeFile)fsn.getFSDirectory().getINode(src);
Block newBlock = fsn.createNewBlock();
INodesInPath inodesInPath = INodesInPath.fromINode(file);
FSDirWriteFileOp.saveAllocatedBlock(
fsn, src, inodesInPath, newBlock, targets);
return file.getLastBlock();
} finally {
fsn.writeUnlock();
}
}
public static void persistBlocks(final FSNamesystem fsn,
final String src, final INodeFile file) throws IOException {
fsn.writeLock();
try {
FSDirWriteFileOp.persistBlocks(fsn.getFSDirectory(), src, file, true);
} finally {
fsn.writeUnlock();
}
}
public static BlockInfo getStoredBlock(final FSNamesystem fsn,
final Block b) {
return fsn.getStoredBlock(b);
}
public static FSNamesystem spyOnNamesystem(NameNode nn) {
FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
FSNamesystem fsnOld = nn.namesystem;

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the race condition that IBR and add block may result
* in inconsistent block genstamp.
*/
public class TestAddBlockTailing {
private static final int BLOCK_SIZE = 8192;
private static final String TEST_DIR = "/TestAddBlockTailing";
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private static FSNamesystem fsn0;
private static FSNamesystem fsn1;
private static DataNode dn0;
@BeforeClass
public static void startUpCluster() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2);
qjmBuilder.getDfsBuilder().numDataNodes(1);
qjmhaCluster = qjmBuilder.build();
dfsCluster = qjmhaCluster.getDfsCluster();
dfsCluster.waitActive();
dfsCluster.transitionToActive(0);
dfs = dfsCluster.getFileSystem(0);
fsn0 = dfsCluster.getNameNode(0).getNamesystem();
fsn1 = dfsCluster.getNameNode(1).getNamesystem();
dfs.mkdirs(new Path(TEST_DIR), new FsPermission("755"));
dn0 = dfsCluster.getDataNodes().get(0);
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test
public void testStandbyAddBlockIBRRace() throws Exception {
String testFile = TEST_DIR +"/testStandbyAddBlockIBRRace";
// initial global generation stamp check
assertEquals("Global Generation stamps on NNs should be the same",
NameNodeAdapter.getGenerationStamp(fsn0),
NameNodeAdapter.getGenerationStamp(fsn1));
// create a file, add a block on NN0
// do not journal addBlock yet
dfs.create(new Path(testFile), true, dfs.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) 1, BLOCK_SIZE);
DatanodeManager dnManager = fsn0.getBlockManager().getDatanodeManager();
DatanodeStorageInfo[] targets =
dnManager.getDatanode(dn0.getDatanodeId()).getStorageInfos();
targets = new DatanodeStorageInfo[] {targets[0]};
BlockInfo newBlock = NameNodeAdapter.addBlockNoJournal(
fsn0, testFile, targets);
// NN1 tails increment generation stamp transaction
fsn0.getEditLog().logSync();
fsn1.getEditLogTailer().doTailEdits();
assertEquals("Global Generation stamps on NN0 and "
+ "impending on NN1 should be equal",
NameNodeAdapter.getGenerationStamp(fsn0),
NameNodeAdapter.getImpendingGenerationStamp(fsn1));
// NN1 processes IBR with the replica
StorageReceivedDeletedBlocks[] report = DFSTestUtil
.makeReportForReceivedBlock(newBlock,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
dn0.getFSDataset().getStorage(targets[0].getStorageID()));
fsn1.processIncrementalBlockReport(dn0.getDatanodeId(), report[0]);
// NN0 persists the block, i.e adds update block transaction
INodeFile file = (INodeFile)fsn0.getFSDirectory().getINode(testFile);
NameNodeAdapter.persistBlocks(fsn0, testFile, file);
// NN1 tails update block transaction
fsn0.getEditLog().logSync();
fsn1.getEditLogTailer().doTailEdits();
assertEquals("Global Generation stamps on NN0 and "
+ "impending on NN1 should be equal",
NameNodeAdapter.getGenerationStamp(fsn0),
NameNodeAdapter.getImpendingGenerationStamp(fsn1));
// The new block on NN1 should have the replica
BlockInfo newBlock1 = NameNodeAdapter.getStoredBlock(fsn1, newBlock);
assertTrue("New block on NN1 should contain the replica",
newBlock1.numNodes() > 0);
assertEquals("Generation stamps of the block on NNs should be the same",
newBlock.getGenerationStamp(), newBlock1.getGenerationStamp());
assertEquals("Global Generation stamps on NNs should be the same",
NameNodeAdapter.getGenerationStamp(fsn0),
NameNodeAdapter.getGenerationStamp(fsn1));
// Check that the generation stamp restores on Standby after failover
ClientProtocol rpc0 = dfsCluster.getNameNode(0).getRpcServer();
ClientProtocol rpc1 = dfsCluster.getNameNode(1).getRpcServer();
LocatedBlock lb = rpc0.getBlockLocations(testFile, 0, 0).get(0);
rpc0.updateBlockForPipeline(lb.getBlock(), dfs.getClient().getClientName());
long gs0 = NameNodeAdapter.getGenerationStamp(fsn0);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
assertEquals("Global Generation stamps on new active should be "
+ "the same as on the old one", gs0,
NameNodeAdapter.getGenerationStamp(fsn1));
rpc1.delete(testFile, false);
}
}