diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java index 690a0e9f6e8..d641ea1408c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -39,6 +39,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -72,6 +73,7 @@ public final class ReencryptionUpdater implements Runnable { private final StopWatch throttleTimerLocked = new StopWatch(); private volatile long faultRetryInterval = 60000; + private volatile boolean isRunning = false; /** * Class to track re-encryption submissions of a single zone. It contains @@ -201,6 +203,11 @@ public final class ReencryptionUpdater implements Runnable { pauseZoneId = zoneId; } + @VisibleForTesting + boolean isRunning() { + return isRunning; + } + private final FSDirectory dir; private final CompletionService batchService; private final ReencryptionHandler handler; @@ -242,6 +249,7 @@ public final class ReencryptionUpdater implements Runnable { @Override public void run() { + isRunning = true; throttleTimerAll.start(); while (true) { try { @@ -250,11 +258,13 @@ public final class ReencryptionUpdater implements Runnable { } catch (InterruptedException ie) { LOG.warn("Re-encryption updater thread interrupted. Exiting."); Thread.currentThread().interrupt(); + isRunning = false; return; - } catch (IOException ioe) { - LOG.warn("Re-encryption updater thread exception.", ioe); + } catch (IOException | CancellationException e) { + LOG.warn("Re-encryption updater thread exception.", e); } catch (Throwable t) { LOG.error("Re-encryption updater thread exiting.", t); + isRunning = false; return; } } @@ -405,6 +415,7 @@ public final class ReencryptionUpdater implements Runnable { if (completed.isCancelled()) { LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}", task.zoneId, task.lastFile); + return; } boolean shouldRetry; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java index 7ba3f9173dd..4b5be2e1553 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; @@ -1086,12 +1087,9 @@ public class TestReencryption { getEzManager().resumeReencryptForTesting(); Thread.sleep(3000); - EncryptionZoneManager ezm = getEzManager(); - ReencryptionHandler handler = (ReencryptionHandler) Whitebox - .getInternalState(ezm, "reencryptionHandler"); Map tasks = (Map) Whitebox - .getInternalState(handler, "submissions"); + .getInternalState(getHandler(), "submissions"); List futures = new LinkedList<>(); for (ZoneSubmissionTracker zst : tasks.values()) { for (Future f : zst.getTasks()) { @@ -1493,6 +1491,88 @@ public class TestReencryption { assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted()); } + @Test + public void testCancelFuture() throws Exception { + final AtomicBoolean callableRunning = new AtomicBoolean(false); + class MyInjector extends EncryptionFaultInjector { + private volatile int exceptionCount = 0; + + MyInjector(int numFailures) { + exceptionCount = numFailures; + } + + @Override + public void reencryptEncryptedKeys() throws IOException { + if (exceptionCount > 0) { + exceptionCount--; + try { + callableRunning.set(true); + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException ie) { + LOG.info("Fault injector interrupted", ie); + } + } + } + } + final MyInjector injector = new MyInjector(1); + EncryptionFaultInjector.instance = injector; + + /* Setup test dir: + * /zones/zone/[0-9] + * /dir/f + */ + final int len = 8196; + final Path zoneParent = new Path("/zones"); + final Path zone = new Path(zoneParent, "zone"); + fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true); + dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH); + for (int i = 0; i < 10; ++i) { + DFSTestUtil + .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1, + 0xFEED); + } + final Path subdir = new Path("/dir"); + fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true); + DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED); + + // re-encrypt 10 files, so 2 callables. Hang 1, pause the updater so the + // callable is taken from the executor but not processed. + fsn.getProvider().rollNewVersion(TEST_KEY); + fsn.getProvider().flush(); + getEzManager().pauseReencryptForTesting(); + dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START); + waitForQueuedZones(1); + getEzManager().resumeReencryptForTesting(); + + LOG.info("Waiting for re-encrypt callables to run"); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return callableRunning.get(); + } + }, 100, 10000); + + getEzManager().pauseReencryptUpdaterForTesting(); + dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL); + + // now resume updater and verify status. + getEzManager().resumeReencryptUpdaterForTesting(); + waitForZoneCompletes(zone.toString()); + + RemoteIterator it = + dfsAdmin.listReencryptionStatus(); + assertTrue(it.hasNext()); + final ZoneReencryptionStatus zs = it.next(); + assertEquals(zone.toString(), zs.getZoneName()); + assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState()); + assertTrue(zs.isCanceled()); + assertTrue(zs.getCompletionTime() > 0); + assertTrue(zs.getCompletionTime() > zs.getSubmissionTime()); + assertEquals(0, zs.getFilesReencrypted()); + + assertTrue(getUpdater().isRunning()); + } + @Test public void testReencryptCancelForUpdater() throws Exception { /* Setup test dir: @@ -1822,12 +1902,7 @@ public class TestReencryption { fsn.getProvider().rollNewVersion(TEST_KEY); fsn.getProvider().flush(); - final EncryptionZoneManager ezm = getEzManager(); - final ReencryptionHandler handler = (ReencryptionHandler) Whitebox - .getInternalState(ezm, "reencryptionHandler"); - final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox - .getInternalState(handler, "reencryptionUpdater"); - Whitebox.setInternalState(updater, "faultRetryInterval", 50); + Whitebox.setInternalState(getUpdater(), "faultRetryInterval", 50); dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START); waitForReencryptedZones(1); assertEquals(0, injector.exceptionCount); @@ -1844,4 +1919,14 @@ public class TestReencryption { assertEquals(10, zs.getFilesReencrypted()); assertEquals(0, zs.getNumReencryptionFailures()); } + + private ReencryptionHandler getHandler() { + return (ReencryptionHandler) Whitebox + .getInternalState(getEzManager(), "reencryptionHandler"); + } + + private ReencryptionUpdater getUpdater() { + return (ReencryptionUpdater) Whitebox + .getInternalState(getHandler(), "reencryptionUpdater"); + } } \ No newline at end of file