HDFS-13328. Abstract ReencryptionHandler recursive logic in separate class. Contributed by Surendra Singh Lilhore.

(cherry picked from commit f89594f0b8)
This commit is contained in:
Rakesh Radhakrishnan 2018-04-10 23:35:00 +05:30
parent fe7a70e586
commit afbdd8fdcf
5 changed files with 614 additions and 393 deletions

View File

@ -0,0 +1,339 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* FSTreeTraverser traverse directory recursively and process files
* in batches.
*/
@InterfaceAudience.Private
public abstract class FSTreeTraverser {
public static final Logger LOG = LoggerFactory
.getLogger(FSTreeTraverser.class);
private final FSDirectory dir;
private long readLockReportingThresholdMs;
private Timer timer;
public FSTreeTraverser(FSDirectory dir, Configuration conf) {
this.dir = dir;
this.readLockReportingThresholdMs = conf.getLong(
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
timer = new Timer();
}
public FSDirectory getFSDirectory() {
return dir;
}
/**
* Iterate through all files directly inside parent, and recurse down
* directories. The listing is done in batch, and can optionally start after
* a position. The iteration of the inode tree is done in a depth-first
* fashion. But instead of holding all {@link INodeDirectory}'s in memory
* on the fly, only the path components to the current inode is held. This
* is to reduce memory consumption.
*
* @param parent
* The inode id of parent directory
* @param startId
* Id of the start inode.
* @param startAfter
* Full path of a file the traverse should start after.
* @param traverseInfo
* info which may required for processing the child's.
* @throws IOException
* @throws InterruptedException
*/
protected void traverseDir(final INodeDirectory parent, final long startId,
byte[] startAfter, final TraverseInfo traverseInfo)
throws IOException, InterruptedException {
List<byte[]> startAfters = new ArrayList<>();
if (parent == null) {
return;
}
INode curr = parent;
// construct startAfters all the way up to the zone inode.
startAfters.add(startAfter);
while (curr.getId() != startId) {
startAfters.add(0, curr.getLocalNameBytes());
curr = curr.getParent();
}
curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
while (!startAfters.isEmpty()) {
if (curr == null) {
// lock was reacquired, re-resolve path.
curr = resolvePaths(startId, startAfters);
}
curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
}
}
/**
* Iterates the parent directory, and add direct children files to current
* batch. If batch size meets configured threshold, current batch will be
* submitted for the processing.
* <p>
* Locks could be released and reacquired when a batch submission is
* finished.
*
* @param startId
* Id of the start inode.
* @return The inode which was just processed, if lock is held in the entire
* process. Null if lock is released.
* @throws IOException
* @throws InterruptedException
*/
protected INode traverseDirInt(final long startId, INode curr,
List<byte[]> startAfters, final TraverseInfo traverseInfo)
throws IOException, InterruptedException {
assert dir.hasReadLock();
assert dir.getFSNamesystem().hasReadLock();
long lockStartTime = timer.monotonicNow();
Preconditions.checkNotNull(curr, "Current inode can't be null");
checkINodeReady(startId);
final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
: curr.getParent();
ReadOnlyList<INode> children = parent
.getChildrenList(Snapshot.CURRENT_STATE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing directory {}", parent.getFullPathName());
}
final byte[] startAfter = startAfters.get(startAfters.size() - 1);
boolean lockReleased = false;
for (int i = INodeDirectory.nextChild(children, startAfter); i < children
.size(); ++i) {
final INode inode = children.get(i);
if (!processFileInode(inode, traverseInfo)) {
// inode wasn't processes. Recurse down if it's a dir,
// skip otherwise.
if (!inode.isDirectory()) {
continue;
}
if (!canTraverseDir(inode)) {
continue;
}
// add 1 level to the depth-first search.
curr = inode;
if (!startAfters.isEmpty()) {
startAfters.remove(startAfters.size() - 1);
startAfters.add(curr.getLocalNameBytes());
}
startAfters.add(HdfsFileStatus.EMPTY_NAME);
return lockReleased ? null : curr;
}
if (shouldSubmitCurrentBatch()) {
final byte[] currentStartAfter = inode.getLocalNameBytes();
final String parentPath = parent.getFullPathName();
lockReleased = true;
readUnlock();
submitCurrentBatch(startId);
try {
throttle();
checkPauseForTesting();
} finally {
readLock();
lockStartTime = timer.monotonicNow();
}
checkINodeReady(startId);
// Things could have changed when the lock was released.
// Re-resolve the parent inode.
FSPermissionChecker pc = dir.getPermissionChecker();
INode newParent = dir
.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
.getLastINode();
if (newParent == null || !newParent.equals(parent)) {
// parent dir is deleted or recreated. We're done.
return null;
}
children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
// -1 to counter the ++ on the for loop
i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
}
if ((timer.monotonicNow()
- lockStartTime) > readLockReportingThresholdMs) {
readUnlock();
try {
throttle();
} finally {
readLock();
lockStartTime = timer.monotonicNow();
}
}
}
// Successfully finished this dir, adjust pointers to 1 level up, and
// startAfter this dir.
startAfters.remove(startAfters.size() - 1);
if (!startAfters.isEmpty()) {
startAfters.remove(startAfters.size() - 1);
startAfters.add(curr.getLocalNameBytes());
}
curr = curr.getParent();
return lockReleased ? null : curr;
}
/**
* Resolve the cursor of traverse to an inode.
* <p>
* The parent of the lowest level startAfter is returned. If somewhere in the
* middle of startAfters changed, the parent of the lowest unchanged level is
* returned.
*
* @param startId
* Id of the start inode.
* @param startAfters
* the cursor, represented by a list of path bytes.
* @return the parent inode corresponding to the startAfters, or null if the
* furthest parent is deleted.
*/
private INode resolvePaths(final long startId, List<byte[]> startAfters)
throws IOException {
// If the readlock was reacquired, we need to resolve the paths again
// in case things have changed. If our cursor file/dir is changed,
// continue from the next one.
INode zoneNode = dir.getInode(startId);
if (zoneNode == null) {
throw new FileNotFoundException("Zone " + startId + " is deleted.");
}
INodeDirectory parent = zoneNode.asDirectory();
for (int i = 0; i < startAfters.size(); ++i) {
if (i == startAfters.size() - 1) {
// last startAfter does not need to be resolved, since search for
// nextChild will cover that automatically.
break;
}
INode curr = parent.getChild(startAfters.get(i),
Snapshot.CURRENT_STATE_ID);
if (curr == null) {
// inode at this level has changed. Update startAfters to point to
// the next dir at the parent level (and dropping any startAfters
// at lower levels).
for (; i < startAfters.size(); ++i) {
startAfters.remove(startAfters.size() - 1);
}
break;
}
parent = curr.asDirectory();
}
return parent;
}
protected void readLock() {
dir.getFSNamesystem().readLock();
dir.readLock();
}
protected void readUnlock() {
dir.readUnlock();
dir.getFSNamesystem().readUnlock("FSTreeTraverser");
}
protected abstract void checkPauseForTesting() throws InterruptedException;
/**
* Process an Inode. Add to current batch if it's a file, no-op otherwise.
*
* @param inode
* the inode
* @return true if inode is added to currentBatch and should be process for
* next operation. false otherwise: could be inode is not a file.
* @throws IOException
* @throws InterruptedException
*/
protected abstract boolean processFileInode(INode inode,
TraverseInfo traverseInfo) throws IOException, InterruptedException;
/**
* Check whether current batch can be submitted for the processing.
*
* @return true if batch size meets meet the condition, otherwise false.
*/
protected abstract boolean shouldSubmitCurrentBatch();
/**
* Check whether inode is ready for traverse. Throws IOE if it's not.
*
* @param startId
* Id of the start inode.
* @throws IOException
*/
protected abstract void checkINodeReady(long startId) throws IOException;
/**
* Submit the current batch for processing.
*
* @param startId
* Id of the start inode.
* @throws IOException
* @throws InterruptedException
*/
protected abstract void submitCurrentBatch(long startId)
throws IOException, InterruptedException;
/**
* Throttles the FSTreeTraverser.
*
* @throws InterruptedException
*/
protected abstract void throttle() throws InterruptedException;
/**
* Check whether dir is traversable or not.
*
* @param inode
* Dir inode
* @return true if dir is traversable otherwise false.
* @throws IOException
*/
protected abstract boolean canTraverseDir(INode inode) throws IOException;
/**
* Class will represent the additional info required for traverse.
*/
public static class TraverseInfo {
}
}

View File

@ -30,18 +30,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
@ -118,6 +116,8 @@ public class ReencryptionHandler implements Runnable {
// be single-threaded, see class javadoc for more details. // be single-threaded, see class javadoc for more details.
private ReencryptionBatch currentBatch; private ReencryptionBatch currentBatch;
private final ReencryptionPendingInodeIdCollector traverser;
private final ReencryptionUpdater reencryptionUpdater; private final ReencryptionUpdater reencryptionUpdater;
private ExecutorService updaterExecutor; private ExecutorService updaterExecutor;
@ -186,16 +186,6 @@ public class ReencryptionHandler implements Runnable {
reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count); reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
} }
private synchronized void checkPauseForTesting() throws InterruptedException {
assert !dir.hasReadLock();
assert !dir.getFSNamesystem().hasReadLock();
while (shouldPauseForTesting) {
LOG.info("Sleeping in the re-encrypt handler for unit test.");
wait();
LOG.info("Continuing re-encrypt handler after pausing.");
}
}
ReencryptionHandler(final EncryptionZoneManager ezMgr, ReencryptionHandler(final EncryptionZoneManager ezMgr,
final Configuration conf) { final Configuration conf) {
this.ezManager = ezMgr; this.ezManager = ezMgr;
@ -256,6 +246,7 @@ public class ReencryptionHandler implements Runnable {
reencryptionUpdater = reencryptionUpdater =
new ReencryptionUpdater(dir, batchService, this, conf); new ReencryptionUpdater(dir, batchService, this, conf);
currentBatch = new ReencryptionBatch(reencryptBatchSize); currentBatch = new ReencryptionBatch(reencryptBatchSize);
traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf);
} }
ReencryptionStatus getReencryptionStatus() { ReencryptionStatus getReencryptionStatus() {
@ -339,7 +330,7 @@ public class ReencryptionHandler implements Runnable {
synchronized (this) { synchronized (this) {
wait(interval); wait(interval);
} }
checkPauseForTesting(); traverser.checkPauseForTesting();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.info("Re-encrypt handler interrupted. Exiting"); LOG.info("Re-encrypt handler interrupted. Exiting");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -397,7 +388,7 @@ public class ReencryptionHandler implements Runnable {
final INode zoneNode; final INode zoneNode;
final ZoneReencryptionStatus zs; final ZoneReencryptionStatus zs;
readLock(); traverser.readLock();
try { try {
zoneNode = dir.getInode(zoneId); zoneNode = dir.getInode(zoneId);
// start re-encrypting the zone from the beginning // start re-encrypting the zone from the beginning
@ -419,18 +410,19 @@ public class ReencryptionHandler implements Runnable {
zoneId); zoneId);
if (zs.getLastCheckpointFile() == null) { if (zs.getLastCheckpointFile() == null) {
// new re-encryption // new re-encryption
reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME, traverser.traverseDir(zoneNode.asDirectory(), zoneId,
zs.getEzKeyVersionName()); HdfsFileStatus.EMPTY_NAME,
new ZoneTraverseInfo(zs.getEzKeyVersionName()));
} else { } else {
// resuming from a past re-encryption // resuming from a past re-encryption
restoreFromLastProcessedFile(zoneId, zs); restoreFromLastProcessedFile(zoneId, zs);
} }
// save the last batch and mark complete // save the last batch and mark complete
submitCurrentBatch(zoneId); traverser.submitCurrentBatch(zoneId);
LOG.info("Submission completed of zone {} for re-encryption.", zoneId); LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
reencryptionUpdater.markZoneSubmissionDone(zoneId); reencryptionUpdater.markZoneSubmissionDone(zoneId);
} finally { } finally {
readUnlock(); traverser.readUnlock();
} }
} }
@ -479,131 +471,8 @@ public class ReencryptionHandler implements Runnable {
dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ); dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
parent = lpfIIP.getLastINode().getParent(); parent = lpfIIP.getLastINode().getParent();
startAfter = lpfIIP.getLastINode().getLocalNameBytes(); startAfter = lpfIIP.getLastINode().getLocalNameBytes();
reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName()); traverser.traverseDir(parent, zoneId, startAfter,
} new ZoneTraverseInfo(zs.getEzKeyVersionName()));
/**
* Iterate through all files directly inside parent, and recurse down
* directories. The listing is done in batch, and can optionally start after
* a position.
* <p>
* Each batch is then send to the threadpool, where KMS will be contacted and
* edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
* from the threadpool.
* <p>
* The iteration of the inode tree is done in a depth-first fashion. But
* instead of holding all INodeDirectory's in memory on the fly, only the
* path components to the current inode is held. This is to reduce memory
* consumption.
*
* @param parent The inode id of parent directory
* @param zoneId Id of the EZ inode
* @param startAfter Full path of a file the re-encrypt should start after.
* @throws IOException
* @throws InterruptedException
*/
private void reencryptDir(final INodeDirectory parent, final long zoneId,
byte[] startAfter, final String ezKeyVerName)
throws IOException, InterruptedException {
List<byte[]> startAfters = new ArrayList<>();
if (parent == null) {
return;
}
INode curr = parent;
// construct startAfters all the way up to the zone inode.
startAfters.add(startAfter);
while (curr.getId() != zoneId) {
startAfters.add(0, curr.getLocalNameBytes());
curr = curr.getParent();
}
curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
while (!startAfters.isEmpty()) {
if (curr == null) {
// lock was reacquired, re-resolve path.
curr = resolvePaths(zoneId, startAfters);
}
curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
}
}
/**
* Resolve the cursor of re-encryption to an inode.
* <p>
* The parent of the lowest level startAfter is returned. If somewhere in the
* middle of startAfters changed, the parent of the lowest unchanged level is
* returned.
*
* @param zoneId Id of the EZ inode.
* @param startAfters the cursor, represented by a list of path bytes.
* @return the parent inode corresponding to the startAfters, or null if
* the EZ node (furthest parent) is deleted.
*/
private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
throws IOException {
// If the readlock was reacquired, we need to resolve the paths again
// in case things have changed. If our cursor file/dir is changed,
// continue from the next one.
INode zoneNode = dir.getInode(zoneId);
if (zoneNode == null) {
throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
}
INodeDirectory parent = zoneNode.asDirectory();
for (int i = 0; i < startAfters.size(); ++i) {
if (i == startAfters.size() - 1) {
// last startAfter does not need to be resolved, since search for
// nextChild will cover that automatically.
break;
}
INode curr =
parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
if (curr == null) {
// inode at this level has changed. Update startAfters to point to
// the next dir at the parent level (and dropping any startAfters
// at lower levels).
for (; i < startAfters.size(); ++i) {
startAfters.remove(startAfters.size() - 1);
}
break;
}
parent = curr.asDirectory();
}
return parent;
}
/**
* Submit the current batch to the thread pool.
*
* @param zoneId Id of the EZ INode
* @throws IOException
* @throws InterruptedException
*/
private void submitCurrentBatch(final long zoneId)
throws IOException, InterruptedException {
assert dir.hasReadLock();
if (currentBatch.isEmpty()) {
return;
}
ZoneSubmissionTracker zst;
synchronized (this) {
zst = submissions.get(zoneId);
if (zst == null) {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
}
}
Future future = batchService
.submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
zst.addTask(future);
LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
currentBatch = new ReencryptionBatch(reencryptBatchSize);
// flip the pause flag if this is nth submission.
// The actual pause need to happen outside of the lock.
if (pauseAfterNthSubmission > 0) {
if (--pauseAfterNthSubmission == 0) {
shouldPauseForTesting = true;
}
}
} }
final class ReencryptionBatch { final class ReencryptionBatch {
@ -711,248 +580,6 @@ public class ReencryptionHandler implements Runnable {
} }
} }
/**
* Iterates the parent directory, and add direct children files to
* current batch. If batch size meets configured threshold, a Callable
* is created and sent to the thread pool, which will communicate to the KMS
* to get new edeks.
* <p>
* Locks could be released and reacquired when a Callable is created.
*
* @param zoneId Id of the EZ INode
* @return The inode which was just processed, if lock is held in the entire
* process. Null if lock is released.
* @throws IOException
* @throws InterruptedException
*/
private INode reencryptDirInt(final long zoneId, INode curr,
List<byte[]> startAfters, final String ezKeyVerName)
throws IOException, InterruptedException {
assert dir.hasReadLock();
assert dir.getFSNamesystem().hasReadLock();
Preconditions.checkNotNull(curr, "Current inode can't be null");
checkZoneReady(zoneId);
final INodeDirectory parent =
curr.isDirectory() ? curr.asDirectory() : curr.getParent();
ReadOnlyList<INode> children =
parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
}
final byte[] startAfter = startAfters.get(startAfters.size() - 1);
boolean lockReleased = false;
for (int i = INodeDirectory.nextChild(children, startAfter);
i < children.size(); ++i) {
final INode inode = children.get(i);
if (!reencryptINode(inode, ezKeyVerName)) {
// inode wasn't added for re-encryption. Recurse down if it's a dir,
// skip otherwise.
if (!inode.isDirectory()) {
continue;
}
if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
// nested EZ, ignore.
LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
inode.getFullPathName(), inode.getId());
continue;
}
// add 1 level to the depth-first search.
curr = inode;
if (!startAfters.isEmpty()) {
startAfters.remove(startAfters.size() - 1);
startAfters.add(curr.getLocalNameBytes());
}
startAfters.add(HdfsFileStatus.EMPTY_NAME);
return lockReleased ? null : curr;
}
if (currentBatch.size() >= reencryptBatchSize) {
final byte[] currentStartAfter = inode.getLocalNameBytes();
final String parentPath = parent.getFullPathName();
submitCurrentBatch(zoneId);
lockReleased = true;
readUnlock();
try {
throttle();
checkPauseForTesting();
} finally {
readLock();
}
checkZoneReady(zoneId);
// Things could have changed when the lock was released.
// Re-resolve the parent inode.
FSPermissionChecker pc = dir.getPermissionChecker();
INode newParent =
dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
.getLastINode();
if (newParent == null || !newParent.equals(parent)) {
// parent dir is deleted or recreated. We're done.
return null;
}
children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
// -1 to counter the ++ on the for loop
i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
}
}
// Successfully finished this dir, adjust pointers to 1 level up, and
// startAfter this dir.
startAfters.remove(startAfters.size() - 1);
if (!startAfters.isEmpty()) {
startAfters.remove(startAfters.size() - 1);
startAfters.add(curr.getLocalNameBytes());
}
curr = curr.getParent();
return lockReleased ? null : curr;
}
private void readLock() {
dir.getFSNamesystem().readLock();
dir.readLock();
throttleTimerLocked.start();
}
private void readUnlock() {
dir.readUnlock();
dir.getFSNamesystem().readUnlock("reencryptHandler");
throttleTimerLocked.stop();
}
/**
* Throttles the ReencryptionHandler in 3 aspects:
* 1. Prevents generating more Callables than the CPU could possibly handle.
* 2. Prevents generating more Callables than the ReencryptionUpdater can
* handle, under its own throttling
* 3. Prevents contending FSN/FSD read locks. This is done based on the
* DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
* <p>
* Item 1 and 2 are to control NN heap usage.
*
* @throws InterruptedException
*/
@VisibleForTesting
void throttle() throws InterruptedException {
// 1.
final int numCores = Runtime.getRuntime().availableProcessors();
if (taskQueue.size() >= numCores) {
LOG.debug("Re-encryption handler throttling because queue size {} is"
+ "larger than number of cores {}", taskQueue.size(), numCores);
while (taskQueue.size() >= numCores) {
Thread.sleep(100);
}
}
// 2. if tasks are piling up on the updater, don't create new callables
// until the queue size goes down.
final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
int numTasks = numTasksSubmitted();
if (numTasks >= maxTasksPiled) {
LOG.debug("Re-encryption handler throttling because total tasks pending"
+ " re-encryption updater is {}", numTasks);
while (numTasks >= maxTasksPiled) {
Thread.sleep(500);
numTasks = numTasksSubmitted();
}
}
// 3.
if (throttleLimitHandlerRatio >= 1.0) {
return;
}
final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
* throttleLimitHandlerRatio);
final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled()) {
LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+ " throttleTimerAll:{}", expect, actual,
throttleTimerAll.now(TimeUnit.MILLISECONDS));
}
if (expect - actual < 0) {
// in case throttleLimitHandlerRatio is very small, expect will be 0.
// so sleepMs should not be calculated from expect, to really meet the
// ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
// should be 1000 - throttleTimerAll.now()
final long sleepMs =
(long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
.now(TimeUnit.MILLISECONDS);
LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
Thread.sleep(sleepMs);
}
throttleTimerAll.reset().start();
throttleTimerLocked.reset();
}
private synchronized int numTasksSubmitted() {
int ret = 0;
for (ZoneSubmissionTracker zst : submissions.values()) {
ret += zst.getTasks().size();
}
return ret;
}
/**
* Process an Inode for re-encryption. Add to current batch if it's a file,
* no-op otherwise.
*
* @param inode the inode
* @return true if inode is added to currentBatch and should be re-encrypted.
* false otherwise: could be inode is not a file, or inode's edek's
* key version is not changed.
* @throws IOException
* @throws InterruptedException
*/
private boolean reencryptINode(final INode inode, final String ezKeyVerName)
throws IOException, InterruptedException {
assert dir.hasReadLock();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
}
if (!inode.isFile()) {
return false;
}
FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
.getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
if (feInfo == null) {
LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+ "This is very likely a bug.", inode.getId());
return false;
}
if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("File {} skipped re-encryption because edek's key version"
+ " name is not changed.", inode.getFullPathName());
}
return false;
}
currentBatch.add(inode.asFile());
return true;
}
/**
* Check whether zone is ready for re-encryption. Throws IOE if it's not.
* 1. If EZ is deleted.
* 2. if the re-encryption is canceled.
* 3. If NN is not active or is in safe mode.
*
* @throws IOException if zone does not exist / is cancelled, or if NN is not
* ready for write.
*/
void checkZoneReady(final long zoneId)
throws RetriableException, SafeModeException, IOException {
final ZoneReencryptionStatus zs =
getReencryptionStatus().getZoneStatus(zoneId);
if (zs == null) {
throw new IOException("Zone " + zoneId + " status cannot be found.");
}
if (zs.isCanceled()) {
throw new IOException("Re-encryption is canceled for zone " + zoneId);
}
dir.getFSNamesystem()
.checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
// re-encryption should be cancelled when NN goes to standby. Just
// double checking for sanity.
dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
}
/** /**
* Called when a new zone is submitted for re-encryption. This will interrupt * Called when a new zone is submitted for re-encryption. This will interrupt
@ -963,4 +590,258 @@ public class ReencryptionHandler implements Runnable {
LOG.debug("Notifying handler for new re-encryption command."); LOG.debug("Notifying handler for new re-encryption command.");
this.notify(); this.notify();
} }
public ReencryptionPendingInodeIdCollector getTraverser() {
return traverser;
}
/**
* ReencryptionPendingInodeIdCollector which throttle based on configured
* throttle ratio.
*/
class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {
private final ReencryptionHandler reencryptionHandler;
ReencryptionPendingInodeIdCollector(FSDirectory dir,
ReencryptionHandler rHandler, Configuration conf) {
super(dir, conf);
this.reencryptionHandler = rHandler;
}
@Override
protected void checkPauseForTesting()
throws InterruptedException {
assert !dir.hasReadLock();
assert !dir.getFSNamesystem().hasReadLock();
while (shouldPauseForTesting) {
LOG.info("Sleeping in the re-encrypt handler for unit test.");
synchronized (reencryptionHandler) {
reencryptionHandler.wait(30000);
}
LOG.info("Continuing re-encrypt handler after pausing.");
}
}
/**
* Process an Inode for re-encryption. Add to current batch if it's a file,
* no-op otherwise.
*
* @param inode
* the inode
* @return true if inode is added to currentBatch and should be
* re-encrypted. false otherwise: could be inode is not a file, or
* inode's edek's key version is not changed.
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
throws IOException, InterruptedException {
assert dir.hasReadLock();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
}
if (!inode.isFile()) {
return false;
}
FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
dir, INodesInPath.fromINode(inode));
if (feInfo == null) {
LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+ "This is very likely a bug.", inode.getId());
return false;
}
if (traverseInfo instanceof ZoneTraverseInfo
&& ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
feInfo.getEzKeyVersionName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("File {} skipped re-encryption because edek's key version"
+ " name is not changed.", inode.getFullPathName());
}
return false;
}
currentBatch.add(inode.asFile());
return true;
}
/**
* Check whether zone is ready for re-encryption. Throws IOE if it's not. 1.
* If EZ is deleted. 2. if the re-encryption is canceled. 3. If NN is not
* active or is in safe mode.
*
* @throws IOException
* if zone does not exist / is cancelled, or if NN is not ready
* for write.
*/
@Override
protected void checkINodeReady(long zoneId) throws IOException {
final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
zoneId);
if (zs == null) {
throw new IOException("Zone " + zoneId + " status cannot be found.");
}
if (zs.isCanceled()) {
throw new IOException("Re-encryption is canceled for zone " + zoneId);
}
dir.getFSNamesystem().checkNameNodeSafeMode(
"NN is in safe mode, cannot re-encrypt.");
// re-encryption should be cancelled when NN goes to standby. Just
// double checking for sanity.
dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
}
/**
* Submit the current batch to the thread pool.
*
* @param zoneId
* Id of the EZ INode
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void submitCurrentBatch(final long zoneId) throws IOException,
InterruptedException {
if (currentBatch.isEmpty()) {
return;
}
ZoneSubmissionTracker zst;
synchronized (ReencryptionHandler.this) {
zst = submissions.get(zoneId);
if (zst == null) {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
}
}
Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
currentBatch, reencryptionHandler));
zst.addTask(future);
LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
currentBatch = new ReencryptionBatch(reencryptBatchSize);
// flip the pause flag if this is nth submission.
// The actual pause need to happen outside of the lock.
if (pauseAfterNthSubmission > 0) {
if (--pauseAfterNthSubmission == 0) {
shouldPauseForTesting = true;
}
}
}
/**
* Throttles the ReencryptionHandler in 3 aspects:
* 1. Prevents generating more Callables than the CPU could possibly
* handle.
* 2. Prevents generating more Callables than the ReencryptionUpdater
* can handle, under its own throttling.
* 3. Prevents contending FSN/FSD read locks. This is done based
* on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
* <p>
* Item 1 and 2 are to control NN heap usage.
*
* @throws InterruptedException
*/
@VisibleForTesting
@Override
protected void throttle() throws InterruptedException {
assert !dir.hasReadLock();
assert !dir.getFSNamesystem().hasReadLock();
final int numCores = Runtime.getRuntime().availableProcessors();
if (taskQueue.size() >= numCores) {
LOG.debug("Re-encryption handler throttling because queue size {} is"
+ "larger than number of cores {}", taskQueue.size(), numCores);
while (taskQueue.size() >= numCores) {
Thread.sleep(100);
}
}
// 2. if tasks are piling up on the updater, don't create new callables
// until the queue size goes down.
final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
int numTasks = numTasksSubmitted();
if (numTasks >= maxTasksPiled) {
LOG.debug("Re-encryption handler throttling because total tasks pending"
+ " re-encryption updater is {}", numTasks);
while (numTasks >= maxTasksPiled) {
Thread.sleep(500);
numTasks = numTasksSubmitted();
}
}
// 3.
if (throttleLimitHandlerRatio >= 1.0) {
return;
}
final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
* throttleLimitHandlerRatio);
final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled()) {
LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+ " throttleTimerAll:{}", expect, actual,
throttleTimerAll.now(TimeUnit.MILLISECONDS));
}
if (expect - actual < 0) {
// in case throttleLimitHandlerRatio is very small, expect will be 0.
// so sleepMs should not be calculated from expect, to really meet the
// ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
// should be 1000 - throttleTimerAll.now()
final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
- throttleTimerAll.now(TimeUnit.MILLISECONDS);
LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
Thread.sleep(sleepMs);
}
throttleTimerAll.reset().start();
throttleTimerLocked.reset();
}
private int numTasksSubmitted() {
int ret = 0;
synchronized (ReencryptionHandler.this) {
for (ZoneSubmissionTracker zst : submissions.values()) {
ret += zst.getTasks().size();
}
}
return ret;
}
@Override
public boolean shouldSubmitCurrentBatch() {
return currentBatch.size() >= reencryptBatchSize;
}
@Override
public boolean canTraverseDir(INode inode) throws IOException {
if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
// nested EZ, ignore.
LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
inode.getFullPathName(), inode.getId());
return false;
}
return true;
}
@Override
protected void readLock() {
super.readLock();
throttleTimerLocked.start();
}
@Override
protected void readUnlock() {
super.readUnlock();
throttleTimerLocked.stop();
}
}
private class ZoneTraverseInfo extends TraverseInfo {
private String ezKeyVerName;
ZoneTraverseInfo(String ezKeyVerName) {
this.ezKeyVerName = ezKeyVerName;
}
public String getEzKeyVerName() {
return ezKeyVerName;
}
}
} }

View File

@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable {
final String zonePath; final String zonePath;
dir.writeLock(); dir.writeLock();
try { try {
handler.checkZoneReady(task.zoneId); handler.getTraverser().checkINodeReady(task.zoneId);
final INode zoneNode = dir.getInode(task.zoneId); final INode zoneNode = dir.getInode(task.zoneId);
if (zoneNode == null) { if (zoneNode == null) {
// ez removed. // ez removed.

View File

@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -64,7 +63,6 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -75,6 +75,10 @@ public class TestReencryptionHandler {
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH); CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
Mockito.when(ezm.getProvider()).thenReturn( Mockito.when(ezm.getProvider()).thenReturn(
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp)); KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
FSDirectory fsd = Mockito.mock(FSDirectory.class);
FSNamesystem fns = Mockito.mock(FSNamesystem.class);
Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
return new ReencryptionHandler(ezm, conf); return new ReencryptionHandler(ezm, conf);
} }
@ -99,7 +103,7 @@ public class TestReencryptionHandler {
Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked); Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
Whitebox.setInternalState(rh, "taskQueue", queue); Whitebox.setInternalState(rh, "taskQueue", queue);
final StopWatch sw = new StopWatch().start(); final StopWatch sw = new StopWatch().start();
rh.throttle(); rh.getTraverser().throttle();
sw.stop(); sw.stop();
assertTrue("should have throttled for at least 8 second", assertTrue("should have throttled for at least 8 second",
sw.now(TimeUnit.MILLISECONDS) > 8000); sw.now(TimeUnit.MILLISECONDS) > 8000);
@ -130,7 +134,7 @@ public class TestReencryptionHandler {
submissions = new HashMap<>(); submissions = new HashMap<>();
Whitebox.setInternalState(rh, "submissions", submissions); Whitebox.setInternalState(rh, "submissions", submissions);
StopWatch sw = new StopWatch().start(); StopWatch sw = new StopWatch().start();
rh.throttle(); rh.getTraverser().throttle();
sw.stop(); sw.stop();
assertTrue("should not have throttled", assertTrue("should not have throttled",
sw.now(TimeUnit.MILLISECONDS) < 1000); sw.now(TimeUnit.MILLISECONDS) < 1000);
@ -189,7 +193,7 @@ public class TestReencryptionHandler {
Whitebox.setInternalState(rh, "submissions", submissions); Whitebox.setInternalState(rh, "submissions", submissions);
final StopWatch sw = new StopWatch().start(); final StopWatch sw = new StopWatch().start();
removeTaskThread.start(); removeTaskThread.start();
rh.throttle(); rh.getTraverser().throttle();
sw.stop(); sw.stop();
LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS)); LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
assertTrue("should have throttled for at least 3 second", assertTrue("should have throttled for at least 3 second",