HDFS-6929. NN periodically unlinks lazy persist files with missing replicas from namespace. (Arpit Agarwal)
This commit is contained in:
parent
3f64c4aaf0
commit
2e987148e0
|
@ -18,3 +18,7 @@
|
||||||
|
|
||||||
HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
|
HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-6929. NN periodically unlinks lazy persist files with missing
|
||||||
|
replicas from namespace. (Arpit Agarwal)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||||
|
@ -441,6 +443,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
|
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
|
||||||
|
|
||||||
Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
|
Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
|
||||||
|
|
||||||
|
// A daemon to periodically clean up corrupt lazyPersist files
|
||||||
|
// from the name space.
|
||||||
|
Daemon lazyPersistFileScrubber = null;
|
||||||
/**
|
/**
|
||||||
* When an active namenode will roll its own edit log, in # edits
|
* When an active namenode will roll its own edit log, in # edits
|
||||||
*/
|
*/
|
||||||
|
@ -450,6 +456,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
*/
|
*/
|
||||||
private final int editLogRollerInterval;
|
private final int editLogRollerInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How frequently we scan and unlink corrupt lazyPersist files.
|
||||||
|
* (In seconds)
|
||||||
|
*/
|
||||||
|
private final int lazyPersistFileScrubIntervalSec;
|
||||||
|
|
||||||
private volatile boolean hasResourcesAvailable = false;
|
private volatile boolean hasResourcesAvailable = false;
|
||||||
private volatile boolean fsRunning = true;
|
private volatile boolean fsRunning = true;
|
||||||
|
|
||||||
|
@ -857,6 +869,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
|
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
|
||||||
this.inodeId = new INodeId();
|
this.inodeId = new INodeId();
|
||||||
|
|
||||||
|
this.lazyPersistFileScrubIntervalSec = conf.getInt(
|
||||||
|
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
||||||
|
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
|
||||||
|
|
||||||
|
if (this.lazyPersistFileScrubIntervalSec == 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
|
||||||
|
}
|
||||||
|
|
||||||
// For testing purposes, allow the DT secret manager to be started regardless
|
// For testing purposes, allow the DT secret manager to be started regardless
|
||||||
// of whether security is enabled.
|
// of whether security is enabled.
|
||||||
alwaysUseDelegationTokensForTests = conf.getBoolean(
|
alwaysUseDelegationTokensForTests = conf.getBoolean(
|
||||||
|
@ -1159,6 +1180,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
editLogRollerThreshold, editLogRollerInterval));
|
editLogRollerThreshold, editLogRollerInterval));
|
||||||
nnEditLogRoller.start();
|
nnEditLogRoller.start();
|
||||||
|
|
||||||
|
if (lazyPersistFileScrubIntervalSec > 0) {
|
||||||
|
lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
|
||||||
|
lazyPersistFileScrubIntervalSec));
|
||||||
|
lazyPersistFileScrubber.start();
|
||||||
|
}
|
||||||
|
|
||||||
cacheManager.startMonitorThread();
|
cacheManager.startMonitorThread();
|
||||||
blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
|
blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1211,6 +1238,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
||||||
nnEditLogRoller.interrupt();
|
nnEditLogRoller.interrupt();
|
||||||
}
|
}
|
||||||
|
if (lazyPersistFileScrubber != null) {
|
||||||
|
((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
|
||||||
|
lazyPersistFileScrubber.interrupt();
|
||||||
|
}
|
||||||
if (dir != null && getFSImage() != null) {
|
if (dir != null && getFSImage() != null) {
|
||||||
if (getFSImage().editLog != null) {
|
if (getFSImage().editLog != null) {
|
||||||
getFSImage().editLog.close();
|
getFSImage().editLog.close();
|
||||||
|
@ -2683,6 +2714,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
+ src + " for client " + clientMachine);
|
+ src + " for client " + clientMachine);
|
||||||
}
|
}
|
||||||
INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||||
|
|
||||||
|
if (myFile.getLazyPersistFlag()) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Cannot append to lazy persist file " + src);
|
||||||
|
}
|
||||||
// Opening an existing file for write - may need to recover lease.
|
// Opening an existing file for write - may need to recover lease.
|
||||||
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
|
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
|
||||||
|
|
||||||
|
@ -5001,6 +5037,71 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Daemon to periodically scan the namespace for lazyPersist files
|
||||||
|
* with missing blocks and unlink them.
|
||||||
|
*/
|
||||||
|
class LazyPersistFileScrubber implements Runnable {
|
||||||
|
private volatile boolean shouldRun = true;
|
||||||
|
final int scrubIntervalSec;
|
||||||
|
public LazyPersistFileScrubber(final int scrubIntervalSec) {
|
||||||
|
this.scrubIntervalSec = scrubIntervalSec;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodically go over the list of lazyPersist files with missing
|
||||||
|
* blocks and unlink them from the namespace.
|
||||||
|
*/
|
||||||
|
private void clearCorruptLazyPersistFiles()
|
||||||
|
throws SafeModeException, AccessControlException,
|
||||||
|
UnresolvedLinkException, IOException {
|
||||||
|
|
||||||
|
List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
|
||||||
|
|
||||||
|
writeLock();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
|
||||||
|
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Block b = it.next();
|
||||||
|
BlockInfo blockInfo = blockManager.getStoredBlock(b);
|
||||||
|
if (blockInfo.getBlockCollection().getLazyPersistFlag()) {
|
||||||
|
filesToDelete.add(blockInfo.getBlockCollection());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (BlockCollection bc : filesToDelete) {
|
||||||
|
LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
|
||||||
|
deleteInternal(bc.getName(), false, false, false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (fsRunning && shouldRun) {
|
||||||
|
try {
|
||||||
|
clearCorruptLazyPersistFiles();
|
||||||
|
Thread.sleep(scrubIntervalSec * 1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
FSNamesystem.LOG.info(
|
||||||
|
"LazyPersistFileScrubber was interrupted, exiting");
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
FSNamesystem.LOG.error(
|
||||||
|
"Ignoring exception in LazyPersistFileScrubber:", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
shouldRun = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public FSImage getFSImage() {
|
public FSImage getFSImage() {
|
||||||
return fsImage;
|
return fsImage;
|
||||||
}
|
}
|
||||||
|
|
|
@ -397,6 +397,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
|
||||||
|
<value>300</value>
|
||||||
|
<description>
|
||||||
|
The NameNode periodically scans the namespace for LazyPersist files with
|
||||||
|
missing blocks and unlinks them from the namespace. This configuration key
|
||||||
|
controls the interval between successive scans. Set it to a negative value
|
||||||
|
to disable this behavior.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.block.access.token.enable</name>
|
<name>dfs.block.access.token.enable</name>
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
|
|
|
@ -204,6 +204,52 @@ public class TestLazyPersistFiles {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If one or more replicas of a lazyPersist file are lost, then the file
|
||||||
|
* must be discarded by the NN, instead of being kept around as a
|
||||||
|
* 'corrupt' file.
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testLazyPersistFilesAreDiscarded()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR,
|
||||||
|
new StorageType[] {RAM_DISK, DEFAULT },
|
||||||
|
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||||
|
|
||||||
|
makeTestFile(path1, BLOCK_SIZE, true);
|
||||||
|
makeTestFile(path2, BLOCK_SIZE, false);
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
ensureFileReplicasOnStorageType(path2, DEFAULT);
|
||||||
|
|
||||||
|
// Stop the DataNode and sleep for the time it takes the NN to
|
||||||
|
// detect the DN as being dead.
|
||||||
|
cluster.shutdownDataNodes();
|
||||||
|
Thread.sleep(30000L);
|
||||||
|
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
|
||||||
|
|
||||||
|
// Next, wait for the replication monitor to mark the file as
|
||||||
|
// corrupt, plus some delta.
|
||||||
|
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
|
||||||
|
|
||||||
|
// Wait for the LazyPersistFileScrubber to run, plus some delta.
|
||||||
|
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
// Ensure that path1 does not exist anymore, whereas path2 does.
|
||||||
|
assert(!fs.exists(path1));
|
||||||
|
assert(fs.exists(path2));
|
||||||
|
|
||||||
|
// We should have only one block that needs replication i.e. the one
|
||||||
|
// belonging to path2.
|
||||||
|
assertThat(cluster.getNameNode()
|
||||||
|
.getNamesystem()
|
||||||
|
.getBlockManager()
|
||||||
|
.getUnderReplicatedBlocksCount(),
|
||||||
|
is(1L));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testLazyPersistBlocksAreSaved()
|
public void testLazyPersistBlocksAreSaved()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue