HADOOP-11161. Expose close method in KeyProvider to give clients of Provider implementations a hook to release resources. Contribued by Arun Suresh.

(cherry picked from commit d9556e873ef4d3e68c4f0c991f856d1faa747f07)
(cherry picked from commit 3a2565c7be80cf6e9cdfec0f5460ed8ed2252768)
This commit is contained in:
Aaron T. Myers 2014-10-08 17:58:53 -07:00
parent ab448565c9
commit f86c9c6c71
6 changed files with 52 additions and 9 deletions

View File

@ -450,6 +450,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10404. Some accesses to DomainSocketWatcher#closed are not protected HADOOP-10404. Some accesses to DomainSocketWatcher#closed are not protected
by lock (cmccabe) by lock (cmccabe)
HADOOP-11161. Expose close method in KeyProvider to give clients of
Provider implementations a hook to release resources. (Arun Suresh via atm)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HADOOP-10734. Implement high-performance secure random number sources. HADOOP-10734. Implement high-performance secure random number sources.

View File

@ -533,6 +533,14 @@ public abstract class KeyProvider {
byte[] material byte[] material
) throws IOException; ) throws IOException;
/**
* Can be used by implementing classes to close any resources
* that require closing
*/
public void close() throws IOException {
// NOP
}
/** /**
* Roll a new version of the given key generating the material for it. * Roll a new version of the given key generating the material for it.
* <p/> * <p/>

View File

@ -410,4 +410,11 @@ public class KeyProviderCryptoExtension extends
return new KeyProviderCryptoExtension(keyProvider, cryptoExtension); return new KeyProviderCryptoExtension(keyProvider, cryptoExtension);
} }
@Override
public void close() throws IOException {
if (getKeyProvider() != null) {
getKeyProvider().close();
}
}
} }

View File

@ -791,4 +791,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return tokens; return tokens;
} }
/**
* Shutdown valueQueue executor threads
*/
@Override
public void close() throws IOException {
try {
encKeyVersionQueue.shutdown();
} catch (Exception e) {
throw new IOException(e);
}
}
} }

View File

@ -75,6 +75,8 @@ public class ValueQueue <E> {
private final int numValues; private final int numValues;
private final float lowWatermark; private final float lowWatermark;
private volatile boolean executorThreadsStarted = false;
/** /**
* A <code>Runnable</code> which takes a string name. * A <code>Runnable</code> which takes a string name.
*/ */
@ -187,9 +189,6 @@ public class ValueQueue <E> {
TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder() TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(true)
.setNameFormat(REFILL_THREAD).build()); .setNameFormat(REFILL_THREAD).build());
// To ensure all requests are first queued, make coreThreads = maxThreads
// and pre-start all the Core Threads.
executor.prestartAllCoreThreads();
} }
public ValueQueue(final int numValues, final float lowWaterMark, long expiry, public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
@ -297,6 +296,15 @@ public class ValueQueue <E> {
private void submitRefillTask(final String keyName, private void submitRefillTask(final String keyName,
final Queue<E> keyQueue) throws InterruptedException { final Queue<E> keyQueue) throws InterruptedException {
if (!executorThreadsStarted) {
synchronized (this) {
// To ensure all requests are first queued, make coreThreads =
// maxThreads
// and pre-start all the Core Threads.
executor.prestartAllCoreThreads();
executorThreadsStarted = true;
}
}
// The submit/execute method of the ThreadPoolExecutor is bypassed and // The submit/execute method of the ThreadPoolExecutor is bypassed and
// the Runnable is directly put in the backing BlockingQueue so that we // the Runnable is directly put in the backing BlockingQueue so that we
// can control exactly how the runnable is inserted into the queue. // can control exactly how the runnable is inserted into the queue.

View File

@ -920,12 +920,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/ */
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if(clientRunning) { try {
closeAllFilesBeingWritten(false); if(clientRunning) {
clientRunning = false; closeAllFilesBeingWritten(false);
getLeaseRenewer().closeClient(this); clientRunning = false;
// close connections to the namenode getLeaseRenewer().closeClient(this);
closeConnectionToNamenode(); // close connections to the namenode
closeConnectionToNamenode();
}
} finally {
if (provider != null) {
provider.close();
}
} }
} }