diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 446c6a3a50d..4a6bc11f746 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -893,6 +893,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7744. Fix potential NPE in DFSInputStream after setDropBehind or
     setReadahead is called (cmccabe)
 
+    HDFS-7718. Store KeyProvider in ClientContext to avoid leaking key provider
+    threads when using FileContext (Arun Suresh via Colin P. McCabe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index e106fca55d7..af7c0956e88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
 
 /**
  * ClientContext contains context information for a client.
@@ -71,6 +72,10 @@ public class ClientContext {
    */
   private final DomainSocketFactory domainSocketFactory;
 
+  /**
+   * Caches KeyProviders for the DFSClient.
+   */
+  private final KeyProviderCache keyProviderCache;
   /**
    * True if we should use the legacy BlockReaderLocal.
    */
@@ -107,6 +112,7 @@ public class ClientContext {
         conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
     this.peerCache = new PeerCache(conf.socketCacheCapacity,
         conf.socketCacheExpiry);
+    this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
     this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
     this.domainSocketFactory = new DomainSocketFactory(conf);
 
@@ -138,7 +144,9 @@ public class ClientContext {
       append(", domainSocketDataTraffic = ").
       append(conf.domainSocketDataTraffic).
       append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
-      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
+      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs).
+      append(", keyProviderCacheExpiryMs = ").
+      append(conf.keyProviderCacheExpiryMs);
     return builder.toString();
   }
 
@@ -195,6 +203,10 @@ public class ClientContext {
     return peerCache;
   }
 
+  public KeyProviderCache getKeyProviderCache() {
+    return keyProviderCache;
+  }
+
   public boolean getUseLegacyBlockReaderLocal() {
     return useLegacyBlockReaderLocal;
   }
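Note: why moving the provider into ClientContext fixes the leak. FileContext does not cache its underlying AbstractFileSystem, so each FileContext ends up with a fresh DFSClient, and before this patch every DFSClient constructor eagerly built its own KeyProvider (the lines removed from DFSClient just below) whose background threads were only released by DFSClient#close(), which FileContext never invokes. ClientContext instances, by contrast, are shared per context name ("dfs.client.context", default "default"), so a KeyProviderCache stored there is created once and reused. A hedged reproduction sketch of the leak, not part of the patch; it assumes fs.defaultFS points at an HDFS cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

// Hedged sketch of the leak this patch addresses; illustrative only.
public class FileContextKeyProviderLeakSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // assumes fs.defaultFS is an HDFS URI
    for (int i = 0; i < 1000; i++) {
      // Each FileContext gets a new AbstractFileSystem, hence a new DFSClient.
      FileContext fc = FileContext.getFileContext(conf);
      fc.util().exists(new Path("/"));
      // Pre-patch: the DFSClient constructor created a KeyProvider here and
      // nothing ever closed it, so its threads accumulated.
      // Post-patch: all iterations share one provider via the ClientContext's
      // KeyProviderCache, and expired entries are closed on eviction.
    }
  }
}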
"null": nameNodeUri.getAuthority(); this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); - provider = DFSUtil.createKeyProvider(conf); - if (LOG.isDebugEnabled()) { - if (provider == null) { - LOG.debug("No KeyProvider found."); - } else { - LOG.debug("Found KeyProvider: " + provider.toString()); - } - } int numResponseToDrop = conf.getInt( DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); @@ -961,18 +957,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ @Override public synchronized void close() throws IOException { - try { - if(clientRunning) { - closeAllFilesBeingWritten(false); - clientRunning = false; - getLeaseRenewer().closeClient(this); - // close connections to the namenode - closeConnectionToNamenode(); - } - } finally { - if (provider != null) { - provider.close(); - } + if(clientRunning) { + closeAllFilesBeingWritten(false); + clientRunning = false; + getLeaseRenewer().closeClient(this); + // close connections to the namenode + closeConnectionToNamenode(); } } @@ -1382,6 +1372,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, feInfo) throws IOException { TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler); try { + KeyProvider provider = getKeyProvider(); if (provider == null) { throw new IOException("No KeyProvider is configured, cannot access" + " an encrypted file"); @@ -3499,12 +3490,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } public KeyProvider getKeyProvider() { - return provider; + return clientContext.getKeyProviderCache().get(conf); } @VisibleForTesting - public void setKeyProvider(KeyProviderCryptoExtension provider) { - this.provider = provider; + public void setKeyProvider(KeyProvider provider) { + try { + clientContext.getKeyProviderCache().setKeyProvider(conf, provider); + } catch (IOException e) { + LOG.error("Could not set KeyProvider !!", e); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index beb3e38019e..e4343bbeeb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; @@ -769,4 +771,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"}; public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn"; public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false; + + // Key Provider Cache Expiry + public static final String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS = + "dfs.client.key.provider.cache.expiry"; + // 10 days + public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = + TimeUnit.DAYS.toMillis(10); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java new file 
mode 100644 index 00000000000..68ff554a892 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class KeyProviderCache { + + public static final Log LOG = LogFactory.getLog(KeyProviderCache.class); + + private final Cache cache; + + public KeyProviderCache(long expiryMs) { + cache = CacheBuilder.newBuilder() + .expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS) + .removalListener(new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification notification) { + try { + notification.getValue().close(); + } catch (Throwable e) { + LOG.error( + "Error closing KeyProvider with uri [" + + notification.getKey() + "]", e); + ; + } + } + }) + .build(); + } + + public KeyProvider get(final Configuration conf) { + URI kpURI = createKeyProviderURI(conf); + if (kpURI == null) { + return null; + } + try { + return cache.get(kpURI, new Callable() { + @Override + public KeyProvider call() throws Exception { + return DFSUtil.createKeyProvider(conf); + } + }); + } catch (Exception e) { + LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause()); + return null; + } + } + + private URI createKeyProviderURI(Configuration conf) { + final String providerUriStr = + conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, null); + // No provider set in conf + if (providerUriStr == null) { + LOG.error("Could not find uri with key [" + + DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + + "] to create a keyProvider !!"); + return null; + } + final URI providerUri; + try { + providerUri = new URI(providerUriStr); + } catch (URISyntaxException e) { + LOG.error("KeyProvider URI string is invalid [" + providerUriStr + + "]!!", e.getCause()); + return null; + } + return providerUri; + } + + @VisibleForTesting + public void setKeyProvider(Configuration conf, KeyProvider keyProvider) + throws IOException { + URI uri = createKeyProviderURI(conf); + cache.put(uri, keyProvider); + } +} diff --git 
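Note: the new class above leans on a Guava idiom: expireAfterAccess evicts entries idle longer than the expiry window, and the RemovalListener closes each evicted KeyProvider so its threads die with it. A self-contained sketch of that pattern (illustrative names; plain java.io.Closeable values stand in for KeyProviders):

import java.io.Closeable;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

// Sketch: a cache that closes values on eviction, mirroring KeyProviderCache.
public class ClosingCacheSketch {
  public static void main(String[] args) throws Exception {
    Cache<URI, Closeable> cache = CacheBuilder.newBuilder()
        .expireAfterAccess(10, TimeUnit.MILLISECONDS)
        .removalListener(new RemovalListener<URI, Closeable>() {
          @Override
          public void onRemoval(RemovalNotification<URI, Closeable> n) {
            try {
              n.getValue().close(); // release threads/sockets held by the value
            } catch (Exception e) {
              System.err.println("Error closing [" + n.getKey() + "]: " + e);
            }
          }
        })
        .build();

    URI key = new URI("dummy://example");
    cache.put(key, () -> System.out.println("closed " + key));
    Thread.sleep(50);   // let the entry pass its expiry window
    cache.cleanUp();    // eviction (and the listener) runs during maintenance
  }
}

Caveat: Guava runs removal listeners during cache maintenance (cleanUp() or subsequent cache operations), not on a background timer, so an idle provider is only closed once the cache is next touched.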
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
index 13eb4cf97e0..dbb7ea59c3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
@@ -38,11 +38,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
@@ -96,7 +98,6 @@ import static org.mockito.Matchers.anyShort;
 import static org.mockito.Mockito.withSettings;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
-
 import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
@@ -105,6 +106,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import org.xml.sax.InputSource;
 import org.xml.sax.helpers.DefaultHandler;
 
@@ -157,8 +159,8 @@ public class TestEncryptionZones {
   protected void setProvider() {
     // Need to set the client's KeyProvider to the NN's for JKS,
     // else the updates do not get flushed properly
-    fs.getClient().provider = cluster.getNameNode().getNamesystem()
-        .getProvider();
+    fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
+        .getProvider());
   }
 
   @After
@@ -1072,7 +1074,7 @@
         addDelegationTokens(anyString(), (Credentials)any())).
         thenReturn(new Token<?>[] { testToken });
 
-    dfs.getClient().provider = keyProvider;
+    dfs.getClient().setKeyProvider(keyProvider);
 
     Credentials creds = new Credentials();
     final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
index 3339f167c23..e61a02b45e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
@@ -80,7 +80,7 @@ public class TestEncryptionZonesWithHA {
     dfsAdmin1 = new HdfsAdmin(cluster.getURI(1), conf);
     KeyProviderCryptoExtension nn0Provider =
         cluster.getNameNode(0).getNamesystem().getProvider();
-    fs.getClient().provider = nn0Provider;
+    fs.getClient().setKeyProvider(nn0Provider);
   }
 
   @After
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java
index b165c56825e..0040d7561be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java
@@ -71,7 +71,7 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
     final Path zonePath = new Path("/TestEncryptionZone");
     fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
     dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
-    assertTrue(((KMSClientProvider)fs.getClient().provider).
+    assertTrue(((KMSClientProvider)fs.getClient().getKeyProvider()).
         getEncKeyQueueSize(TEST_KEY) > 0);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java
new file mode 100644
index 00000000000..4cbe871df5f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestKeyProviderCache {
+
+  public static class DummyKeyProvider extends KeyProvider {
+
+    public DummyKeyProvider(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public KeyVersion getKeyVersion(String versionName) throws IOException {
+      return null;
+    }
+
+    @Override
+    public List<String> getKeys() throws IOException {
+      return null;
+    }
+
+    @Override
+    public List<KeyVersion> getKeyVersions(String name) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Metadata getMetadata(String name) throws IOException {
+      return null;
+    }
+
+    @Override
+    public KeyVersion createKey(String name, byte[] material, Options options)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void deleteKey(String name) throws IOException {
+    }
+
+    @Override
+    public KeyVersion rollNewVersion(String name, byte[] material)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+  }
+
+  public static class Factory extends KeyProviderFactory {
+
+    @Override
+    public KeyProvider createProvider(URI providerName, Configuration conf)
+        throws IOException {
+      if ("dummy".equals(providerName.getScheme())) {
+        return new DummyKeyProvider(conf);
+      }
+      return null;
+    }
+  }
+
+  @Test
+  public void testCache() throws Exception {
+    KeyProviderCache kpCache = new KeyProviderCache(10000);
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
+        "dummy://foo:bar@test_provider1");
+    KeyProvider keyProvider1 = kpCache.get(conf);
+    Assert.assertNotNull("Returned Key Provider is null !!", keyProvider1);
+
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
+        "dummy://foo:bar@test_provider1");
+    KeyProvider keyProvider2 = kpCache.get(conf);
+
+    Assert.assertTrue("Different KeyProviders returned !!",
+        keyProvider1 == keyProvider2);
+
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
+        "dummy://test_provider3");
+    KeyProvider keyProvider3 = kpCache.get(conf);
+
+    Assert.assertFalse("Same KeyProviders returned !!",
+        keyProvider1 == keyProvider3);
+
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
+        "dummy://hello:there@test_provider1");
+    KeyProvider keyProvider4 = kpCache.get(conf);
+
+    Assert.assertFalse("Same KeyProviders returned !!",
+        keyProvider1 == keyProvider4);
+
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
index 13381335f09..9720dd6c749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
@@ -81,8 +81,8 @@ public class TestReservedRawPaths {
     dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
     // Need to set the client's KeyProvider to the NN's for JKS,
     // else the updates do not get flushed properly
-    fs.getClient().provider = cluster.getNameNode().getNamesystem()
-        .getProvider();
+    fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
+        .getProvider());
     DFSTestUtil.createKey(TEST_KEY, cluster, conf);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory
new file mode 100644
index 00000000000..f7b09124b02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.TestKeyProviderCache$Factory
\ No newline at end of file
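Note: the services file above plugs into the standard JDK ServiceLoader mechanism that KeyProviderFactory uses for discovery; each registered factory on the classpath is offered the configured URI and returns null until one recognizes the scheme. A hedged sketch of that resolution for the test's "dummy" scheme (class name illustrative):

import java.net.URI;
import java.util.ServiceLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;

// Sketch: resolve a provider URI the way KeyProviderFactory discovery does.
public class ServiceLoaderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI uri = new URI("dummy://foo:bar@test_provider1");
    for (KeyProviderFactory factory : ServiceLoader.load(KeyProviderFactory.class)) {
      KeyProvider provider = factory.createProvider(uri, conf); // null if scheme not handled
      if (provider != null) {
        System.out.println(uri.getScheme() + " -> " + provider.getClass().getName());
        return;
      }
    }
    System.out.println("No factory registered for scheme: " + uri.getScheme());
  }
}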