HDFS-5845. SecondaryNameNode dies when checkpointing with cache pools.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3e0bbcb688
commit
ec9c6aaac8
@ -540,6 +540,9 @@ Release 2.3.0 - UNRELEASED
|
||||
HDFS-5721. sharedEditsImage in Namenode#initializeSharedEdits() should be
|
||||
closed before method returns (Ted Yu via todd)
|
||||
|
||||
HDFS-5845. SecondaryNameNode dies when checkpointing with cache pools.
|
||||
(wang)
|
||||
|
||||
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-4985. Add storage type to the protocol and expose it in block report
|
||||
|
@ -193,6 +193,17 @@ public final class CacheManager {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets all tracked directives and pools. Called during 2NN checkpointing to
|
||||
* reset FSNamesystem state. See {FSNamesystem{@link #clear()}.
|
||||
*/
|
||||
void clear() {
|
||||
directivesById.clear();
|
||||
directivesByPath.clear();
|
||||
cachePools.clear();
|
||||
nextDirectiveId = 1;
|
||||
}
|
||||
|
||||
public void startMonitorThread() {
|
||||
crmLock.lock();
|
||||
try {
|
||||
|
@ -521,6 +521,7 @@ void clear() {
|
||||
leaseManager.removeAllLeases();
|
||||
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
|
||||
snapshotManager.clearSnapshottableDirs();
|
||||
cacheManager.clear();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -1001,7 +1001,12 @@ static void doMerge(
|
||||
sig.mostRecentCheckpointTxId + " even though it should have " +
|
||||
"just been downloaded");
|
||||
}
|
||||
dstImage.reloadFromImageFile(file, dstNamesystem);
|
||||
dstNamesystem.writeLock();
|
||||
try {
|
||||
dstImage.reloadFromImageFile(file, dstNamesystem);
|
||||
} finally {
|
||||
dstNamesystem.writeUnlock();
|
||||
}
|
||||
dstNamesystem.dir.imageLoadComplete();
|
||||
}
|
||||
// error simulation code for junit test
|
||||
|
@ -69,6 +69,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
@ -528,77 +529,111 @@ public void testAddRemoveDirectives() throws Exception {
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testCacheManagerRestart() throws Exception {
|
||||
// Create and validate a pool
|
||||
final String pool = "poolparty";
|
||||
String groupName = "partygroup";
|
||||
FsPermission mode = new FsPermission((short)0777);
|
||||
long limit = 747;
|
||||
dfs.addCachePool(new CachePoolInfo(pool)
|
||||
.setGroupName(groupName)
|
||||
.setMode(mode)
|
||||
.setLimit(limit));
|
||||
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
|
||||
assertTrue("No cache pools found", pit.hasNext());
|
||||
CachePoolInfo info = pit.next().getInfo();
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(groupName, info.getGroupName());
|
||||
assertEquals(mode, info.getMode());
|
||||
assertEquals(limit, (long)info.getLimit());
|
||||
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
||||
SecondaryNameNode secondary = null;
|
||||
try {
|
||||
// Start a secondary namenode
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
|
||||
"0.0.0.0:0");
|
||||
secondary = new SecondaryNameNode(conf);
|
||||
|
||||
// Create some cache entries
|
||||
int numEntries = 10;
|
||||
String entryPrefix = "/party-";
|
||||
long prevId = -1;
|
||||
final Date expiry = new Date();
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
prevId = dfs.addCacheDirective(
|
||||
new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path(entryPrefix + i)).setPool(pool).
|
||||
setExpiration(
|
||||
CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
|
||||
build());
|
||||
// Create and validate a pool
|
||||
final String pool = "poolparty";
|
||||
String groupName = "partygroup";
|
||||
FsPermission mode = new FsPermission((short)0777);
|
||||
long limit = 747;
|
||||
dfs.addCachePool(new CachePoolInfo(pool)
|
||||
.setGroupName(groupName)
|
||||
.setMode(mode)
|
||||
.setLimit(limit));
|
||||
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
|
||||
assertTrue("No cache pools found", pit.hasNext());
|
||||
CachePoolInfo info = pit.next().getInfo();
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(groupName, info.getGroupName());
|
||||
assertEquals(mode, info.getMode());
|
||||
assertEquals(limit, (long)info.getLimit());
|
||||
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
||||
|
||||
// Create some cache entries
|
||||
int numEntries = 10;
|
||||
String entryPrefix = "/party-";
|
||||
long prevId = -1;
|
||||
final Date expiry = new Date();
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
prevId = dfs.addCacheDirective(
|
||||
new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path(entryPrefix + i)).setPool(pool).
|
||||
setExpiration(
|
||||
CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
|
||||
build());
|
||||
}
|
||||
RemoteIterator<CacheDirectiveEntry> dit
|
||||
= dfs.listCacheDirectives(null);
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
||||
CacheDirectiveInfo cd = dit.next().getInfo();
|
||||
assertEquals(i+1, cd.getId().longValue());
|
||||
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
||||
assertEquals(pool, cd.getPool());
|
||||
}
|
||||
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
||||
|
||||
// Checkpoint once to set some cache pools and directives on 2NN side
|
||||
secondary.doCheckpoint();
|
||||
|
||||
// Add some more CacheManager state
|
||||
final String imagePool = "imagePool";
|
||||
dfs.addCachePool(new CachePoolInfo(imagePool));
|
||||
prevId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
|
||||
.setPath(new Path("/image")).setPool(imagePool).build());
|
||||
|
||||
// Save a new image to force a fresh fsimage download
|
||||
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
dfs.saveNamespace();
|
||||
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
|
||||
// Checkpoint again forcing a reload of FSN state
|
||||
boolean fetchImage = secondary.doCheckpoint();
|
||||
assertTrue("Secondary should have fetched a new fsimage from NameNode",
|
||||
fetchImage);
|
||||
|
||||
// Remove temp pool and directive
|
||||
dfs.removeCachePool(imagePool);
|
||||
|
||||
// Restart namenode
|
||||
cluster.restartNameNode();
|
||||
|
||||
// Check that state came back up
|
||||
pit = dfs.listCachePools();
|
||||
assertTrue("No cache pools found", pit.hasNext());
|
||||
info = pit.next().getInfo();
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(groupName, info.getGroupName());
|
||||
assertEquals(mode, info.getMode());
|
||||
assertEquals(limit, (long)info.getLimit());
|
||||
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
||||
|
||||
dit = dfs.listCacheDirectives(null);
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
||||
CacheDirectiveInfo cd = dit.next().getInfo();
|
||||
assertEquals(i+1, cd.getId().longValue());
|
||||
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
||||
assertEquals(pool, cd.getPool());
|
||||
assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
|
||||
}
|
||||
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
||||
|
||||
long nextId = dfs.addCacheDirective(
|
||||
new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path("/foobar")).setPool(pool).build());
|
||||
assertEquals(prevId + 1, nextId);
|
||||
} finally {
|
||||
if (secondary != null) {
|
||||
secondary.shutdown();
|
||||
}
|
||||
}
|
||||
RemoteIterator<CacheDirectiveEntry> dit
|
||||
= dfs.listCacheDirectives(null);
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
||||
CacheDirectiveInfo cd = dit.next().getInfo();
|
||||
assertEquals(i+1, cd.getId().longValue());
|
||||
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
||||
assertEquals(pool, cd.getPool());
|
||||
}
|
||||
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
||||
|
||||
// Restart namenode
|
||||
cluster.restartNameNode();
|
||||
|
||||
// Check that state came back up
|
||||
pit = dfs.listCachePools();
|
||||
assertTrue("No cache pools found", pit.hasNext());
|
||||
info = pit.next().getInfo();
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(pool, info.getPoolName());
|
||||
assertEquals(groupName, info.getGroupName());
|
||||
assertEquals(mode, info.getMode());
|
||||
assertEquals(limit, (long)info.getLimit());
|
||||
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
||||
|
||||
dit = dfs.listCacheDirectives(null);
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
||||
CacheDirectiveInfo cd = dit.next().getInfo();
|
||||
assertEquals(i+1, cd.getId().longValue());
|
||||
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
||||
assertEquals(pool, cd.getPool());
|
||||
assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
|
||||
}
|
||||
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
||||
|
||||
long nextId = dfs.addCacheDirective(
|
||||
new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path("/foobar")).setPool(pool).build());
|
||||
assertEquals(prevId + 1, nextId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1637,7 +1637,7 @@ public void testEditFailureOnFirstCheckpoint() throws IOException {
|
||||
* Test that the secondary namenode correctly deletes temporary edits
|
||||
* on startup.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
@Test(timeout = 60000)
|
||||
public void testDeleteTemporaryEditsOnStartup() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
SecondaryNameNode secondary = null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user