HDFS-6929. NN periodically unlinks lazy persist files with missing replicas from namespace. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
This commit is contained in:
arp 2014-08-27 15:59:31 -07:00 committed by Jitendra Pandey
parent c2721edc50
commit 15372df48d
3 changed files with 158 additions and 1 deletions

View File

@ -60,6 +60,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@ -436,6 +438,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
// A daemon to periodically clean up corrupt lazyPersist files
// from the name space.
Daemon lazyPersistFileScrubber = null;
/**
* When an active namenode will roll its own edit log, in # edits
*/
@ -445,6 +451,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
private final int editLogRollerInterval;
/**
* How frequently we scan and unlink corrupt lazyPersist files.
* (In seconds)
*/
private final int lazyPersistFileScrubIntervalSec;
private volatile boolean hasResourcesAvailable = false;
private volatile boolean fsRunning = true;
@ -862,6 +874,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
this.inodeId = new INodeId();
this.lazyPersistFileScrubIntervalSec = conf.getInt(
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
if (this.lazyPersistFileScrubIntervalSec == 0) {
throw new IllegalArgumentException(
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
}
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean(
@ -931,7 +952,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
if (enable) {
float heapPercent = conf.getFloat(
@ -1175,6 +1196,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
editLogRollerThreshold, editLogRollerInterval));
nnEditLogRoller.start();
if (lazyPersistFileScrubIntervalSec > 0) {
lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
lazyPersistFileScrubIntervalSec));
lazyPersistFileScrubber.start();
}
cacheManager.startMonitorThread();
blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally {
@ -1228,6 +1255,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
nnEditLogRoller.interrupt();
}
if (lazyPersistFileScrubber != null) {
((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
lazyPersistFileScrubber.interrupt();
}
if (dir != null && getFSImage() != null) {
if (getFSImage().editLog != null) {
getFSImage().editLog.close();
@ -2778,6 +2809,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ src + " for client " + clientMachine);
}
INodeFile myFile = INodeFile.valueOf(inode, src, true);
if (myFile.getLazyPersistFlag()) {
throw new UnsupportedOperationException(
"Cannot append to lazy persist file " + src);
}
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@ -5111,6 +5147,71 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
/**
* Daemon to periodically scan the namespace for lazyPersist files
* with missing blocks and unlink them.
*/
class LazyPersistFileScrubber implements Runnable {
private volatile boolean shouldRun = true;
final int scrubIntervalSec;
public LazyPersistFileScrubber(final int scrubIntervalSec) {
this.scrubIntervalSec = scrubIntervalSec;
}
/**
* Periodically go over the list of lazyPersist files with missing
* blocks and unlink them from the namespace.
*/
private void clearCorruptLazyPersistFiles()
throws SafeModeException, AccessControlException,
UnresolvedLinkException, IOException {
List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
writeLock();
try {
final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
while (it.hasNext()) {
Block b = it.next();
BlockInfo blockInfo = blockManager.getStoredBlock(b);
if (blockInfo.getBlockCollection().getLazyPersistFlag()) {
filesToDelete.add(blockInfo.getBlockCollection());
}
}
for (BlockCollection bc : filesToDelete) {
LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
deleteInternal(bc.getName(), false, false, false);
}
} finally {
writeUnlock();
}
}
@Override
public void run() {
while (fsRunning && shouldRun) {
try {
clearCorruptLazyPersistFiles();
Thread.sleep(scrubIntervalSec * 1000);
} catch (InterruptedException e) {
FSNamesystem.LOG.info(
"LazyPersistFileScrubber was interrupted, exiting");
break;
} catch (Exception e) {
FSNamesystem.LOG.error(
"Ignoring exception in LazyPersistFileScrubber:", e);
}
}
}
public void stop() {
shouldRun = false;
}
}
public FSImage getFSImage() {
return fsImage;
}

View File

@ -397,6 +397,16 @@
</description>
</property>
<property>
<name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
<value>300</value>
<description>
The NameNode periodically scans the namespace for LazyPersist files with
missing blocks and unlinks them from the namespace. This configuration key
controls the interval between successive scans. Set it to a negative value
to disable this behavior.
</description>
</property>
<property>
<name>dfs.block.access.token.enable</name>
<value>false</value>

View File

@ -204,6 +204,52 @@ public class TestLazyPersistFiles {
}
}
/**
* If one or more replicas of a lazyPersist file are lost, then the file
* must be discarded by the NN, instead of being kept around as a
* 'corrupt' file.
*/
@Test (timeout=300000)
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
new StorageType[] {RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true);
makeTestFile(path2, BLOCK_SIZE, false);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
// Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead.
cluster.shutdownDataNodes();
Thread.sleep(30000L);
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
// Next, wait for the replication monitor to mark the file as
// corrupt, plus some delta.
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
// Wait for the LazyPersistFileScrubber to run, plus some delta.
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 does not exist anymore, whereas path2 does.
assert(!fs.exists(path1));
assert(fs.exists(path2));
// We should have only one block that needs replication i.e. the one
// belonging to path2.
assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
.getUnderReplicatedBlocksCount(),
is(1L));
}
@Test (timeout=300000)
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {