|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
|
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
|
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
|
@ -528,77 +529,111 @@ public class TestCacheDirectives {
|
|
|
|
|
|
|
|
|
|
@Test(timeout=60000)
|
|
|
|
|
public void testCacheManagerRestart() throws Exception {
|
|
|
|
|
// Create and validate a pool
|
|
|
|
|
final String pool = "poolparty";
|
|
|
|
|
String groupName = "partygroup";
|
|
|
|
|
FsPermission mode = new FsPermission((short)0777);
|
|
|
|
|
long limit = 747;
|
|
|
|
|
dfs.addCachePool(new CachePoolInfo(pool)
|
|
|
|
|
.setGroupName(groupName)
|
|
|
|
|
.setMode(mode)
|
|
|
|
|
.setLimit(limit));
|
|
|
|
|
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
|
|
|
|
|
assertTrue("No cache pools found", pit.hasNext());
|
|
|
|
|
CachePoolInfo info = pit.next().getInfo();
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(groupName, info.getGroupName());
|
|
|
|
|
assertEquals(mode, info.getMode());
|
|
|
|
|
assertEquals(limit, (long)info.getLimit());
|
|
|
|
|
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
|
|
SecondaryNameNode secondary = null;
|
|
|
|
|
try {
|
|
|
|
|
// Start a secondary namenode
|
|
|
|
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
|
|
|
|
|
"0.0.0.0:0");
|
|
|
|
|
secondary = new SecondaryNameNode(conf);
|
|
|
|
|
|
|
|
|
|
// Create some cache entries
|
|
|
|
|
int numEntries = 10;
|
|
|
|
|
String entryPrefix = "/party-";
|
|
|
|
|
long prevId = -1;
|
|
|
|
|
final Date expiry = new Date();
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
prevId = dfs.addCacheDirective(
|
|
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
|
|
setPath(new Path(entryPrefix + i)).setPool(pool).
|
|
|
|
|
setExpiration(
|
|
|
|
|
CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
|
|
|
|
|
build());
|
|
|
|
|
}
|
|
|
|
|
RemoteIterator<CacheDirectiveEntry> dit
|
|
|
|
|
= dfs.listCacheDirectives(null);
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
|
|
CacheDirectiveInfo cd = dit.next().getInfo();
|
|
|
|
|
assertEquals(i+1, cd.getId().longValue());
|
|
|
|
|
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
|
|
assertEquals(pool, cd.getPool());
|
|
|
|
|
}
|
|
|
|
|
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
|
|
|
|
|
|
|
|
|
// Restart namenode
|
|
|
|
|
cluster.restartNameNode();
|
|
|
|
|
|
|
|
|
|
// Check that state came back up
|
|
|
|
|
pit = dfs.listCachePools();
|
|
|
|
|
assertTrue("No cache pools found", pit.hasNext());
|
|
|
|
|
info = pit.next().getInfo();
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(groupName, info.getGroupName());
|
|
|
|
|
assertEquals(mode, info.getMode());
|
|
|
|
|
assertEquals(limit, (long)info.getLimit());
|
|
|
|
|
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
|
|
|
|
|
|
|
dit = dfs.listCacheDirectives(null);
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
|
|
CacheDirectiveInfo cd = dit.next().getInfo();
|
|
|
|
|
assertEquals(i+1, cd.getId().longValue());
|
|
|
|
|
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
|
|
assertEquals(pool, cd.getPool());
|
|
|
|
|
assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
|
|
|
|
|
}
|
|
|
|
|
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
|
|
|
|
// Create and validate a pool
|
|
|
|
|
final String pool = "poolparty";
|
|
|
|
|
String groupName = "partygroup";
|
|
|
|
|
FsPermission mode = new FsPermission((short)0777);
|
|
|
|
|
long limit = 747;
|
|
|
|
|
dfs.addCachePool(new CachePoolInfo(pool)
|
|
|
|
|
.setGroupName(groupName)
|
|
|
|
|
.setMode(mode)
|
|
|
|
|
.setLimit(limit));
|
|
|
|
|
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
|
|
|
|
|
assertTrue("No cache pools found", pit.hasNext());
|
|
|
|
|
CachePoolInfo info = pit.next().getInfo();
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(groupName, info.getGroupName());
|
|
|
|
|
assertEquals(mode, info.getMode());
|
|
|
|
|
assertEquals(limit, (long)info.getLimit());
|
|
|
|
|
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
|
|
|
|
|
|
|
// Create some cache entries
|
|
|
|
|
int numEntries = 10;
|
|
|
|
|
String entryPrefix = "/party-";
|
|
|
|
|
long prevId = -1;
|
|
|
|
|
final Date expiry = new Date();
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
prevId = dfs.addCacheDirective(
|
|
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
|
|
setPath(new Path(entryPrefix + i)).setPool(pool).
|
|
|
|
|
setExpiration(
|
|
|
|
|
CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
|
|
|
|
|
build());
|
|
|
|
|
}
|
|
|
|
|
RemoteIterator<CacheDirectiveEntry> dit
|
|
|
|
|
= dfs.listCacheDirectives(null);
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
|
|
CacheDirectiveInfo cd = dit.next().getInfo();
|
|
|
|
|
assertEquals(i+1, cd.getId().longValue());
|
|
|
|
|
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
|
|
assertEquals(pool, cd.getPool());
|
|
|
|
|
}
|
|
|
|
|
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
|
|
|
|
|
|
|
|
|
// Checkpoint once to set some cache pools and directives on 2NN side
|
|
|
|
|
secondary.doCheckpoint();
|
|
|
|
|
|
|
|
|
|
// Add some more CacheManager state
|
|
|
|
|
final String imagePool = "imagePool";
|
|
|
|
|
dfs.addCachePool(new CachePoolInfo(imagePool));
|
|
|
|
|
prevId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
|
|
|
|
|
.setPath(new Path("/image")).setPool(imagePool).build());
|
|
|
|
|
|
|
|
|
|
long nextId = dfs.addCacheDirective(
|
|
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
|
|
setPath(new Path("/foobar")).setPool(pool).build());
|
|
|
|
|
assertEquals(prevId + 1, nextId);
|
|
|
|
|
// Save a new image to force a fresh fsimage download
|
|
|
|
|
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
|
|
|
|
dfs.saveNamespace();
|
|
|
|
|
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
|
|
|
|
|
|
|
|
|
// Checkpoint again forcing a reload of FSN state
|
|
|
|
|
boolean fetchImage = secondary.doCheckpoint();
|
|
|
|
|
assertTrue("Secondary should have fetched a new fsimage from NameNode",
|
|
|
|
|
fetchImage);
|
|
|
|
|
|
|
|
|
|
// Remove temp pool and directive
|
|
|
|
|
dfs.removeCachePool(imagePool);
|
|
|
|
|
|
|
|
|
|
// Restart namenode
|
|
|
|
|
cluster.restartNameNode();
|
|
|
|
|
|
|
|
|
|
// Check that state came back up
|
|
|
|
|
pit = dfs.listCachePools();
|
|
|
|
|
assertTrue("No cache pools found", pit.hasNext());
|
|
|
|
|
info = pit.next().getInfo();
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(pool, info.getPoolName());
|
|
|
|
|
assertEquals(groupName, info.getGroupName());
|
|
|
|
|
assertEquals(mode, info.getMode());
|
|
|
|
|
assertEquals(limit, (long)info.getLimit());
|
|
|
|
|
assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
|
|
|
|
|
|
|
dit = dfs.listCacheDirectives(null);
|
|
|
|
|
for (int i=0; i<numEntries; i++) {
|
|
|
|
|
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
|
|
CacheDirectiveInfo cd = dit.next().getInfo();
|
|
|
|
|
assertEquals(i+1, cd.getId().longValue());
|
|
|
|
|
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
|
|
assertEquals(pool, cd.getPool());
|
|
|
|
|
assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
|
|
|
|
|
}
|
|
|
|
|
assertFalse("Unexpected # of cache directives found", dit.hasNext());
|
|
|
|
|
|
|
|
|
|
long nextId = dfs.addCacheDirective(
|
|
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
|
|
setPath(new Path("/foobar")).setPool(pool).build());
|
|
|
|
|
assertEquals(prevId + 1, nextId);
|
|
|
|
|
} finally {
|
|
|
|
|
if (secondary != null) {
|
|
|
|
|
secondary.shutdown();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|