HDFS-12383. Re-encryption updater should handle canceled tasks better.
This commit is contained in:
parent
275980bb1e
commit
633c1ea455
|
@ -39,6 +39,7 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -72,6 +73,7 @@ public final class ReencryptionUpdater implements Runnable {
|
|||
private final StopWatch throttleTimerLocked = new StopWatch();
|
||||
|
||||
private volatile long faultRetryInterval = 60000;
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
/**
|
||||
* Class to track re-encryption submissions of a single zone. It contains
|
||||
|
@ -201,6 +203,11 @@ public final class ReencryptionUpdater implements Runnable {
|
|||
pauseZoneId = zoneId;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isRunning() {
|
||||
return isRunning;
|
||||
}
|
||||
|
||||
private final FSDirectory dir;
|
||||
private final CompletionService<ReencryptionTask> batchService;
|
||||
private final ReencryptionHandler handler;
|
||||
|
@ -242,6 +249,7 @@ public final class ReencryptionUpdater implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
isRunning = true;
|
||||
throttleTimerAll.start();
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -250,11 +258,13 @@ public final class ReencryptionUpdater implements Runnable {
|
|||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Re-encryption updater thread interrupted. Exiting.");
|
||||
Thread.currentThread().interrupt();
|
||||
isRunning = false;
|
||||
return;
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Re-encryption updater thread exception.", ioe);
|
||||
} catch (IOException | CancellationException e) {
|
||||
LOG.warn("Re-encryption updater thread exception.", e);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Re-encryption updater thread exiting.", t);
|
||||
isRunning = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -405,6 +415,7 @@ public final class ReencryptionUpdater implements Runnable {
|
|||
if (completed.isCancelled()) {
|
||||
LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
|
||||
task.zoneId, task.lastFile);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean shouldRetry;
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
|
@ -1086,12 +1087,9 @@ public class TestReencryption {
|
|||
getEzManager().resumeReencryptForTesting();
|
||||
|
||||
Thread.sleep(3000);
|
||||
EncryptionZoneManager ezm = getEzManager();
|
||||
ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
||||
.getInternalState(ezm, "reencryptionHandler");
|
||||
Map<Long, ZoneSubmissionTracker> tasks =
|
||||
(Map<Long, ZoneSubmissionTracker>) Whitebox
|
||||
.getInternalState(handler, "submissions");
|
||||
.getInternalState(getHandler(), "submissions");
|
||||
List<Future> futures = new LinkedList<>();
|
||||
for (ZoneSubmissionTracker zst : tasks.values()) {
|
||||
for (Future f : zst.getTasks()) {
|
||||
|
@ -1493,6 +1491,88 @@ public class TestReencryption {
|
|||
assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelFuture() throws Exception {
|
||||
final AtomicBoolean callableRunning = new AtomicBoolean(false);
|
||||
class MyInjector extends EncryptionFaultInjector {
|
||||
private volatile int exceptionCount = 0;
|
||||
|
||||
MyInjector(int numFailures) {
|
||||
exceptionCount = numFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reencryptEncryptedKeys() throws IOException {
|
||||
if (exceptionCount > 0) {
|
||||
exceptionCount--;
|
||||
try {
|
||||
callableRunning.set(true);
|
||||
Thread.sleep(Long.MAX_VALUE);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Fault injector interrupted", ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
final MyInjector injector = new MyInjector(1);
|
||||
EncryptionFaultInjector.instance = injector;
|
||||
|
||||
/* Setup test dir:
|
||||
* /zones/zone/[0-9]
|
||||
* /dir/f
|
||||
*/
|
||||
final int len = 8196;
|
||||
final Path zoneParent = new Path("/zones");
|
||||
final Path zone = new Path(zoneParent, "zone");
|
||||
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
|
||||
dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
DFSTestUtil
|
||||
.createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
|
||||
0xFEED);
|
||||
}
|
||||
final Path subdir = new Path("/dir");
|
||||
fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
|
||||
DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
|
||||
|
||||
// re-encrypt 10 files, so 2 callables. Hang 1, pause the updater so the
|
||||
// callable is taken from the executor but not processed.
|
||||
fsn.getProvider().rollNewVersion(TEST_KEY);
|
||||
fsn.getProvider().flush();
|
||||
getEzManager().pauseReencryptForTesting();
|
||||
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
||||
waitForQueuedZones(1);
|
||||
getEzManager().resumeReencryptForTesting();
|
||||
|
||||
LOG.info("Waiting for re-encrypt callables to run");
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return callableRunning.get();
|
||||
}
|
||||
}, 100, 10000);
|
||||
|
||||
getEzManager().pauseReencryptUpdaterForTesting();
|
||||
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
|
||||
|
||||
// now resume updater and verify status.
|
||||
getEzManager().resumeReencryptUpdaterForTesting();
|
||||
waitForZoneCompletes(zone.toString());
|
||||
|
||||
RemoteIterator<ZoneReencryptionStatus> it =
|
||||
dfsAdmin.listReencryptionStatus();
|
||||
assertTrue(it.hasNext());
|
||||
final ZoneReencryptionStatus zs = it.next();
|
||||
assertEquals(zone.toString(), zs.getZoneName());
|
||||
assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
|
||||
assertTrue(zs.isCanceled());
|
||||
assertTrue(zs.getCompletionTime() > 0);
|
||||
assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
|
||||
assertEquals(0, zs.getFilesReencrypted());
|
||||
|
||||
assertTrue(getUpdater().isRunning());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReencryptCancelForUpdater() throws Exception {
|
||||
/* Setup test dir:
|
||||
|
@ -1822,12 +1902,7 @@ public class TestReencryption {
|
|||
fsn.getProvider().rollNewVersion(TEST_KEY);
|
||||
fsn.getProvider().flush();
|
||||
|
||||
final EncryptionZoneManager ezm = getEzManager();
|
||||
final ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
||||
.getInternalState(ezm, "reencryptionHandler");
|
||||
final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox
|
||||
.getInternalState(handler, "reencryptionUpdater");
|
||||
Whitebox.setInternalState(updater, "faultRetryInterval", 50);
|
||||
Whitebox.setInternalState(getUpdater(), "faultRetryInterval", 50);
|
||||
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
||||
waitForReencryptedZones(1);
|
||||
assertEquals(0, injector.exceptionCount);
|
||||
|
@ -1844,4 +1919,14 @@ public class TestReencryption {
|
|||
assertEquals(10, zs.getFilesReencrypted());
|
||||
assertEquals(0, zs.getNumReencryptionFailures());
|
||||
}
|
||||
|
||||
private ReencryptionHandler getHandler() {
|
||||
return (ReencryptionHandler) Whitebox
|
||||
.getInternalState(getEzManager(), "reencryptionHandler");
|
||||
}
|
||||
|
||||
private ReencryptionUpdater getUpdater() {
|
||||
return (ReencryptionUpdater) Whitebox
|
||||
.getInternalState(getHandler(), "reencryptionUpdater");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue