diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7172144afcf..e2aa07861dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -481,6 +481,9 @@ Trunk (Unreleased) HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager (Haohui Mai via brandonli) + HDFS-5775. Consolidate the code for serialization in CacheManager + (Haohui Mai via brandonli) + Release 2.4.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 5cb8fe92332..e6fb5fc218e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -160,6 +160,8 @@ public final class CacheManager { */ private final ReentrantLock crmLock = new ReentrantLock(); + private final SerializerCompat serializerCompat = new SerializerCompat(); + /** * The CacheReplicationMonitor. */ @@ -926,11 +928,9 @@ private void processCacheReportImpl(final DatanodeDescriptor datanode, * @param sdPath path of the storage directory * @throws IOException */ - public void saveState(DataOutputStream out, String sdPath) + public void saveStateCompat(DataOutputStream out, String sdPath) throws IOException { - out.writeLong(nextDirectiveId); - savePools(out, sdPath); - saveDirectives(out, sdPath); + serializerCompat.save(out, sdPath); } /** @@ -939,105 +939,117 @@ public void saveState(DataOutputStream out, String sdPath) * @param in DataInput from which to restore state * @throws IOException */ - public void loadState(DataInput in) throws IOException { - nextDirectiveId = in.readLong(); - // pools need to be loaded first since directives point to their parent pool - loadPools(in); - loadDirectives(in); + public void loadStateCompat(DataInput in) throws IOException { + serializerCompat.load(in); } - /** - * Save cache pools to fsimage - */ - private void savePools(DataOutputStream out, - String sdPath) throws IOException { - StartupProgress prog = NameNode.getStartupProgress(); - Step step = new Step(StepType.CACHE_POOLS, sdPath); - prog.beginStep(Phase.SAVING_CHECKPOINT, step); - prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size()); - Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); - out.writeInt(cachePools.size()); - for (CachePool pool: cachePools.values()) { - FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); - counter.increment(); + private final class SerializerCompat { + private void save(DataOutputStream out, String sdPath) throws IOException { + out.writeLong(nextDirectiveId); + savePools(out, sdPath); + saveDirectives(out, sdPath); } - prog.endStep(Phase.SAVING_CHECKPOINT, step); - } - /* - * Save cache entries to fsimage - */ - private void saveDirectives(DataOutputStream out, String sdPath) - throws IOException { - StartupProgress prog = NameNode.getStartupProgress(); - Step step = new Step(StepType.CACHE_ENTRIES, sdPath); - prog.beginStep(Phase.SAVING_CHECKPOINT, step); - prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); - Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); - out.writeInt(directivesById.size()); - for (CacheDirective directive : directivesById.values()) { - FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); - counter.increment(); + private void load(DataInput in) throws IOException { + nextDirectiveId = in.readLong(); + // pools need to be loaded first since directives point to their parent pool + loadPools(in); + loadDirectives(in); } - prog.endStep(Phase.SAVING_CHECKPOINT, step); - } - /** - * Load cache pools from fsimage - */ - private void loadPools(DataInput in) - throws IOException { - StartupProgress prog = NameNode.getStartupProgress(); - Step step = new Step(StepType.CACHE_POOLS); - prog.beginStep(Phase.LOADING_FSIMAGE, step); - int numberOfPools = in.readInt(); - prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); - Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); - for (int i = 0; i < numberOfPools; i++) { - addCachePool(FSImageSerialization.readCachePoolInfo(in)); - counter.increment(); - } - prog.endStep(Phase.LOADING_FSIMAGE, step); - } - - /** - * Load cache directives from the fsimage - */ - private void loadDirectives(DataInput in) throws IOException { - StartupProgress prog = NameNode.getStartupProgress(); - Step step = new Step(StepType.CACHE_ENTRIES); - prog.beginStep(Phase.LOADING_FSIMAGE, step); - int numDirectives = in.readInt(); - prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); - Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); - for (int i = 0; i < numDirectives; i++) { - CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); - // Get pool reference by looking it up in the map - final String poolName = info.getPool(); - CachePool pool = cachePools.get(poolName); - if (pool == null) { - throw new IOException("Directive refers to pool " + poolName + - ", which does not exist."); + /** + * Save cache pools to fsimage + */ + private void savePools(DataOutputStream out, + String sdPath) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_POOLS, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); + out.writeInt(cachePools.size()); + for (CachePool pool: cachePools.values()) { + FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); + counter.increment(); } - CacheDirective directive = - new CacheDirective(info.getId(), info.getPath().toUri().getPath(), - info.getReplication(), info.getExpiration().getAbsoluteMillis()); - boolean addedDirective = pool.getDirectiveList().add(directive); - assert addedDirective; - if (directivesById.put(directive.getId(), directive) != null) { - throw new IOException("A directive with ID " + directive.getId() + - " already exists"); - } - List directives = - directivesByPath.get(directive.getPath()); - if (directives == null) { - directives = new LinkedList(); - directivesByPath.put(directive.getPath(), directives); - } - directives.add(directive); - counter.increment(); + prog.endStep(Phase.SAVING_CHECKPOINT, step); + } + + /* + * Save cache entries to fsimage + */ + private void saveDirectives(DataOutputStream out, String sdPath) + throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_ENTRIES, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); + out.writeInt(directivesById.size()); + for (CacheDirective directive : directivesById.values()) { + FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); + counter.increment(); + } + prog.endStep(Phase.SAVING_CHECKPOINT, step); + } + + /** + * Load cache pools from fsimage + */ + private void loadPools(DataInput in) + throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_POOLS); + prog.beginStep(Phase.LOADING_FSIMAGE, step); + int numberOfPools = in.readInt(); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); + for (int i = 0; i < numberOfPools; i++) { + addCachePool(FSImageSerialization.readCachePoolInfo(in)); + counter.increment(); + } + prog.endStep(Phase.LOADING_FSIMAGE, step); + } + + /** + * Load cache directives from the fsimage + */ + private void loadDirectives(DataInput in) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_ENTRIES); + prog.beginStep(Phase.LOADING_FSIMAGE, step); + int numDirectives = in.readInt(); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); + for (int i = 0; i < numDirectives; i++) { + CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); + // Get pool reference by looking it up in the map + final String poolName = info.getPool(); + CachePool pool = cachePools.get(poolName); + if (pool == null) { + throw new IOException("Directive refers to pool " + poolName + + ", which does not exist."); + } + CacheDirective directive = + new CacheDirective(info.getId(), info.getPath().toUri().getPath(), + info.getReplication(), info.getExpiration().getAbsoluteMillis()); + boolean addedDirective = pool.getDirectiveList().add(directive); + assert addedDirective; + if (directivesById.put(directive.getId(), directive) != null) { + throw new IOException("A directive with ID " + directive.getId() + + " already exists"); + } + List directives = + directivesByPath.get(directive.getPath()); + if (directives == null) { + directives = new LinkedList(); + directivesByPath.put(directive.getPath(), directives); + } + directives.add(directive); + counter.increment(); + } + prog.endStep(Phase.LOADING_FSIMAGE, step); } - prog.endStep(Phase.LOADING_FSIMAGE, step); } public void waitForRescanIfNeeded() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 7a3e066e8de..fd86f81fc4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -878,7 +878,7 @@ private void loadCacheManagerState(DataInput in) throws IOException { if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) { return; } - namesystem.getCacheManager().loadState(in); + namesystem.getCacheManager().loadStateCompat(in); } private int getLayoutVersion() { @@ -1034,7 +1034,7 @@ void save(File newFile, FSImageCompression compression) throws IOException { context.checkCancelled(); sourceNamesystem.saveSecretManagerStateCompat(out, sdPath); context.checkCancelled(); - sourceNamesystem.getCacheManager().saveState(out, sdPath); + sourceNamesystem.getCacheManager().saveStateCompat(out, sdPath); context.checkCancelled(); out.flush(); context.checkCancelled();