HDFS-9405. Warmup NameNode EDEK caches in background thread. Contributed by Xiao Chen.

(cherry picked from commit e3bb38d625)
This commit is contained in:
Andrew Wang 2016-03-21 11:39:05 -07:00
parent d956e0a0bb
commit e826d860a7
10 changed files with 197 additions and 20 deletions

View File

@ -849,12 +849,8 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
} }
@VisibleForTesting @VisibleForTesting
public int getEncKeyQueueSize(String keyName) throws IOException { public int getEncKeyQueueSize(String keyName) {
try { return encKeyVersionQueue.getSize(keyName);
return encKeyVersionQueue.getSize(keyName);
} catch (ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.crypto.key.kms; package org.apache.hadoop.crypto.key.kms;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -240,13 +242,19 @@ public class ValueQueue <E> {
} }
/** /**
* Get size of the Queue for keyName * Get size of the Queue for keyName. This is only used in unit tests.
* @param keyName the key name * @param keyName the key name
* @return int queue size * @return int queue size
* @throws ExecutionException
*/ */
public int getSize(String keyName) throws ExecutionException { public int getSize(String keyName) {
return keyQueues.get(keyName).size(); // We can't do keyQueues.get(keyName).size() here,
// since that will have the side effect of populating the cache.
Map<String, LinkedBlockingQueue<E>> map =
keyQueues.getAllPresent(Arrays.asList(keyName));
if (map.get(keyName) == null) {
return 0;
}
return map.get(keyName).size();
} }
/** /**

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.crypto.key;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -158,17 +157,12 @@ public class TestValueQueue {
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
try {
int size = vq.getSize("k1"); int size = vq.getSize("k1");
if (size != 10) { if (size != 10) {
LOG.info("Current ValueQueue size is " + size); LOG.info("Current ValueQueue size is " + size);
return false; return false;
} }
return true; return true;
} catch (ExecutionException e) {
LOG.error("Exception when getSize.", e);
return false;
}
} }
}, 100, 3000); }, 100, 3000);
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);

View File

@ -715,6 +715,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI; HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms";
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;
// Journal-node related configs. These are read on the JN side. // Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

View File

@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -380,4 +381,18 @@ public class EncryptionZoneManager {
public int getNumEncryptionZones() { public int getNumEncryptionZones() {
return encryptionZones.size(); return encryptionZones.size();
} }
/**
* @return a list of all key names.
*/
String[] getKeyNames() {
assert dir.hasReadLock();
String[] ret = new String[encryptionZones.size()];
int index = 0;
for (Map.Entry<Long, EncryptionZoneInt> entry : encryptionZones
.entrySet()) {
ret[index] = entry.getValue().getKeyName();
}
return ret;
}
} }

View File

@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.concurrent.ExecutorService;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -304,4 +305,86 @@ final class FSDirEncryptionZoneOp {
fsd.readUnlock(); fsd.readUnlock();
} }
} }
/**
* Proactively warm up the edek cache. We'll get all the edek key names,
* then launch up a separate thread to warm them up.
*/
static void warmUpEdekCache(final ExecutorService executor,
final FSDirectory fsd, final int delay, final int interval) {
fsd.readLock();
try {
String[] edeks = fsd.ezManager.getKeyNames();
executor.execute(
new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval));
} finally {
fsd.readUnlock();
}
}
/**
* EDEKCacheLoader is being run in a separate thread to loop through all the
* EDEKs and warm them up in the KMS cache.
*/
static class EDEKCacheLoader implements Runnable {
private final String[] keyNames;
private final KeyProviderCryptoExtension kp;
private int initialDelay;
private int retryInterval;
EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp,
final int delay, final int interval) {
this.keyNames = names;
this.kp = kp;
this.initialDelay = delay;
this.retryInterval = interval;
}
@Override
public void run() {
NameNode.LOG.info("Warming up {} EDEKs... (initialDelay={}, "
+ "retryInterval={})", keyNames.length, initialDelay, retryInterval);
try {
Thread.sleep(initialDelay);
} catch (InterruptedException ie) {
NameNode.LOG.info("EDEKCacheLoader interrupted before warming up.");
return;
}
final int logCoolDown = 10000; // periodically print error log (if any)
int sinceLastLog = logCoolDown; // always print the first failure
boolean success = false;
IOException lastSeenIOE = null;
while (true) {
try {
kp.warmUpEncryptedKeys(keyNames);
NameNode.LOG
.info("Successfully warmed up {} EDEKs.", keyNames.length);
success = true;
break;
} catch (IOException ioe) {
lastSeenIOE = ioe;
if (sinceLastLog >= logCoolDown) {
NameNode.LOG.info("Failed to warm up EDEKs.", ioe);
sinceLastLog = 0;
} else {
NameNode.LOG.debug("Failed to warm up EDEKs.", ioe);
}
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
NameNode.LOG.info("EDEKCacheLoader interrupted during retry.");
break;
}
sinceLastLog += retryInterval;
}
if (!success) {
NameNode.LOG.warn("Unable to warm up EDEKs.");
if (lastSeenIOE != null) {
NameNode.LOG.warn("Last seen exception:", lastSeenIOE);
}
}
}
}
} }

View File

@ -119,6 +119,8 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -285,6 +287,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/*************************************************** /***************************************************
* FSNamesystem does the actual bookkeeping work for the * FSNamesystem does the actual bookkeeping work for the
@ -410,6 +413,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// A daemon to periodically clean up corrupt lazyPersist files // A daemon to periodically clean up corrupt lazyPersist files
// from the name space. // from the name space.
Daemon lazyPersistFileScrubber = null; Daemon lazyPersistFileScrubber = null;
// Executor to warm up EDEK cache
private ExecutorService edekCacheLoader = null;
private final int edekCacheLoaderDelay;
private final int edekCacheLoaderInterval;
/** /**
* When an active namenode will roll its own edit log, in # edits * When an active namenode will roll its own edit log, in # edits
*/ */
@ -774,6 +783,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ " must be zero (for disable) or greater than zero."); + " must be zero (for disable) or greater than zero.");
} }
this.edekCacheLoaderDelay = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
this.edekCacheLoaderInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
// For testing purposes, allow the DT secret manager to be started regardless // For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled. // of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean( alwaysUseDelegationTokensForTests = conf.getBoolean(
@ -1115,6 +1131,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
cacheManager.startMonitorThread(); cacheManager.startMonitorThread();
blockManager.getDatanodeManager().setShouldSendCachingCommands(true); blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
if (provider != null) {
edekCacheLoader = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Warm Up EDEK Cache Thread #%d")
.build());
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
}
} finally { } finally {
startingActiveService = false; startingActiveService = false;
writeUnlock(); writeUnlock();
@ -1149,6 +1173,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor(); ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
nnrmthread.interrupt(); nnrmthread.interrupt();
} }
if (edekCacheLoader != null) {
edekCacheLoader.shutdownNow();
}
if (nnEditLogRoller != null) { if (nnEditLogRoller != null) {
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop(); ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
nnEditLogRoller.interrupt(); nnEditLogRoller.interrupt();

View File

@ -2610,6 +2610,24 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.edekcacheloader.interval.ms</name>
<value>1000</value>
<description>When KeyProvider is configured, the interval time of warming
up edek cache on NN starts up / becomes active. All edeks will be loaded
from KMS into provider cache. The edek cache loader will try to warm up the
cache until succeed or NN leaves active state.
</description>
</property>
<property>
<name>dfs.namenode.edekcacheloader.initial.delay.ms</name>
<value>3000</value>
<description>When KeyProvider is configured, the time delayed until the first
attempt to warm up edek cache on NN start up / become active.
</description>
</property>
<property> <property>
<name>dfs.namenode.inotify.max.events.per.rpc</name> <name>dfs.namenode.inotify.max.events.per.rpc</name>
<value>1000</value> <value>1000</value>

View File

@ -117,10 +117,10 @@ import javax.xml.parsers.SAXParserFactory;
public class TestEncryptionZones { public class TestEncryptionZones {
private Configuration conf; protected Configuration conf;
private FileSystemTestHelper fsHelper; private FileSystemTestHelper fsHelper;
private MiniDFSCluster cluster; protected MiniDFSCluster cluster;
protected HdfsAdmin dfsAdmin; protected HdfsAdmin dfsAdmin;
protected DistributedFileSystem fs; protected DistributedFileSystem fs;
private File testRootDir; private File testRootDir;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import com.google.common.base.Supplier;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS; import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
@ -26,10 +27,12 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
@ -71,8 +74,10 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
final Path zonePath = new Path("/TestEncryptionZone"); final Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY); dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
assertTrue(((KMSClientProvider)fs.getClient().getKeyProvider()). @SuppressWarnings("unchecked")
getEncKeyQueueSize(TEST_KEY) > 0); KMSClientProvider kcp = (KMSClientProvider) Whitebox
.getInternalState(cluster.getNamesystem().getProvider(), "extension");
assertTrue(kcp.getEncKeyQueueSize(TEST_KEY) > 0);
} }
@Test(timeout = 120000) @Test(timeout = 120000)
@ -92,4 +97,31 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
Assert.assertEquals(0, tokens.length); Assert.assertEquals(0, tokens.length);
Assert.assertEquals(2, creds.numberOfTokens()); Assert.assertEquals(2, creds.numberOfTokens());
} }
@Test(timeout = 120000)
public void testWarmupEDEKCacheOnStartup() throws Exception {
final Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
@SuppressWarnings("unchecked")
KMSClientProvider spy = (KMSClientProvider) Whitebox
.getInternalState(cluster.getNamesystem().getProvider(), "extension");
assertTrue("key queue is empty after creating encryption zone",
spy.getEncKeyQueueSize(TEST_KEY) > 0);
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, 0);
cluster.restartNameNode(true);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final KMSClientProvider kspy = (KMSClientProvider) Whitebox
.getInternalState(cluster.getNamesystem().getProvider(),
"extension");
return kspy.getEncKeyQueueSize(TEST_KEY) > 0;
}
}, 1000, 60000);
}
} }