From afbdd8fdcfda16ab0b3b317d04fb296418a23290 Mon Sep 17 00:00:00 2001 From: Rakesh Radhakrishnan Date: Tue, 10 Apr 2018 23:35:00 +0530 Subject: [PATCH] HDFS-13328. Abstract ReencryptionHandler recursive logic in separate class. Contributed by Surendra Singh Lilhore. (cherry picked from commit f89594f0b80e8efffdcb887daa4a18a2b0a228b3) --- .../hdfs/server/namenode/FSTreeTraverser.java | 339 +++++++++ .../server/namenode/ReencryptionHandler.java | 653 +++++++----------- .../server/namenode/ReencryptionUpdater.java | 2 +- .../server/namenode/TestReencryption.java | 3 - .../namenode/TestReencryptionHandler.java | 10 +- 5 files changed, 614 insertions(+), 393 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java new file mode 100644 index 00000000000..ff77029a6a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.util.ReadOnlyList; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * FSTreeTraverser traverse directory recursively and process files + * in batches. + */ +@InterfaceAudience.Private +public abstract class FSTreeTraverser { + + + public static final Logger LOG = LoggerFactory + .getLogger(FSTreeTraverser.class); + + private final FSDirectory dir; + + private long readLockReportingThresholdMs; + + private Timer timer; + + public FSTreeTraverser(FSDirectory dir, Configuration conf) { + this.dir = dir; + this.readLockReportingThresholdMs = conf.getLong( + DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, + DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); + timer = new Timer(); + } + + public FSDirectory getFSDirectory() { + return dir; + } + + /** + * Iterate through all files directly inside parent, and recurse down + * directories. The listing is done in batch, and can optionally start after + * a position. The iteration of the inode tree is done in a depth-first + * fashion. But instead of holding all {@link INodeDirectory}'s in memory + * on the fly, only the path components to the current inode is held. This + * is to reduce memory consumption. + * + * @param parent + * The inode id of parent directory + * @param startId + * Id of the start inode. + * @param startAfter + * Full path of a file the traverse should start after. + * @param traverseInfo + * info which may required for processing the child's. + * @throws IOException + * @throws InterruptedException + */ + protected void traverseDir(final INodeDirectory parent, final long startId, + byte[] startAfter, final TraverseInfo traverseInfo) + throws IOException, InterruptedException { + List startAfters = new ArrayList<>(); + if (parent == null) { + return; + } + INode curr = parent; + // construct startAfters all the way up to the zone inode. + startAfters.add(startAfter); + while (curr.getId() != startId) { + startAfters.add(0, curr.getLocalNameBytes()); + curr = curr.getParent(); + } + curr = traverseDirInt(startId, parent, startAfters, traverseInfo); + while (!startAfters.isEmpty()) { + if (curr == null) { + // lock was reacquired, re-resolve path. + curr = resolvePaths(startId, startAfters); + } + curr = traverseDirInt(startId, curr, startAfters, traverseInfo); + } + } + + /** + * Iterates the parent directory, and add direct children files to current + * batch. If batch size meets configured threshold, current batch will be + * submitted for the processing. + *

+ * Locks could be released and reacquired when a batch submission is + * finished. + * + * @param startId + * Id of the start inode. + * @return The inode which was just processed, if lock is held in the entire + * process. Null if lock is released. + * @throws IOException + * @throws InterruptedException + */ + protected INode traverseDirInt(final long startId, INode curr, + List startAfters, final TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert dir.hasReadLock(); + assert dir.getFSNamesystem().hasReadLock(); + long lockStartTime = timer.monotonicNow(); + Preconditions.checkNotNull(curr, "Current inode can't be null"); + checkINodeReady(startId); + final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory() + : curr.getParent(); + ReadOnlyList children = parent + .getChildrenList(Snapshot.CURRENT_STATE_ID); + if (LOG.isDebugEnabled()) { + LOG.debug("Traversing directory {}", parent.getFullPathName()); + } + + final byte[] startAfter = startAfters.get(startAfters.size() - 1); + boolean lockReleased = false; + for (int i = INodeDirectory.nextChild(children, startAfter); i < children + .size(); ++i) { + final INode inode = children.get(i); + if (!processFileInode(inode, traverseInfo)) { + // inode wasn't processes. Recurse down if it's a dir, + // skip otherwise. + if (!inode.isDirectory()) { + continue; + } + + if (!canTraverseDir(inode)) { + continue; + } + // add 1 level to the depth-first search. + curr = inode; + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + startAfters.add(HdfsFileStatus.EMPTY_NAME); + return lockReleased ? null : curr; + } + if (shouldSubmitCurrentBatch()) { + final byte[] currentStartAfter = inode.getLocalNameBytes(); + final String parentPath = parent.getFullPathName(); + lockReleased = true; + readUnlock(); + submitCurrentBatch(startId); + try { + throttle(); + checkPauseForTesting(); + } finally { + readLock(); + lockStartTime = timer.monotonicNow(); + } + checkINodeReady(startId); + + // Things could have changed when the lock was released. + // Re-resolve the parent inode. + FSPermissionChecker pc = dir.getPermissionChecker(); + INode newParent = dir + .resolvePath(pc, parentPath, FSDirectory.DirOp.READ) + .getLastINode(); + if (newParent == null || !newParent.equals(parent)) { + // parent dir is deleted or recreated. We're done. + return null; + } + children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID); + // -1 to counter the ++ on the for loop + i = INodeDirectory.nextChild(children, currentStartAfter) - 1; + } + if ((timer.monotonicNow() + - lockStartTime) > readLockReportingThresholdMs) { + readUnlock(); + try { + throttle(); + } finally { + readLock(); + lockStartTime = timer.monotonicNow(); + } + } + } + // Successfully finished this dir, adjust pointers to 1 level up, and + // startAfter this dir. + startAfters.remove(startAfters.size() - 1); + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + curr = curr.getParent(); + return lockReleased ? null : curr; + } + + /** + * Resolve the cursor of traverse to an inode. + *

+ * The parent of the lowest level startAfter is returned. If somewhere in the + * middle of startAfters changed, the parent of the lowest unchanged level is + * returned. + * + * @param startId + * Id of the start inode. + * @param startAfters + * the cursor, represented by a list of path bytes. + * @return the parent inode corresponding to the startAfters, or null if the + * furthest parent is deleted. + */ + private INode resolvePaths(final long startId, List startAfters) + throws IOException { + // If the readlock was reacquired, we need to resolve the paths again + // in case things have changed. If our cursor file/dir is changed, + // continue from the next one. + INode zoneNode = dir.getInode(startId); + if (zoneNode == null) { + throw new FileNotFoundException("Zone " + startId + " is deleted."); + } + INodeDirectory parent = zoneNode.asDirectory(); + for (int i = 0; i < startAfters.size(); ++i) { + if (i == startAfters.size() - 1) { + // last startAfter does not need to be resolved, since search for + // nextChild will cover that automatically. + break; + } + INode curr = parent.getChild(startAfters.get(i), + Snapshot.CURRENT_STATE_ID); + if (curr == null) { + // inode at this level has changed. Update startAfters to point to + // the next dir at the parent level (and dropping any startAfters + // at lower levels). + for (; i < startAfters.size(); ++i) { + startAfters.remove(startAfters.size() - 1); + } + break; + } + parent = curr.asDirectory(); + } + return parent; + } + + protected void readLock() { + dir.getFSNamesystem().readLock(); + dir.readLock(); + } + + protected void readUnlock() { + dir.readUnlock(); + dir.getFSNamesystem().readUnlock("FSTreeTraverser"); + } + + + protected abstract void checkPauseForTesting() throws InterruptedException; + + /** + * Process an Inode. Add to current batch if it's a file, no-op otherwise. + * + * @param inode + * the inode + * @return true if inode is added to currentBatch and should be process for + * next operation. false otherwise: could be inode is not a file. + * @throws IOException + * @throws InterruptedException + */ + protected abstract boolean processFileInode(INode inode, + TraverseInfo traverseInfo) throws IOException, InterruptedException; + + /** + * Check whether current batch can be submitted for the processing. + * + * @return true if batch size meets meet the condition, otherwise false. + */ + protected abstract boolean shouldSubmitCurrentBatch(); + + /** + * Check whether inode is ready for traverse. Throws IOE if it's not. + * + * @param startId + * Id of the start inode. + * @throws IOException + */ + protected abstract void checkINodeReady(long startId) throws IOException; + + /** + * Submit the current batch for processing. + * + * @param startId + * Id of the start inode. + * @throws IOException + * @throws InterruptedException + */ + protected abstract void submitCurrentBatch(long startId) + throws IOException, InterruptedException; + + /** + * Throttles the FSTreeTraverser. + * + * @throws InterruptedException + */ + protected abstract void throttle() throws InterruptedException; + + /** + * Check whether dir is traversable or not. + * + * @param inode + * Dir inode + * @return true if dir is traversable otherwise false. + * @throws IOException + */ + protected abstract boolean canTraverseDir(INode inode) throws IOException; + + /** + * Class will represent the additional info required for traverse. + */ + public static class TraverseInfo { + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index 01c20382f0c..7c5dbdb4cd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -30,18 +30,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.ReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker; -import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -118,6 +116,8 @@ public class ReencryptionHandler implements Runnable { // be single-threaded, see class javadoc for more details. private ReencryptionBatch currentBatch; + private final ReencryptionPendingInodeIdCollector traverser; + private final ReencryptionUpdater reencryptionUpdater; private ExecutorService updaterExecutor; @@ -186,16 +186,6 @@ public class ReencryptionHandler implements Runnable { reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count); } - private synchronized void checkPauseForTesting() throws InterruptedException { - assert !dir.hasReadLock(); - assert !dir.getFSNamesystem().hasReadLock(); - while (shouldPauseForTesting) { - LOG.info("Sleeping in the re-encrypt handler for unit test."); - wait(); - LOG.info("Continuing re-encrypt handler after pausing."); - } - } - ReencryptionHandler(final EncryptionZoneManager ezMgr, final Configuration conf) { this.ezManager = ezMgr; @@ -256,6 +246,7 @@ public class ReencryptionHandler implements Runnable { reencryptionUpdater = new ReencryptionUpdater(dir, batchService, this, conf); currentBatch = new ReencryptionBatch(reencryptBatchSize); + traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf); } ReencryptionStatus getReencryptionStatus() { @@ -339,7 +330,7 @@ public class ReencryptionHandler implements Runnable { synchronized (this) { wait(interval); } - checkPauseForTesting(); + traverser.checkPauseForTesting(); } catch (InterruptedException ie) { LOG.info("Re-encrypt handler interrupted. Exiting"); Thread.currentThread().interrupt(); @@ -397,7 +388,7 @@ public class ReencryptionHandler implements Runnable { final INode zoneNode; final ZoneReencryptionStatus zs; - readLock(); + traverser.readLock(); try { zoneNode = dir.getInode(zoneId); // start re-encrypting the zone from the beginning @@ -419,18 +410,19 @@ public class ReencryptionHandler implements Runnable { zoneId); if (zs.getLastCheckpointFile() == null) { // new re-encryption - reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME, - zs.getEzKeyVersionName()); + traverser.traverseDir(zoneNode.asDirectory(), zoneId, + HdfsFileStatus.EMPTY_NAME, + new ZoneTraverseInfo(zs.getEzKeyVersionName())); } else { // resuming from a past re-encryption restoreFromLastProcessedFile(zoneId, zs); } // save the last batch and mark complete - submitCurrentBatch(zoneId); + traverser.submitCurrentBatch(zoneId); LOG.info("Submission completed of zone {} for re-encryption.", zoneId); reencryptionUpdater.markZoneSubmissionDone(zoneId); } finally { - readUnlock(); + traverser.readUnlock(); } } @@ -479,131 +471,8 @@ public class ReencryptionHandler implements Runnable { dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ); parent = lpfIIP.getLastINode().getParent(); startAfter = lpfIIP.getLastINode().getLocalNameBytes(); - reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName()); - } - - /** - * Iterate through all files directly inside parent, and recurse down - * directories. The listing is done in batch, and can optionally start after - * a position. - *

- * Each batch is then send to the threadpool, where KMS will be contacted and - * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed - * from the threadpool. - *

- * The iteration of the inode tree is done in a depth-first fashion. But - * instead of holding all INodeDirectory's in memory on the fly, only the - * path components to the current inode is held. This is to reduce memory - * consumption. - * - * @param parent The inode id of parent directory - * @param zoneId Id of the EZ inode - * @param startAfter Full path of a file the re-encrypt should start after. - * @throws IOException - * @throws InterruptedException - */ - private void reencryptDir(final INodeDirectory parent, final long zoneId, - byte[] startAfter, final String ezKeyVerName) - throws IOException, InterruptedException { - List startAfters = new ArrayList<>(); - if (parent == null) { - return; - } - INode curr = parent; - // construct startAfters all the way up to the zone inode. - startAfters.add(startAfter); - while (curr.getId() != zoneId) { - startAfters.add(0, curr.getLocalNameBytes()); - curr = curr.getParent(); - } - curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName); - while (!startAfters.isEmpty()) { - if (curr == null) { - // lock was reacquired, re-resolve path. - curr = resolvePaths(zoneId, startAfters); - } - curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName); - } - } - - /** - * Resolve the cursor of re-encryption to an inode. - *

- * The parent of the lowest level startAfter is returned. If somewhere in the - * middle of startAfters changed, the parent of the lowest unchanged level is - * returned. - * - * @param zoneId Id of the EZ inode. - * @param startAfters the cursor, represented by a list of path bytes. - * @return the parent inode corresponding to the startAfters, or null if - * the EZ node (furthest parent) is deleted. - */ - private INode resolvePaths(final long zoneId, List startAfters) - throws IOException { - // If the readlock was reacquired, we need to resolve the paths again - // in case things have changed. If our cursor file/dir is changed, - // continue from the next one. - INode zoneNode = dir.getInode(zoneId); - if (zoneNode == null) { - throw new FileNotFoundException("Zone " + zoneId + " is deleted."); - } - INodeDirectory parent = zoneNode.asDirectory(); - for (int i = 0; i < startAfters.size(); ++i) { - if (i == startAfters.size() - 1) { - // last startAfter does not need to be resolved, since search for - // nextChild will cover that automatically. - break; - } - INode curr = - parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID); - if (curr == null) { - // inode at this level has changed. Update startAfters to point to - // the next dir at the parent level (and dropping any startAfters - // at lower levels). - for (; i < startAfters.size(); ++i) { - startAfters.remove(startAfters.size() - 1); - } - break; - } - parent = curr.asDirectory(); - } - return parent; - } - - /** - * Submit the current batch to the thread pool. - * - * @param zoneId Id of the EZ INode - * @throws IOException - * @throws InterruptedException - */ - private void submitCurrentBatch(final long zoneId) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - if (currentBatch.isEmpty()) { - return; - } - ZoneSubmissionTracker zst; - synchronized (this) { - zst = submissions.get(zoneId); - if (zst == null) { - zst = new ZoneSubmissionTracker(); - submissions.put(zoneId, zst); - } - } - Future future = batchService - .submit(new EDEKReencryptCallable(zoneId, currentBatch, this)); - zst.addTask(future); - LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", - currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); - currentBatch = new ReencryptionBatch(reencryptBatchSize); - // flip the pause flag if this is nth submission. - // The actual pause need to happen outside of the lock. - if (pauseAfterNthSubmission > 0) { - if (--pauseAfterNthSubmission == 0) { - shouldPauseForTesting = true; - } - } + traverser.traverseDir(parent, zoneId, startAfter, + new ZoneTraverseInfo(zs.getEzKeyVersionName())); } final class ReencryptionBatch { @@ -711,248 +580,6 @@ public class ReencryptionHandler implements Runnable { } } - /** - * Iterates the parent directory, and add direct children files to - * current batch. If batch size meets configured threshold, a Callable - * is created and sent to the thread pool, which will communicate to the KMS - * to get new edeks. - *

- * Locks could be released and reacquired when a Callable is created. - * - * @param zoneId Id of the EZ INode - * @return The inode which was just processed, if lock is held in the entire - * process. Null if lock is released. - * @throws IOException - * @throws InterruptedException - */ - private INode reencryptDirInt(final long zoneId, INode curr, - List startAfters, final String ezKeyVerName) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - assert dir.getFSNamesystem().hasReadLock(); - Preconditions.checkNotNull(curr, "Current inode can't be null"); - checkZoneReady(zoneId); - final INodeDirectory parent = - curr.isDirectory() ? curr.asDirectory() : curr.getParent(); - ReadOnlyList children = - parent.getChildrenList(Snapshot.CURRENT_STATE_ID); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-encrypting directory {}", parent.getFullPathName()); - } - - final byte[] startAfter = startAfters.get(startAfters.size() - 1); - boolean lockReleased = false; - for (int i = INodeDirectory.nextChild(children, startAfter); - i < children.size(); ++i) { - final INode inode = children.get(i); - if (!reencryptINode(inode, ezKeyVerName)) { - // inode wasn't added for re-encryption. Recurse down if it's a dir, - // skip otherwise. - if (!inode.isDirectory()) { - continue; - } - if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) { - // nested EZ, ignore. - LOG.info("{}({}) is a nested EZ, skipping for re-encryption", - inode.getFullPathName(), inode.getId()); - continue; - } - // add 1 level to the depth-first search. - curr = inode; - if (!startAfters.isEmpty()) { - startAfters.remove(startAfters.size() - 1); - startAfters.add(curr.getLocalNameBytes()); - } - startAfters.add(HdfsFileStatus.EMPTY_NAME); - return lockReleased ? null : curr; - } - if (currentBatch.size() >= reencryptBatchSize) { - final byte[] currentStartAfter = inode.getLocalNameBytes(); - final String parentPath = parent.getFullPathName(); - submitCurrentBatch(zoneId); - lockReleased = true; - readUnlock(); - try { - throttle(); - checkPauseForTesting(); - } finally { - readLock(); - } - checkZoneReady(zoneId); - - // Things could have changed when the lock was released. - // Re-resolve the parent inode. - FSPermissionChecker pc = dir.getPermissionChecker(); - INode newParent = - dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ) - .getLastINode(); - if (newParent == null || !newParent.equals(parent)) { - // parent dir is deleted or recreated. We're done. - return null; - } - children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID); - // -1 to counter the ++ on the for loop - i = INodeDirectory.nextChild(children, currentStartAfter) - 1; - } - } - // Successfully finished this dir, adjust pointers to 1 level up, and - // startAfter this dir. - startAfters.remove(startAfters.size() - 1); - if (!startAfters.isEmpty()) { - startAfters.remove(startAfters.size() - 1); - startAfters.add(curr.getLocalNameBytes()); - } - curr = curr.getParent(); - return lockReleased ? null : curr; - } - - private void readLock() { - dir.getFSNamesystem().readLock(); - dir.readLock(); - throttleTimerLocked.start(); - } - - private void readUnlock() { - dir.readUnlock(); - dir.getFSNamesystem().readUnlock("reencryptHandler"); - throttleTimerLocked.stop(); - } - - /** - * Throttles the ReencryptionHandler in 3 aspects: - * 1. Prevents generating more Callables than the CPU could possibly handle. - * 2. Prevents generating more Callables than the ReencryptionUpdater can - * handle, under its own throttling - * 3. Prevents contending FSN/FSD read locks. This is done based on the - * DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration. - *

- * Item 1 and 2 are to control NN heap usage. - * - * @throws InterruptedException - */ - @VisibleForTesting - void throttle() throws InterruptedException { - // 1. - final int numCores = Runtime.getRuntime().availableProcessors(); - if (taskQueue.size() >= numCores) { - LOG.debug("Re-encryption handler throttling because queue size {} is" - + "larger than number of cores {}", taskQueue.size(), numCores); - while (taskQueue.size() >= numCores) { - Thread.sleep(100); - } - } - - // 2. if tasks are piling up on the updater, don't create new callables - // until the queue size goes down. - final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; - int numTasks = numTasksSubmitted(); - if (numTasks >= maxTasksPiled) { - LOG.debug("Re-encryption handler throttling because total tasks pending" - + " re-encryption updater is {}", numTasks); - while (numTasks >= maxTasksPiled) { - Thread.sleep(500); - numTasks = numTasksSubmitted(); - } - } - - // 3. - if (throttleLimitHandlerRatio >= 1.0) { - return; - } - final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) - * throttleLimitHandlerRatio); - final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-encryption handler throttling expect: {}, actual: {}," - + " throttleTimerAll:{}", expect, actual, - throttleTimerAll.now(TimeUnit.MILLISECONDS)); - } - if (expect - actual < 0) { - // in case throttleLimitHandlerRatio is very small, expect will be 0. - // so sleepMs should not be calculated from expect, to really meet the - // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs - // should be 1000 - throttleTimerAll.now() - final long sleepMs = - (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll - .now(TimeUnit.MILLISECONDS); - LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); - Thread.sleep(sleepMs); - } - throttleTimerAll.reset().start(); - throttleTimerLocked.reset(); - } - - private synchronized int numTasksSubmitted() { - int ret = 0; - for (ZoneSubmissionTracker zst : submissions.values()) { - ret += zst.getTasks().size(); - } - return ret; - } - - /** - * Process an Inode for re-encryption. Add to current batch if it's a file, - * no-op otherwise. - * - * @param inode the inode - * @return true if inode is added to currentBatch and should be re-encrypted. - * false otherwise: could be inode is not a file, or inode's edek's - * key version is not changed. - * @throws IOException - * @throws InterruptedException - */ - private boolean reencryptINode(final INode inode, final String ezKeyVerName) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - if (LOG.isTraceEnabled()) { - LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); - } - if (!inode.isFile()) { - return false; - } - FileEncryptionInfo feInfo = FSDirEncryptionZoneOp - .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode)); - if (feInfo == null) { - LOG.warn("File {} skipped re-encryption because it is not encrypted! " - + "This is very likely a bug.", inode.getId()); - return false; - } - if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("File {} skipped re-encryption because edek's key version" - + " name is not changed.", inode.getFullPathName()); - } - return false; - } - currentBatch.add(inode.asFile()); - return true; - } - - /** - * Check whether zone is ready for re-encryption. Throws IOE if it's not. - * 1. If EZ is deleted. - * 2. if the re-encryption is canceled. - * 3. If NN is not active or is in safe mode. - * - * @throws IOException if zone does not exist / is cancelled, or if NN is not - * ready for write. - */ - void checkZoneReady(final long zoneId) - throws RetriableException, SafeModeException, IOException { - final ZoneReencryptionStatus zs = - getReencryptionStatus().getZoneStatus(zoneId); - if (zs == null) { - throw new IOException("Zone " + zoneId + " status cannot be found."); - } - if (zs.isCanceled()) { - throw new IOException("Re-encryption is canceled for zone " + zoneId); - } - dir.getFSNamesystem() - .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt."); - // re-encryption should be cancelled when NN goes to standby. Just - // double checking for sanity. - dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE); - } /** * Called when a new zone is submitted for re-encryption. This will interrupt @@ -963,4 +590,258 @@ public class ReencryptionHandler implements Runnable { LOG.debug("Notifying handler for new re-encryption command."); this.notify(); } + + public ReencryptionPendingInodeIdCollector getTraverser() { + return traverser; + } + + /** + * ReencryptionPendingInodeIdCollector which throttle based on configured + * throttle ratio. + */ + class ReencryptionPendingInodeIdCollector extends FSTreeTraverser { + + private final ReencryptionHandler reencryptionHandler; + + ReencryptionPendingInodeIdCollector(FSDirectory dir, + ReencryptionHandler rHandler, Configuration conf) { + super(dir, conf); + this.reencryptionHandler = rHandler; + } + + @Override + protected void checkPauseForTesting() + throws InterruptedException { + assert !dir.hasReadLock(); + assert !dir.getFSNamesystem().hasReadLock(); + while (shouldPauseForTesting) { + LOG.info("Sleeping in the re-encrypt handler for unit test."); + synchronized (reencryptionHandler) { + reencryptionHandler.wait(30000); + } + LOG.info("Continuing re-encrypt handler after pausing."); + } + } + + /** + * Process an Inode for re-encryption. Add to current batch if it's a file, + * no-op otherwise. + * + * @param inode + * the inode + * @return true if inode is added to currentBatch and should be + * re-encrypted. false otherwise: could be inode is not a file, or + * inode's edek's key version is not changed. + * @throws IOException + * @throws InterruptedException + */ + @Override + public boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert dir.hasReadLock(); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo( + dir, INodesInPath.fromINode(inode)); + if (feInfo == null) { + LOG.warn("File {} skipped re-encryption because it is not encrypted! " + + "This is very likely a bug.", inode.getId()); + return false; + } + if (traverseInfo instanceof ZoneTraverseInfo + && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals( + feInfo.getEzKeyVersionName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("File {} skipped re-encryption because edek's key version" + + " name is not changed.", inode.getFullPathName()); + } + return false; + } + currentBatch.add(inode.asFile()); + return true; + } + + /** + * Check whether zone is ready for re-encryption. Throws IOE if it's not. 1. + * If EZ is deleted. 2. if the re-encryption is canceled. 3. If NN is not + * active or is in safe mode. + * + * @throws IOException + * if zone does not exist / is cancelled, or if NN is not ready + * for write. + */ + @Override + protected void checkINodeReady(long zoneId) throws IOException { + final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus( + zoneId); + if (zs == null) { + throw new IOException("Zone " + zoneId + " status cannot be found."); + } + if (zs.isCanceled()) { + throw new IOException("Re-encryption is canceled for zone " + zoneId); + } + dir.getFSNamesystem().checkNameNodeSafeMode( + "NN is in safe mode, cannot re-encrypt."); + // re-encryption should be cancelled when NN goes to standby. Just + // double checking for sanity. + dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE); + } + + /** + * Submit the current batch to the thread pool. + * + * @param zoneId + * Id of the EZ INode + * @throws IOException + * @throws InterruptedException + */ + @Override + protected void submitCurrentBatch(final long zoneId) throws IOException, + InterruptedException { + if (currentBatch.isEmpty()) { + return; + } + ZoneSubmissionTracker zst; + synchronized (ReencryptionHandler.this) { + zst = submissions.get(zoneId); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + submissions.put(zoneId, zst); + } + } + Future future = batchService.submit(new EDEKReencryptCallable(zoneId, + currentBatch, reencryptionHandler)); + zst.addTask(future); + LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", + currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); + currentBatch = new ReencryptionBatch(reencryptBatchSize); + // flip the pause flag if this is nth submission. + // The actual pause need to happen outside of the lock. + if (pauseAfterNthSubmission > 0) { + if (--pauseAfterNthSubmission == 0) { + shouldPauseForTesting = true; + } + } + } + + /** + * Throttles the ReencryptionHandler in 3 aspects: + * 1. Prevents generating more Callables than the CPU could possibly + * handle. + * 2. Prevents generating more Callables than the ReencryptionUpdater + * can handle, under its own throttling. + * 3. Prevents contending FSN/FSD read locks. This is done based + * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration. + *

+ * Item 1 and 2 are to control NN heap usage. + * + * @throws InterruptedException + */ + @VisibleForTesting + @Override + protected void throttle() throws InterruptedException { + assert !dir.hasReadLock(); + assert !dir.getFSNamesystem().hasReadLock(); + final int numCores = Runtime.getRuntime().availableProcessors(); + if (taskQueue.size() >= numCores) { + LOG.debug("Re-encryption handler throttling because queue size {} is" + + "larger than number of cores {}", taskQueue.size(), numCores); + while (taskQueue.size() >= numCores) { + Thread.sleep(100); + } + } + + // 2. if tasks are piling up on the updater, don't create new callables + // until the queue size goes down. + final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; + int numTasks = numTasksSubmitted(); + if (numTasks >= maxTasksPiled) { + LOG.debug("Re-encryption handler throttling because total tasks pending" + + " re-encryption updater is {}", numTasks); + while (numTasks >= maxTasksPiled) { + Thread.sleep(500); + numTasks = numTasksSubmitted(); + } + } + + // 3. + if (throttleLimitHandlerRatio >= 1.0) { + return; + } + final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) + * throttleLimitHandlerRatio); + final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-encryption handler throttling expect: {}, actual: {}," + + " throttleTimerAll:{}", expect, actual, + throttleTimerAll.now(TimeUnit.MILLISECONDS)); + } + if (expect - actual < 0) { + // in case throttleLimitHandlerRatio is very small, expect will be 0. + // so sleepMs should not be calculated from expect, to really meet the + // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs + // should be 1000 - throttleTimerAll.now() + final long sleepMs = (long) (actual / throttleLimitHandlerRatio) + - throttleTimerAll.now(TimeUnit.MILLISECONDS); + LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); + Thread.sleep(sleepMs); + } + throttleTimerAll.reset().start(); + throttleTimerLocked.reset(); + } + + private int numTasksSubmitted() { + int ret = 0; + synchronized (ReencryptionHandler.this) { + for (ZoneSubmissionTracker zst : submissions.values()) { + ret += zst.getTasks().size(); + } + } + return ret; + } + + @Override + public boolean shouldSubmitCurrentBatch() { + return currentBatch.size() >= reencryptBatchSize; + } + + @Override + public boolean canTraverseDir(INode inode) throws IOException { + if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) { + // nested EZ, ignore. + LOG.info("{}({}) is a nested EZ, skipping for re-encryption", + inode.getFullPathName(), inode.getId()); + return false; + } + return true; + } + + @Override + protected void readLock() { + super.readLock(); + throttleTimerLocked.start(); + } + + @Override + protected void readUnlock() { + super.readUnlock(); + throttleTimerLocked.stop(); + } + } + + private class ZoneTraverseInfo extends TraverseInfo { + private String ezKeyVerName; + + ZoneTraverseInfo(String ezKeyVerName) { + this.ezKeyVerName = ezKeyVerName; + } + + public String getEzKeyVerName() { + return ezKeyVerName; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java index 3b7badbfe20..a5923a7836c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable { final String zonePath; dir.writeLock(); try { - handler.checkZoneReady(task.zoneId); + handler.getTraverser().checkINodeReady(task.zoneId); final INode zoneNode = dir.getInode(task.zoneId); if (zoneNode == null) { // ez removed. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java index aca9a736783..d36b14756be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -64,7 +63,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import org.junit.rules.Timeout; import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.LoggerFactory; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java index e2035ed1da6..3481b42a5a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java @@ -75,6 +75,10 @@ public class TestReencryptionHandler { CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH); Mockito.when(ezm.getProvider()).thenReturn( KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp)); + FSDirectory fsd = Mockito.mock(FSDirectory.class); + FSNamesystem fns = Mockito.mock(FSNamesystem.class); + Mockito.when(fsd.getFSNamesystem()).thenReturn(fns); + Mockito.when(ezm.getFSDirectory()).thenReturn(fsd); return new ReencryptionHandler(ezm, conf); } @@ -99,7 +103,7 @@ public class TestReencryptionHandler { Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked); Whitebox.setInternalState(rh, "taskQueue", queue); final StopWatch sw = new StopWatch().start(); - rh.throttle(); + rh.getTraverser().throttle(); sw.stop(); assertTrue("should have throttled for at least 8 second", sw.now(TimeUnit.MILLISECONDS) > 8000); @@ -130,7 +134,7 @@ public class TestReencryptionHandler { submissions = new HashMap<>(); Whitebox.setInternalState(rh, "submissions", submissions); StopWatch sw = new StopWatch().start(); - rh.throttle(); + rh.getTraverser().throttle(); sw.stop(); assertTrue("should not have throttled", sw.now(TimeUnit.MILLISECONDS) < 1000); @@ -189,7 +193,7 @@ public class TestReencryptionHandler { Whitebox.setInternalState(rh, "submissions", submissions); final StopWatch sw = new StopWatch().start(); removeTaskThread.start(); - rh.throttle(); + rh.getTraverser().throttle(); sw.stop(); LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS)); assertTrue("should have throttled for at least 3 second",