HDFS-13731. ReencryptionUpdater fails with ConcurrentModificationException during processCheckpoints. Contributed by Zsolt Venczel.

(cherry picked from commit 3e18b957eb)
This commit is contained in:
Zsolt Venczel 2018-08-28 15:11:58 -07:00 committed by Xiao Chen
parent 450ba6790d
commit 5cbb9b1ca9
2 changed files with 30 additions and 28 deletions

View File

@ -713,10 +713,10 @@ public class ReencryptionHandler implements Runnable {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
}
Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
currentBatch, reencryptionHandler));
zst.addTask(future);
}
Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
currentBatch, reencryptionHandler));
zst.addTask(future);
LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
currentBatch = new ReencryptionBatch(reencryptBatchSize);

View File

@ -383,32 +383,34 @@ public final class ReencryptionUpdater implements Runnable {
final LinkedList<Future> tasks = tracker.getTasks();
final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
ListIterator<Future> iter = tasks.listIterator();
while (iter.hasNext()) {
Future<ReencryptionTask> curr = iter.next();
if (curr.isCancelled()) {
break;
synchronized (handler) {
while (iter.hasNext()) {
Future<ReencryptionTask> curr = iter.next();
if (curr.isCancelled()) {
break;
}
if (!curr.isDone() || !curr.get().processed) {
// still has earlier tasks not completed, skip here.
break;
}
ReencryptionTask task = curr.get();
LOG.debug("Updating re-encryption checkpoint with completed task."
+ " last: {} size:{}.", task.lastFile, task.batch.size());
assert zoneId == task.zoneId;
try {
final XAttr xattr = FSDirEncryptionZoneOp
.updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
task.numFilesUpdated, task.numFailures);
xAttrs.clear();
xAttrs.add(xattr);
} catch (IOException ie) {
LOG.warn("Failed to update re-encrypted progress to xattr" +
" for zone {}", zonePath, ie);
++task.numFailures;
}
++tracker.numCheckpointed;
iter.remove();
}
if (!curr.isDone() || !curr.get().processed) {
// still has earlier tasks not completed, skip here.
break;
}
ReencryptionTask task = curr.get();
LOG.debug("Updating re-encryption checkpoint with completed task."
+ " last: {} size:{}.", task.lastFile, task.batch.size());
assert zoneId == task.zoneId;
try {
final XAttr xattr = FSDirEncryptionZoneOp
.updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
task.numFilesUpdated, task.numFailures);
xAttrs.clear();
xAttrs.add(xattr);
} catch (IOException ie) {
LOG.warn("Failed to update re-encrypted progress to xattr for zone {}",
zonePath, ie);
++task.numFailures;
}
++tracker.numCheckpointed;
iter.remove();
}
if (tracker.isCompleted()) {
LOG.debug("Removed re-encryption tracker for zone {} because it completed"