HDFS-15764. Notify Namenode missing or new block on disk as soon as possible. Contributed by Yang Yun.

Ayush Saxena 2021-03-28 16:13:59 +05:30
parent 72037a63b1
commit cd2501e54b
6 changed files with 133 additions and 17 deletions
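
In outline: when a DirectoryScanner pass (FsDatasetImpl#checkAndUpdate) finds a block that is in memory but missing on disk, or on disk but missing in memory, the DataNode now notifies the NameNode right away via notifyNamenodeDeletedBlock / notifyNamenodeReceivedBlock instead of leaving the discrepancy to the next full block report. To keep a large scan from flooding the NameNode, immediate notifications are budgeted by a counter capped at dfs.datanode.directoryscan.max.notify.count, reset once per block report interval. A minimal sketch of that pattern, distilled from the diff below (NotifyBudget is a hypothetical name, not part of the commit):

    // Hypothetical helper illustrating the throttling logic this commit
    // inlines into FsDatasetImpl.checkAndUpdate().
    class NotifyBudget {
      private final long max;       // dfs.datanode.directoryscan.max.notify.count
      private final long windowMs;  // the DataNode's block report interval
      private long count;
      private long windowStartMs;

      NotifyBudget(long max, long windowMs, long nowMs) {
        this.max = max;
        this.windowMs = windowMs;
        this.windowStartMs = nowMs;
      }

      /** Returns true if one immediate notification may be sent now. */
      boolean trySpend(long nowMs) {
        if (nowMs - windowStartMs > windowMs) {
          count = 0;                // new interval: budget is replenished
          windowStartMs = nowMs;
        }
        if (count < max) {
          count++;
          return true;
        }
        return false;              // exhausted: wait for the full block report
      }
    }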

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -876,6 +876,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
   public static final int
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
+  public static final String
+      DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY =
+      "dfs.datanode.directoryscan.max.notify.count";
+  public static final long
+      DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT = 5;
   public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -459,4 +459,8 @@ public boolean getPmemCacheRecoveryEnabled() {
   public long getProcessCommandsThresholdMs() {
     return processCommandsThresholdMs;
   }
+
+  public long getBlockReportInterval() {
+    return blockReportInterval;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -281,6 +281,11 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   final InstrumentedReadWriteLock datasetRWLock;
   private final Condition datasetWriteLockCondition;
   private static String blockPoolId = "";
+
+  // Cap how many blocks the DirectoryScanner may notify the NameNode about immediately.
+  private long maxDirScannerNotifyCount;
+  private long curDirScannerNotifyCount;
+  private long lastDirScannerNotifyTime;

   /**
    * An FSDataset has a directory where it loads its data files.
@@ -408,6 +413,10 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     maxDataLength = conf.getInt(
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+    maxDirScannerNotifyCount = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
+    lastDirScannerNotifyTime = Time.monotonicNow();
   }

   @Override
@@ -2609,6 +2618,11 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     long startTimeMs = Time.monotonicNow();
+    if (startTimeMs - lastDirScannerNotifyTime >
+        datanode.getDnConf().getBlockReportInterval()) {
+      curDirScannerNotifyCount = 0;
+      lastDirScannerNotifyTime = startTimeMs;
+    }
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
@@ -2661,6 +2675,11 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
         // Block is in memory and not on the disk
         // Remove the block from volumeMap
         volumeMap.remove(bpid, blockId);
+        if (curDirScannerNotifyCount < maxDirScannerNotifyCount) {
+          curDirScannerNotifyCount++;
+          datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid,
+              memBlockInfo), memBlockInfo.getStorageUuid());
+        }
         if (vol.isTransientStorage()) {
           ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
         }
@@ -2687,6 +2706,12 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
             .setDirectoryToUse(diskFile.getParentFile())
             .build();
         volumeMap.add(bpid, diskBlockInfo);
+        if (curDirScannerNotifyCount < maxDirScannerNotifyCount) {
+          curDirScannerNotifyCount++;
+          datanode.notifyNamenodeReceivedBlock(
+              new ExtendedBlock(bpid, diskBlockInfo), null,
+              vol.getStorageID(), vol.isTransientStorage());
+        }
         if (vol.isTransientStorage()) {
           long lockedBytesReserved =
               cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
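
Both branches above spend from the same counter, so deletions and re-appearances compete for the same dfs.datanode.directoryscan.max.notify.count slots within one block report interval. A worked example under stock defaults (5 notifications per window; 6 hours is the default dfs.blockreport.intervalMsec):

    // Assumed defaults: max notify count = 5, block report interval = 6h.
    long maxNotify = 5L;
    long reportIntervalMs = 6L * 60 * 60 * 1000;          // 21,600,000 ms
    int discrepancies = 12;                               // found in one scan round
    long immediate = Math.min(discrepancies, maxNotify);  // 5 notified right away
    long deferred = discrepancies - immediate;            // 7 wait for the full report

Anything beyond the budget is still repaired, just on the slower full-block-report path.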

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -6191,4 +6191,12 @@
     accessed or modified before the specified time interval.
   </description>
 </property>
+<property>
+  <name>dfs.datanode.directoryscan.max.notify.count</name>
+  <value>5</value>
+  <description>
+    The maximum number of missing or newly appeared blocks that the
+    DirectoryScanner may notify the namenode about immediately, per block report interval.
+  </description>
+</property>
 </configuration>
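
Operators would normally set this key in hdfs-site.xml; programmatically it is exposed through the DFSConfigKeys constants added above. A tuning sketch (the class name TuneScannerNotify and the value 50 are illustrative assumptions, not recommendations from the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class TuneScannerNotify {
      public static void main(String[] args) {
        // Raise the cap so more discrepancies are reported immediately.
        Configuration conf = new HdfsConfiguration();
        conf.setLong(
            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY, 50L);
        System.out.println(conf.getLong(
            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY, 5L));
      }
    }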

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -249,8 +249,7 @@ private String getMetaFile(long id) {
   }

   /** Create a block file in a random volume. */
-  private long createBlockFile() throws IOException {
-    long id = getFreeBlockId();
+  private long createBlockFile(long id) throws IOException {
     try (
         FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
       int numVolumes = volumes.size();
@@ -282,8 +281,7 @@ private long createMetaFile() throws IOException {
   }

   /** Create block file and corresponding metafile in a random volume. */
-  private long createBlockMetaFile() throws IOException {
-    long id = getFreeBlockId();
+  private long createBlockMetaFile(long id) throws IOException {
     try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
       int numVolumes = refs.size();
@@ -548,7 +546,7 @@ public void runTest(int parallelism) throws Exception {
     // Test4: A block file exists for which there is no metafile and
     // a block in memory
-    blockId = createBlockFile();
+    blockId = createBlockFile(blockId);
     totalBlocks++;
     scan(totalBlocks, 1, 1, 0, 1, 0);
     verifyAddition(blockId, HdfsConstants.GRANDFATHER_GENERATION_STAMP, 0);
@@ -563,8 +561,12 @@ public void runTest(int parallelism) throws Exception {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test6: A block file and metafile exists for which there is no block in
     // memory
-    blockId = createBlockMetaFile();
+    blockId = deleteBlockFile();
+    scan(totalBlocks, 1, 0, 1, 0, 0);
+    totalBlocks--;
+    verifyDeletion(blockId);
+    blockId = createBlockMetaFile(blockId);
     totalBlocks++;
     scan(totalBlocks, 1, 0, 0, 1, 0);
     verifyAddition(blockId, DEFAULT_GEN_STAMP, 0);
@@ -577,9 +579,10 @@ public void runTest(int parallelism) throws Exception {
     scan(totalBlocks, 10, 10, 0, 0, 10);
     scan(totalBlocks, 0, 0, 0, 0, 0);

-    // Test8: Delete bunch of block files
+    // Test8: Delete bunch of block files and record the ids.
+    List<Long> ids = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
-      blockId = deleteBlockFile();
+      ids.add(deleteBlockFile());
     }
     scan(totalBlocks, 10, 0, 10, 0, 0);
     totalBlocks -= 10;
@@ -587,7 +590,7 @@ public void runTest(int parallelism) throws Exception {
     // Test9: create a bunch of block files
     for (int i = 0; i < 10; i++) {
-      blockId = createBlockFile();
+      blockId = createBlockFile(ids.get(i));
     }
     totalBlocks += 10;
     scan(totalBlocks, 10, 10, 0, 10, 0);
@@ -601,8 +604,15 @@ public void runTest(int parallelism) throws Exception {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test11: create a bunch of block files and meta files
+    ids.clear();
     for (int i = 0; i < 10; i++) {
-      blockId = createBlockMetaFile();
+      ids.add(deleteBlockFile());
+    }
+    scan(totalBlocks, 10, 0, 10, 0, 0);
+    totalBlocks -= 10;
+    for (int i = 0; i < 10; i++) {
+      blockId = createBlockMetaFile(ids.get(i));
     }
     totalBlocks += 10;
     scan(totalBlocks, 10, 0, 0, 10, 0);
@@ -616,9 +626,16 @@ public void runTest(int parallelism) throws Exception {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test13: all the conditions combined
+    long blockId1 = deleteBlockFile();
+    long blockId2 = deleteBlockFile();
+    scan(totalBlocks, 2, 0, 2, 0, 0);
+    totalBlocks -= 2;
+    verifyDeletion(blockId1);
+    verifyDeletion(blockId2);
     createMetaFile();
-    createBlockFile();
-    createBlockMetaFile();
+    createBlockFile(blockId1);
+    createBlockMetaFile(blockId2);
     deleteMetaFile();
     deleteBlockFile();
     truncateBlockFile();
@@ -631,9 +648,6 @@ public void runTest(int parallelism) throws Exception {
     assertTrue("Report compiler threads logged no execution time",
         scanner.timeRunningMs.get() > 0L);

-    // Test15: validate clean shutdown of DirectoryScanner
-    //// assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not
-    // sim
     scanner.shutdown();
     assertFalse(scanner.getRunStatus());
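
The helper refactor above threads explicit block ids through createBlockFile and createBlockMetaFile so a test can delete a block and then recreate it under the same id, which is exactly the missing-then-reappeared sequence the new notifications cover. A condensed usage sketch inside runTest (helper names and scan counters are from the test above):

    // Delete ten blocks, let the scanner report them missing, then bring
    // the same ids back so they are reported as re-appeared.
    List<Long> ids = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ids.add(deleteBlockFile());        // record each deleted block id
    }
    scan(totalBlocks, 10, 0, 10, 0, 0);  // scanner sees 10 missing blocks
    for (int i = 0; i < 10; i++) {
      createBlockFile(ids.get(i));       // recreate under the same ids
    }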

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -22,6 +22,7 @@
 import java.util.function.Supplier;

 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -94,6 +95,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.hamcrest.core.Is.is;
@@ -1745,4 +1747,62 @@ public void testGetMetadataLengthOfFinalizedReplica() throws IOException {
       assertTrue(blockDir.delete());
     }
   }
+
+  @Test
+  public void testNotifyNamenodeMissingOrNewBlock() throws Exception {
+    long blockSize = 1024;
+    int heartbeatInterval = 1;
+    HdfsConfiguration c = new HdfsConfiguration();
+    c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+    c.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).
+        numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/f1"),
+          blockSize, (short)1, 0);
+      String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi fsdataset = dn.getFSDataset();
+      List<ReplicaInfo> replicaInfos =
+          fsdataset.getSortedFinalizedBlocks(bpid);
+      assertEquals(1, replicaInfos.size());
+
+      ReplicaInfo replicaInfo = replicaInfos.get(0);
+      String blockPath = replicaInfo.getBlockURI().getPath();
+      String metaPath = replicaInfo.getMetadataURI().getPath();
+      String blockTempPath = blockPath + ".tmp";
+      String metaTempPath = metaPath + ".tmp";
+      File blockFile = new File(blockPath);
+      File blockTempFile = new File(blockTempPath);
+      File metaFile = new File(metaPath);
+      File metaTempFile = new File(metaTempPath);
+
+      // Rename the block and meta files away so the block goes missing on disk.
+      blockFile.renameTo(blockTempFile);
+      metaFile.renameTo(metaTempFile);
+      assertFalse(blockFile.exists());
+      assertFalse(metaFile.exists());
+
+      FsVolumeSpi.ScanInfo info = new FsVolumeSpi.ScanInfo(
+          replicaInfo.getBlockId(), blockFile.getAbsoluteFile(),
+          metaFile.getAbsoluteFile(), replicaInfo.getVolume());
+      fsdataset.checkAndUpdate(bpid, info);
+
+      BlockManager blockManager = cluster.getNameNode().
+          getNamesystem().getBlockManager();
+      GenericTestUtils.waitFor(() ->
+          blockManager.getLowRedundancyBlocksCount() == 1, 100, 5000);
+
+      // Move the block and meta files back; the block reappears on disk.
+      blockTempFile.renameTo(blockFile);
+      metaTempFile.renameTo(metaFile);
+      fsdataset.checkAndUpdate(bpid, info);
+      GenericTestUtils.waitFor(() ->
+          blockManager.getLowRedundancyBlocksCount() == 0, 100, 5000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }