HDFS-12383. Re-encryption updater should handle canceled tasks better.
This commit is contained in:
parent
c0b19a93a5
commit
8b2235b367
|
@ -39,6 +39,7 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ListIterator;
|
import java.util.ListIterator;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -72,6 +73,7 @@ public final class ReencryptionUpdater implements Runnable {
|
||||||
private final StopWatch throttleTimerLocked = new StopWatch();
|
private final StopWatch throttleTimerLocked = new StopWatch();
|
||||||
|
|
||||||
private volatile long faultRetryInterval = 60000;
|
private volatile long faultRetryInterval = 60000;
|
||||||
|
private volatile boolean isRunning = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to track re-encryption submissions of a single zone. It contains
|
* Class to track re-encryption submissions of a single zone. It contains
|
||||||
|
@ -201,6 +203,11 @@ public final class ReencryptionUpdater implements Runnable {
|
||||||
pauseZoneId = zoneId;
|
pauseZoneId = zoneId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Returns the current value of the {@code isRunning} flag.
 * In the visible hunks of {@code run()}, the flag is set to true when
 * {@code run()} starts and set back to false immediately before each
 * {@code return} out of its loop, so tests can use this to check that the
 * updater has not exited.
 *
 * @return the value of the {@code isRunning} field
 */
@VisibleForTesting
|
||||||
|
boolean isRunning() {
|
||||||
|
return isRunning;
|
||||||
|
}
|
||||||
|
|
||||||
private final FSDirectory dir;
|
private final FSDirectory dir;
|
||||||
private final CompletionService<ReencryptionTask> batchService;
|
private final CompletionService<ReencryptionTask> batchService;
|
||||||
private final ReencryptionHandler handler;
|
private final ReencryptionHandler handler;
|
||||||
|
@ -242,6 +249,7 @@ public final class ReencryptionUpdater implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
isRunning = true;
|
||||||
throttleTimerAll.start();
|
throttleTimerAll.start();
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
@ -250,11 +258,13 @@ public final class ReencryptionUpdater implements Runnable {
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.warn("Re-encryption updater thread interrupted. Exiting.");
|
LOG.warn("Re-encryption updater thread interrupted. Exiting.");
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
isRunning = false;
|
||||||
return;
|
return;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException | CancellationException e) {
|
||||||
LOG.warn("Re-encryption updater thread exception.", ioe);
|
LOG.warn("Re-encryption updater thread exception.", e);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error("Re-encryption updater thread exiting.", t);
|
LOG.error("Re-encryption updater thread exiting.", t);
|
||||||
|
isRunning = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -405,6 +415,7 @@ public final class ReencryptionUpdater implements Runnable {
|
||||||
if (completed.isCancelled()) {
|
if (completed.isCancelled()) {
|
||||||
LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
|
LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
|
||||||
task.zoneId, task.lastFile);
|
task.zoneId, task.lastFile);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean shouldRetry;
|
boolean shouldRetry;
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
|
@ -1086,12 +1087,9 @@ public class TestReencryption {
|
||||||
getEzManager().resumeReencryptForTesting();
|
getEzManager().resumeReencryptForTesting();
|
||||||
|
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
EncryptionZoneManager ezm = getEzManager();
|
|
||||||
ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
|
||||||
.getInternalState(ezm, "reencryptionHandler");
|
|
||||||
Map<Long, ZoneSubmissionTracker> tasks =
|
Map<Long, ZoneSubmissionTracker> tasks =
|
||||||
(Map<Long, ZoneSubmissionTracker>) Whitebox
|
(Map<Long, ZoneSubmissionTracker>) Whitebox
|
||||||
.getInternalState(handler, "submissions");
|
.getInternalState(getHandler(), "submissions");
|
||||||
List<Future> futures = new LinkedList<>();
|
List<Future> futures = new LinkedList<>();
|
||||||
for (ZoneSubmissionTracker zst : tasks.values()) {
|
for (ZoneSubmissionTracker zst : tasks.values()) {
|
||||||
for (Future f : zst.getTasks()) {
|
for (Future f : zst.getTasks()) {
|
||||||
|
@ -1493,6 +1491,88 @@ public class TestReencryption {
|
||||||
assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
|
assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Checks that canceling a zone's re-encryption while one re-encrypt callable
 * is blocked does not stop the updater.
 *
 * Flow: install a fault injector that blocks inside reencryptEncryptedKeys(),
 * create an encryption zone with 10 files, start re-encryption, wait until the
 * injector reports that it is blocking, pause the updater, cancel the zone,
 * resume the updater, and wait for the zone to complete. The zone status must
 * then be Completed and canceled with 0 files re-encrypted, and
 * getUpdater().isRunning() must still be true.
 *
 * @throws Exception propagated from any of the setup, wait or admin calls
 */
@Test
|
||||||
|
public void testCancelFuture() throws Exception {
|
||||||
|
// Set to true by the injector below just before it starts sleeping; polled
// by the waitFor call further down.
final AtomicBoolean callableRunning = new AtomicBoolean(false);
|
||||||
|
// Fault injector whose reencryptEncryptedKeys() blocks in
// Thread.sleep(Long.MAX_VALUE) while exceptionCount is positive, until the
// sleeping thread is interrupted.
class MyInjector extends EncryptionFaultInjector {
|
||||||
|
private volatile int exceptionCount = 0;
|
||||||
|
|
||||||
|
MyInjector(int numFailures) {
|
||||||
|
exceptionCount = numFailures;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reencryptEncryptedKeys() throws IOException {
|
||||||
|
if (exceptionCount > 0) {
|
||||||
|
// NOTE(review): decrementing a volatile int is not atomic; this is only
// safe if the method is not entered concurrently - confirm.
exceptionCount--;
|
||||||
|
try {
|
||||||
|
callableRunning.set(true);
|
||||||
|
Thread.sleep(Long.MAX_VALUE);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
// NOTE(review): the interruption is only logged; the thread's interrupt
// status is not restored here.
LOG.info("Fault injector interrupted", ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Block on the first invocation only (numFailures = 1).
final MyInjector injector = new MyInjector(1);
|
||||||
|
// NOTE(review): the static injector is replaced and not restored inside this
// method; presumably test teardown resets it - confirm.
EncryptionFaultInjector.instance = injector;
|
||||||
|
|
||||||
|
/* Setup test dir:
|
||||||
|
* /zones/zone/[0-9]
|
||||||
|
* /dir/f
|
||||||
|
*/
|
||||||
|
// NOTE(review): 8196 may have been intended as 8192 - confirm; it is only
// used as the length passed to createFile below.
final int len = 8196;
|
||||||
|
final Path zoneParent = new Path("/zones");
|
||||||
|
final Path zone = new Path(zoneParent, "zone");
|
||||||
|
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
|
||||||
|
dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
DFSTestUtil
|
||||||
|
.createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
|
||||||
|
0xFEED);
|
||||||
|
}
|
||||||
|
final Path subdir = new Path("/dir");
|
||||||
|
fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
|
||||||
|
DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
|
||||||
|
|
||||||
|
// re-encrypt 10 files, so 2 callables. Hang 1, pause the updater so the
|
||||||
|
// callable is taken from the executor but not processed.
|
||||||
|
fsn.getProvider().rollNewVersion(TEST_KEY);
|
||||||
|
fsn.getProvider().flush();
|
||||||
|
// Pause re-encryption, submit the zone, wait until it is queued, then resume.
getEzManager().pauseReencryptForTesting();
|
||||||
|
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
||||||
|
waitForQueuedZones(1);
|
||||||
|
getEzManager().resumeReencryptForTesting();
|
||||||
|
|
||||||
|
LOG.info("Waiting for re-encrypt callables to run");
|
||||||
|
// Wait until the injector has flagged that it is about to block. The
// arguments 100 and 10000 are presumably the poll interval and timeout in
// milliseconds - confirm against GenericTestUtils.waitFor.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return callableRunning.get();
|
||||||
|
}
|
||||||
|
}, 100, 10000);
|
||||||
|
|
||||||
|
// Pause the updater first, then cancel the zone while the updater is paused.
getEzManager().pauseReencryptUpdaterForTesting();
|
||||||
|
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
|
||||||
|
|
||||||
|
// now resume updater and verify status.
|
||||||
|
getEzManager().resumeReencryptUpdaterForTesting();
|
||||||
|
waitForZoneCompletes(zone.toString());
|
||||||
|
|
||||||
|
RemoteIterator<ZoneReencryptionStatus> it =
|
||||||
|
dfsAdmin.listReencryptionStatus();
|
||||||
|
assertTrue(it.hasNext());
|
||||||
|
final ZoneReencryptionStatus zs = it.next();
|
||||||
|
assertEquals(zone.toString(), zs.getZoneName());
|
||||||
|
assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
|
||||||
|
assertTrue(zs.isCanceled());
|
||||||
|
assertTrue(zs.getCompletionTime() > 0);
|
||||||
|
assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
|
||||||
|
assertEquals(0, zs.getFilesReencrypted());
|
||||||
|
|
||||||
|
// The updater must not have exited as a result of the cancellation.
assertTrue(getUpdater().isRunning());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReencryptCancelForUpdater() throws Exception {
|
public void testReencryptCancelForUpdater() throws Exception {
|
||||||
/* Setup test dir:
|
/* Setup test dir:
|
||||||
|
@ -1822,12 +1902,7 @@ public class TestReencryption {
|
||||||
fsn.getProvider().rollNewVersion(TEST_KEY);
|
fsn.getProvider().rollNewVersion(TEST_KEY);
|
||||||
fsn.getProvider().flush();
|
fsn.getProvider().flush();
|
||||||
|
|
||||||
final EncryptionZoneManager ezm = getEzManager();
|
Whitebox.setInternalState(getUpdater(), "faultRetryInterval", 50);
|
||||||
final ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
|
||||||
.getInternalState(ezm, "reencryptionHandler");
|
|
||||||
final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox
|
|
||||||
.getInternalState(handler, "reencryptionUpdater");
|
|
||||||
Whitebox.setInternalState(updater, "faultRetryInterval", 50);
|
|
||||||
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
||||||
waitForReencryptedZones(1);
|
waitForReencryptedZones(1);
|
||||||
assertEquals(0, injector.exceptionCount);
|
assertEquals(0, injector.exceptionCount);
|
||||||
|
@ -1844,4 +1919,14 @@ public class TestReencryption {
|
||||||
assertEquals(10, zs.getFilesReencrypted());
|
assertEquals(10, zs.getFilesReencrypted());
|
||||||
assertEquals(0, zs.getNumReencryptionFailures());
|
assertEquals(0, zs.getNumReencryptionFailures());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Test helper: fetches the state named "reencryptionHandler" from the object
 * returned by getEzManager() using Whitebox.getInternalState, and casts it to
 * ReencryptionHandler.
 *
 * @return the ReencryptionHandler held by the encryption zone manager
 */
private ReencryptionHandler getHandler() {
|
||||||
|
return (ReencryptionHandler) Whitebox
|
||||||
|
.getInternalState(getEzManager(), "reencryptionHandler");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Test helper: fetches the state named "reencryptionUpdater" from the handler
 * returned by getHandler() using Whitebox.getInternalState, and casts it to
 * ReencryptionUpdater.
 *
 * @return the ReencryptionUpdater held by the re-encryption handler
 */
private ReencryptionUpdater getUpdater() {
|
||||||
|
return (ReencryptionUpdater) Whitebox
|
||||||
|
.getInternalState(getHandler(), "reencryptionUpdater");
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue