HADOOP-11161. Expose close method in KeyProvider to give clients of Provider implementations a hook to release resources. Contribued by Arun Suresh.
This commit is contained in:
parent
d996235285
commit
2a51494ce1
|
@ -811,6 +811,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HADOOP-10404. Some accesses to DomainSocketWatcher#closed are not protected
|
HADOOP-10404. Some accesses to DomainSocketWatcher#closed are not protected
|
||||||
by the lock (cmccabe)
|
by the lock (cmccabe)
|
||||||
|
|
||||||
|
HADOOP-11161. Expose close method in KeyProvider to give clients of
|
||||||
|
Provider implementations a hook to release resources. (Arun Suresh via atm)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HADOOP-10734. Implement high-performance secure random number sources.
|
HADOOP-10734. Implement high-performance secure random number sources.
|
||||||
|
|
|
@ -533,6 +533,14 @@ public abstract class KeyProvider {
|
||||||
byte[] material
|
byte[] material
|
||||||
) throws IOException;
|
) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Can be used by implementing classes to close any resources
|
||||||
|
* that require closing
|
||||||
|
*/
|
||||||
|
public void close() throws IOException {
|
||||||
|
// NOP
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Roll a new version of the given key generating the material for it.
|
* Roll a new version of the given key generating the material for it.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
|
@ -408,6 +408,13 @@ public class KeyProviderCryptoExtension extends
|
||||||
? (CryptoExtension) keyProvider
|
? (CryptoExtension) keyProvider
|
||||||
: new DefaultCryptoExtension(keyProvider);
|
: new DefaultCryptoExtension(keyProvider);
|
||||||
return new KeyProviderCryptoExtension(keyProvider, cryptoExtension);
|
return new KeyProviderCryptoExtension(keyProvider, cryptoExtension);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (getKeyProvider() != null) {
|
||||||
|
getKeyProvider().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -791,4 +791,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
return tokens;
|
return tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown valueQueue executor threads
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
try {
|
||||||
|
encKeyVersionQueue.shutdown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,8 @@ public class ValueQueue <E> {
|
||||||
private final int numValues;
|
private final int numValues;
|
||||||
private final float lowWatermark;
|
private final float lowWatermark;
|
||||||
|
|
||||||
|
private volatile boolean executorThreadsStarted = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A <code>Runnable</code> which takes a string name.
|
* A <code>Runnable</code> which takes a string name.
|
||||||
*/
|
*/
|
||||||
|
@ -187,9 +189,6 @@ public class ValueQueue <E> {
|
||||||
TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
|
TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
|
||||||
.setDaemon(true)
|
.setDaemon(true)
|
||||||
.setNameFormat(REFILL_THREAD).build());
|
.setNameFormat(REFILL_THREAD).build());
|
||||||
// To ensure all requests are first queued, make coreThreads = maxThreads
|
|
||||||
// and pre-start all the Core Threads.
|
|
||||||
executor.prestartAllCoreThreads();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
|
public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
|
||||||
|
@ -297,6 +296,15 @@ public class ValueQueue <E> {
|
||||||
|
|
||||||
private void submitRefillTask(final String keyName,
|
private void submitRefillTask(final String keyName,
|
||||||
final Queue<E> keyQueue) throws InterruptedException {
|
final Queue<E> keyQueue) throws InterruptedException {
|
||||||
|
if (!executorThreadsStarted) {
|
||||||
|
synchronized (this) {
|
||||||
|
// To ensure all requests are first queued, make coreThreads =
|
||||||
|
// maxThreads
|
||||||
|
// and pre-start all the Core Threads.
|
||||||
|
executor.prestartAllCoreThreads();
|
||||||
|
executorThreadsStarted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
// The submit/execute method of the ThreadPoolExecutor is bypassed and
|
// The submit/execute method of the ThreadPoolExecutor is bypassed and
|
||||||
// the Runnable is directly put in the backing BlockingQueue so that we
|
// the Runnable is directly put in the backing BlockingQueue so that we
|
||||||
// can control exactly how the runnable is inserted into the queue.
|
// can control exactly how the runnable is inserted into the queue.
|
||||||
|
|
|
@ -935,12 +935,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
if(clientRunning) {
|
try {
|
||||||
closeAllFilesBeingWritten(false);
|
if(clientRunning) {
|
||||||
clientRunning = false;
|
closeAllFilesBeingWritten(false);
|
||||||
getLeaseRenewer().closeClient(this);
|
clientRunning = false;
|
||||||
// close connections to the namenode
|
getLeaseRenewer().closeClient(this);
|
||||||
closeConnectionToNamenode();
|
// close connections to the namenode
|
||||||
|
closeConnectionToNamenode();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (provider != null) {
|
||||||
|
provider.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue