HDFS-5775. Consolidate the code for serialization in CacheManager. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1558599 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2014-01-15 23:15:24 +00:00
parent ca5d73d1ab
commit a506df8e48
3 changed files with 112 additions and 97 deletions

View File

@ -481,6 +481,9 @@ Trunk (Unreleased)
HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager
(Haohui Mai via brandonli) (Haohui Mai via brandonli)
HDFS-5775. Consolidate the code for serialization in CacheManager
(Haohui Mai via brandonli)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -160,6 +160,8 @@ public final class CacheManager {
*/ */
private final ReentrantLock crmLock = new ReentrantLock(); private final ReentrantLock crmLock = new ReentrantLock();
private final SerializerCompat serializerCompat = new SerializerCompat();
/** /**
* The CacheReplicationMonitor. * The CacheReplicationMonitor.
*/ */
@ -926,11 +928,9 @@ public final class CacheManager {
* @param sdPath path of the storage directory * @param sdPath path of the storage directory
* @throws IOException * @throws IOException
*/ */
public void saveState(DataOutputStream out, String sdPath) public void saveStateCompat(DataOutputStream out, String sdPath)
throws IOException { throws IOException {
out.writeLong(nextDirectiveId); serializerCompat.save(out, sdPath);
savePools(out, sdPath);
saveDirectives(out, sdPath);
} }
/** /**
@ -939,105 +939,117 @@ public final class CacheManager {
* @param in DataInput from which to restore state * @param in DataInput from which to restore state
* @throws IOException * @throws IOException
*/ */
public void loadState(DataInput in) throws IOException { public void loadStateCompat(DataInput in) throws IOException {
nextDirectiveId = in.readLong(); serializerCompat.load(in);
// pools need to be loaded first since directives point to their parent pool
loadPools(in);
loadDirectives(in);
} }
/** private final class SerializerCompat {
* Save cache pools to fsimage private void save(DataOutputStream out, String sdPath) throws IOException {
*/ out.writeLong(nextDirectiveId);
private void savePools(DataOutputStream out, savePools(out, sdPath);
String sdPath) throws IOException { saveDirectives(out, sdPath);
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
counter.increment();
} }
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/* private void load(DataInput in) throws IOException {
* Save cache entries to fsimage nextDirectiveId = in.readLong();
*/ // pools need to be loaded first since directives point to their parent pool
private void saveDirectives(DataOutputStream out, String sdPath) loadPools(in);
throws IOException { loadDirectives(in);
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(directivesById.size());
for (CacheDirective directive : directivesById.values()) {
FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
counter.increment();
} }
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/** /**
* Load cache pools from fsimage * Save cache pools to fsimage
*/ */
private void loadPools(DataInput in) private void savePools(DataOutputStream out,
throws IOException { String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS); Step step = new Step(StepType.CACHE_POOLS, sdPath);
prog.beginStep(Phase.LOADING_FSIMAGE, step); prog.beginStep(Phase.SAVING_CHECKPOINT, step);
int numberOfPools = in.readInt(); prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); out.writeInt(cachePools.size());
for (int i = 0; i < numberOfPools; i++) { for (CachePool pool: cachePools.values()) {
addCachePool(FSImageSerialization.readCachePoolInfo(in)); FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
counter.increment(); counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Load cache directives from the fsimage
*/
private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numDirectives = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
final String poolName = info.getPool();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
} }
CacheDirective directive = prog.endStep(Phase.SAVING_CHECKPOINT, step);
new CacheDirective(info.getId(), info.getPath().toUri().getPath(), }
info.getReplication(), info.getExpiration().getAbsoluteMillis());
boolean addedDirective = pool.getDirectiveList().add(directive); /*
assert addedDirective; * Save cache entries to fsimage
if (directivesById.put(directive.getId(), directive) != null) { */
throw new IOException("A directive with ID " + directive.getId() + private void saveDirectives(DataOutputStream out, String sdPath)
" already exists"); throws IOException {
} StartupProgress prog = NameNode.getStartupProgress();
List<CacheDirective> directives = Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
directivesByPath.get(directive.getPath()); prog.beginStep(Phase.SAVING_CHECKPOINT, step);
if (directives == null) { prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
directives = new LinkedList<CacheDirective>(); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
directivesByPath.put(directive.getPath(), directives); out.writeInt(directivesById.size());
} for (CacheDirective directive : directivesById.values()) {
directives.add(directive); FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
counter.increment(); counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Load cache pools from fsimage
*/
private void loadPools(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfPools = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
addCachePool(FSImageSerialization.readCachePoolInfo(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Load cache directives from the fsimage
*/
private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numDirectives = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
final String poolName = info.getPool();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
CacheDirective directive =
new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
info.getReplication(), info.getExpiration().getAbsoluteMillis());
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {
throw new IOException("A directive with ID " + directive.getId() +
" already exists");
}
List<CacheDirective> directives =
directivesByPath.get(directive.getPath());
if (directives == null) {
directives = new LinkedList<CacheDirective>();
directivesByPath.put(directive.getPath(), directives);
}
directives.add(directive);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
} }
prog.endStep(Phase.LOADING_FSIMAGE, step);
} }
public void waitForRescanIfNeeded() { public void waitForRescanIfNeeded() {

View File

@ -878,7 +878,7 @@ public class FSImageFormat {
if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) { if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) {
return; return;
} }
namesystem.getCacheManager().loadState(in); namesystem.getCacheManager().loadStateCompat(in);
} }
private int getLayoutVersion() { private int getLayoutVersion() {
@ -1034,7 +1034,7 @@ public class FSImageFormat {
context.checkCancelled(); context.checkCancelled();
sourceNamesystem.saveSecretManagerStateCompat(out, sdPath); sourceNamesystem.saveSecretManagerStateCompat(out, sdPath);
context.checkCancelled(); context.checkCancelled();
sourceNamesystem.getCacheManager().saveState(out, sdPath); sourceNamesystem.getCacheManager().saveStateCompat(out, sdPath);
context.checkCancelled(); context.checkCancelled();
out.flush(); out.flush();
context.checkCancelled(); context.checkCancelled();