Merge remote-tracking branch 'origin/trunk' into HDFS-6584

commit 6997c1c658
Jing Zhao, 2014-09-08 15:15:34 -07:00
32 changed files with 520 additions and 54 deletions


@ -768,6 +768,12 @@ Release 2.6.0 - UNRELEASED
HADOOP-11069. KMSClientProvider should use getAuthenticationMethod() to
determine if in proxyuser mode or not. (tucu)
HADOOP-11073. Credential Provider related Unit Tests Failure on Windows.
(Xiaoyu Yao via cnauroth)
HADOOP-11071. KMSClientProvider should drain the local generated EEK cache
on key rollover. (tucu)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES


@ -997,7 +997,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
}
getOverlay().setProperty(name, value);
getProps().setProperty(name, value);
String newSource = (source == null ? "programatically" : source);
String newSource = (source == null ? "programmatically" : source);
if (!isDeprecated(name)) {
updatingResource.put(name, new String[] {newSource});
@ -1471,7 +1471,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
/**
* Gets information about why a property was set. Typically this is the
* path to the resource objects (file, URL, etc.) the property came from, but
* it can also indicate that it was set programatically, or because of the
* it can also indicate that it was set programmatically, or because of the
* command line.
*
* @param name - The property name to get the source of.

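The two Configuration.java hunks above only correct the spelling of the "programmatically" source marker, but that marker is part of the observable behavior of getPropertySources(). A minimal sketch of where it surfaces; the property name and value are illustrative, not taken from the patch:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;

public class PropertySourceSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // set(name, value) records no originating resource, so the source
    // falls back to the "programmatically" marker fixed above.
    conf.set("fs.defaultFS", "hdfs://example:8020");
    System.out.println(Arrays.toString(conf.getPropertySources("fs.defaultFS")));
    // expected output: [programmatically]
  }
}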

@ -178,6 +178,13 @@ public class KeyProviderCryptoExtension extends
public void warmUpEncryptedKeys(String... keyNames)
throws IOException;
/**
* Drains the Queue for the provided key.
*
* @param keyName the key to drain the Queue for
*/
public void drain(String keyName);
/**
* Generates a key material and encrypts it using the given key version name
* and initialization vector. The generated key material is of the same
@ -313,6 +320,10 @@ public class KeyProviderCryptoExtension extends
// NO-OP since the default version does not cache any keys
}
@Override
public void drain(String keyName) {
// NO-OP since the default version does not cache any keys
}
}
/**


@ -590,7 +590,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);
Map response = call(conn, jsonMaterial,
HttpURLConnection.HTTP_OK, Map.class);
return parseJSONKeyVersion(response);
KeyVersion keyVersion = parseJSONKeyVersion(response);
encKeyVersionQueue.drain(name);
return keyVersion;
}
@ -712,6 +714,11 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
}
@Override
public void drain(String keyName) {
encKeyVersionQueue.drain(keyName);
}
@Override
public Token<?>[] addDelegationTokens(String renewer,
Credentials credentials) throws IOException {

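KMSClientProvider keeps a client-side ValueQueue of pre-generated EEKs (encKeyVersionQueue). The hunk above drains that queue when the client rolls a key and also exposes drain() so other layers can invalidate it, which is what guarantees that the next generateEncryptedKey() returns material bound to the new key version (HADOOP-11071 in the CHANGES.txt hunk). A hedged sketch of the observable behavior through the generic KeyProviderCryptoExtension API; the key name, cipher options, and provider lookup are illustrative and assume a key provider (for example a KMS URI) is already configured:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderFactory;

public class RolloverDrainSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes hadoop.security.key.provider.path points at a real provider.
    KeyProvider kp = KeyProviderFactory.getProviders(conf).get(0);
    KeyProviderCryptoExtension kpce =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
    KeyProvider.Options options = new KeyProvider.Options(conf);
    options.setCipher("AES/CTR/NoPadding");
    options.setBitLength(128);
    kpce.createKey("k6", options);
    EncryptedKeyVersion before = kpce.generateEncryptedKey("k6");
    kpce.rollNewVersion("k6");           // KMSClientProvider drains its EEK queue here
    EncryptedKeyVersion after = kpce.generateEncryptedKey("k6");
    // Without the drain, "after" could still reference the pre-roll key version.
    System.out.println(before.getEncryptionKeyVersionName()
        + " -> " + after.getEncryptionKeyVersionName());
  }
}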

@ -227,6 +227,19 @@ public class ValueQueue <E> {
return getAtMost(keyName, 1).get(0);
}
/**
* Drains the Queue for the provided key.
*
* @param keyName the key to drain the Queue for
*/
public void drain(String keyName ) {
try {
keyQueues.get(keyName).clear();
} catch (ExecutionException ex) {
//NOP
}
}
/**
* This removes the "num" values currently at the head of the Queue for the
* provided key. Will immediately fire the Queue filler function if key

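ValueQueue.drain() above simply clears the per-key queue held in the internal loading cache (hence the swallowed ExecutionException). A small usage sketch of the cache semantics, written under the assumption that QueueRefiller exposes the fillQueueForKey(String, Queue&lt;E&gt;, int) callback; the filler body is purely illustrative:

import java.io.IOException;
import java.util.Queue;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;

public class ValueQueueDrainSketch {
  public static void main(String[] args) throws Exception {
    QueueRefiller<String> refiller = new QueueRefiller<String>() {
      @Override
      public void fillQueueForKey(String keyName, Queue<String> keyQueue,
          int numValues) throws IOException {
        for (int i = 0; i < numValues; i++) {
          keyQueue.add(keyName + "-" + System.nanoTime());
        }
      }
    };
    // Same tuning as the new testDrain case: 10 values per key, 10% low
    // watermark, short expiry, one filler thread, fully synchronous refill.
    ValueQueue<String> vq = new ValueQueue<String>(
        10, 0.1f, 300, 1, SyncGenerationPolicy.ALL, refiller);
    String before = vq.getNext("k1"); // first call populates the queue for "k1"
    vq.drain("k1");                   // discards every cached value for "k1"
    String after = vq.getNext("k1");  // forces a fresh fill from the refiller
    System.out.println(before + " -> " + after);
    vq.shutdown();
  }
}

This per-key invalidation is what lets both the KMS client and the KMS server throw away pre-generated EEKs as soon as a key rolls, as the surrounding hunks show.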

@ -63,7 +63,7 @@ public class TestConfServlet extends TestCase {
String resource = (String)propertyInfo.get("resource");
System.err.println("k: " + key + " v: " + val + " r: " + resource);
if (TEST_KEY.equals(key) && TEST_VAL.equals(val)
&& "programatically".equals(resource)) {
&& "programmatically".equals(resource)) {
foundSetting = true;
}
}


@ -821,8 +821,8 @@ public class TestConfiguration extends TestCase {
fileResource,
new Path(sources[0]));
assertArrayEquals("Resource string returned for a set() property must be " +
"\"programatically\"",
new String[]{"programatically"},
"\"programmatically\"",
new String[]{"programmatically"},
conf.getPropertySources("fs.defaultFS"));
assertEquals("Resource string returned for an unset property must be null",
null, conf.getPropertySources("fs.defaultFoo"));
@ -1101,7 +1101,7 @@ public class TestConfiguration extends TestCase {
confDump.put(prop.getKey(), prop);
}
assertEquals("value5",confDump.get("test.key6").getValue());
assertEquals("programatically", confDump.get("test.key4").getResource());
assertEquals("programmatically", confDump.get("test.key4").getResource());
outWriter.close();
}


@ -55,18 +55,18 @@ public class TestKeyProviderFactory {
@Test
public void testFactory() throws Exception {
Configuration conf = new Configuration();
final String userUri = UserProvider.SCHEME_NAME + ":///";
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
final String jksUri = JavaKeyStoreProvider.SCHEME_NAME +
"://file" + jksPath.toUri().toString();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
UserProvider.SCHEME_NAME + ":///," +
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
userUri + "," + jksUri);
List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
assertEquals(2, providers.size());
assertEquals(UserProvider.class, providers.get(0).getClass());
assertEquals(JavaKeyStoreProvider.class, providers.get(1).getClass());
assertEquals(UserProvider.SCHEME_NAME +
":///", providers.get(0).toString());
assertEquals(JavaKeyStoreProvider.SCHEME_NAME +
"://file" + tmpDir + "/test.jks",
providers.get(1).toString());
assertEquals(userUri, providers.get(0).toString());
assertEquals(jksUri, providers.get(1).toString());
}
@Test
@ -207,8 +207,9 @@ public class TestKeyProviderFactory {
@Test
public void testJksProvider() throws Exception {
Configuration conf = new Configuration();
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(tmpDir, "test.jks");
file.delete();
@ -317,8 +318,9 @@ public class TestKeyProviderFactory {
@Test
public void testJksProviderPasswordViaConfig() throws Exception {
Configuration conf = new Configuration();
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(tmpDir, "test.jks");
file.delete();
try {
@ -360,8 +362,8 @@ public class TestKeyProviderFactory {
@Test
public void testGetProviderViaURI() throws Exception {
Configuration conf = new Configuration(false);
URI uri = new URI(JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir +
"/test.jks");
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
URI uri = new URI(JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
KeyProvider kp = KeyProviderFactory.get(uri, conf);
Assert.assertNotNull(kp);
Assert.assertEquals(JavaKeyStoreProvider.class, kp.getClass());

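The recurring change in this and the following test files is the same one-line idea: build the jceks provider URI from a Hadoop Path instead of concatenating a raw java.io.File path, so the URI stays well formed when the temp directory is a Windows-style path (the same concern behind HADOOP-11073 in the CHANGES.txt hunk). A hedged standalone sketch of the pattern; the temp directory is chosen only for illustration:

import java.io.File;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;

public class JksProviderUriSketch {
  public static void main(String[] args) throws Exception {
    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
    // Path.toUri() normalizes separators and drive letters
    // (e.g. /C:/Temp/test.jks rather than C:\Temp\test.jks).
    Path jksPath = new Path(tmpDir.toString(), "test.jks");
    String jksUri =
        JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
    Configuration conf = new Configuration();
    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, jksUri);
    List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
    System.out.println(providers.get(0)); // jceks provider backed by test.jks
  }
}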

@ -187,4 +187,18 @@ public class TestValueQueue {
Assert.assertEquals(10, filler.getTop().num);
vq.shutdown();
}
@Test
public void testDrain() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1,
SyncGenerationPolicy.ALL, filler);
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
vq.drain("k1");
Assert.assertNull(filler.getTop());
vq.shutdown();
}
}


@ -40,6 +40,7 @@ import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
@ -165,8 +166,9 @@ public class TestLdapGroupsMapping {
File testDir = new File(System.getProperty("test.build.data",
"target/test-dir"));
Configuration conf = new Configuration();
final Path jksPath = new Path(testDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(testDir, "test.jks");
file.delete();


@ -52,19 +52,19 @@ public class TestCredentialProviderFactory {
@Test
public void testFactory() throws Exception {
Configuration conf = new Configuration();
final String userUri = UserProvider.SCHEME_NAME + ":///";
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
final String jksUri = JavaKeyStoreProvider.SCHEME_NAME +
"://file" + jksPath.toUri();
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
UserProvider.SCHEME_NAME + ":///," +
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
userUri + "," + jksUri);
List<CredentialProvider> providers =
CredentialProviderFactory.getProviders(conf);
assertEquals(2, providers.size());
assertEquals(UserProvider.class, providers.get(0).getClass());
assertEquals(JavaKeyStoreProvider.class, providers.get(1).getClass());
assertEquals(UserProvider.SCHEME_NAME +
":///", providers.get(0).toString());
assertEquals(JavaKeyStoreProvider.SCHEME_NAME +
"://file" + tmpDir + "/test.jks",
providers.get(1).toString());
assertEquals(userUri, providers.get(0).toString());
assertEquals(jksUri, providers.get(1).toString());
}
@Test
@ -188,8 +188,9 @@ public class TestCredentialProviderFactory {
@Test
public void testJksProvider() throws Exception {
Configuration conf = new Configuration();
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(tmpDir, "test.jks");
file.delete();


@ -19,6 +19,7 @@
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
@ -392,8 +393,9 @@ public class KeyStoreTestUtil {
"target/test-dir"));
Configuration conf = new Configuration();
final Path jksPath = new Path(testDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(testDir, "test.jks");
file.delete();


@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
@ -305,8 +306,9 @@ public class TestSSLFactory {
if (useCredProvider) {
File testDir = new File(System.getProperty("test.build.data",
"target/test-dir"));
final Path jksPath = new Path(testDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
sslConf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, ourUrl);
}
} else {


@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms.server;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@ -27,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
@ -112,6 +114,11 @@ public class EagerKeyGeneratorKeyProviderCryptoExtension
}
}
@Override
public void drain(String keyName) {
encKeyVersionQueue.drain(keyName);
}
@Override
public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
throws IOException, GeneralSecurityException {
@ -146,4 +153,19 @@ public class EagerKeyGeneratorKeyProviderCryptoExtension
new CryptoExtension(conf, keyProviderCryptoExtension));
}
@Override
public KeyVersion rollNewVersion(String name)
throws NoSuchAlgorithmException, IOException {
KeyVersion keyVersion = super.rollNewVersion(name);
getExtension().drain(name);
return keyVersion;
}
@Override
public KeyVersion rollNewVersion(String name, byte[] material)
throws IOException {
KeyVersion keyVersion = super.rollNewVersion(name, material);
getExtension().drain(name);
return keyVersion;
}
}


@ -531,6 +531,7 @@ public class TestKMS {
Assert.assertEquals("d", meta.getDescription());
Assert.assertEquals(attributes, meta.getAttributes());
// test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(kp);
@ -542,6 +543,22 @@ public class TestKMS {
Assert.assertEquals(new Text("kms-dt"), credentials.getToken(
SecurityUtil.buildTokenService(kmsAddr)).getKind());
// test rollover draining
KeyProviderCryptoExtension kpce = KeyProviderCryptoExtension.
createKeyProviderCryptoExtension(kp);
options = new KeyProvider.Options(conf);
options.setCipher("AES/CTR/NoPadding");
options.setBitLength(128);
kpce.createKey("k6", options);
EncryptedKeyVersion ekv1 = kpce.generateEncryptedKey("k6");
kpce.rollNewVersion("k6");
EncryptedKeyVersion ekv2 = kpce.generateEncryptedKey("k6");
Assert.assertNotEquals(ekv1.getEncryptionKeyVersionName(),
ekv2.getEncryptionKeyVersionName());
return null;
}
});


@ -282,6 +282,9 @@ Trunk (Unreleased)
intermittently with various symptoms - debugging patch. (Yongjun Zhang via
Arpit Agarwal)
HDFS-6893. crypto subcommand is not sorted properly in hdfs's hadoop_usage
(David Luo via aw)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -469,6 +472,12 @@ Release 2.6.0 - UNRELEASED
HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
(Ming Ma via wheat9)
HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do
no-checksum reads that extend too long (cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -641,6 +650,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7025. HDFS Credential Provider related Unit Test Failure.
(Xiaoyu Yao via cnauroth)
HDFS-7005. DFS input streams do not timeout.
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an


@ -23,6 +23,7 @@ function hadoop_usage
echo " cacheadmin configure the HDFS cache"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " crypto configure HDFS encryption zones"
echo " datanode run a DFS datanode"
echo " dfs run a filesystem command on the file system"
echo " dfsadmin run a DFS admin client"
@ -46,7 +47,6 @@ function hadoop_usage
echo " snapshotDiff diff two snapshots of a directory or diff the"
echo " current directory contents with a snapshot"
echo " zkfc run the ZK Failover Controller daemon"
echo " crypto configure HDFS encryption zones"
echo ""
echo "Most commands print help when invoked w/o parameters."
}


@ -3034,6 +3034,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
dfsClientConf.socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId);
peer.setReadTimeout(dfsClientConf.socketTimeout);
success = true;
return peer;
} finally {

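The one-line DFSClient change above (HDFS-7005 in the CHANGES.txt hunk) applies the client's configured socket timeout to reads on the newly connected peer, not only to establishing the connection, so a stalled DataNode can no longer hang a read indefinitely. The timeout comes from the existing dfs.client.socket-timeout setting; a small illustrative sketch with a placeholder value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ClientReadTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // 60s connect/read timeout for DFSClient data-transfer peers; the patched
    // newConnectedPeer() now passes this to Peer.setReadTimeout() as well.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000);
    System.out.println(
        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0));
  }
}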

@ -248,6 +248,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms";
public static final String DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1, OOB_TYPE2, OOB_TYPE3, OOB_TYPE4
public static final String DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS = "dfs.datanode.cache.revocation.timeout.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT = 900000L;
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;


@ -305,7 +305,7 @@ public class DatanodeStorageInfo {
@Override
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;
return "[" + storageType + "]" + storageID + ":" + state + ":" + dn;
}
StorageReport toStorageReport() {


@ -26,6 +26,7 @@ import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@ -43,6 +44,7 @@ import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
@ -83,11 +85,13 @@ public class ShortCircuitRegistry {
private static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
private final String clientName;
private final ShortCircuitRegistry registry;
RegisteredShm(ShmId shmId, FileInputStream stream,
RegisteredShm(String clientName, ShmId shmId, FileInputStream stream,
ShortCircuitRegistry registry) throws IOException {
super(shmId, stream);
this.clientName = clientName;
this.registry = registry;
}
@ -100,6 +104,10 @@ public class ShortCircuitRegistry {
}
return true;
}
String getClientName() {
return clientName;
}
}
public synchronized void removeShm(ShortCircuitShm shm) {
@ -243,6 +251,16 @@ public class ShortCircuitRegistry {
}
}
public synchronized String getClientNames(ExtendedBlockId blockId) {
if (!enabled) return "";
final HashSet<String> clientNames = new HashSet<String>();
final Set<Slot> affectedSlots = slots.get(blockId);
for (Slot slot : affectedSlots) {
clientNames.add(((RegisteredShm)slot.getShm()).getClientName());
}
return Joiner.on(",").join(clientNames);
}
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
@ -290,7 +308,7 @@ public class ShortCircuitRegistry {
shmId = ShmId.createRandom();
} while (segments.containsKey(shmId));
fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
shm = new RegisteredShm(shmId, fis, this);
shm = new RegisteredShm(clientName, shmId, fis, this);
} finally {
if (shm == null) {
IOUtils.closeQuietly(fis);


@ -18,6 +18,11 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -33,10 +38,12 @@ import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -116,6 +124,12 @@ public class FsDatasetCache {
private final ThreadPoolExecutor uncachingExecutor;
private final ScheduledThreadPoolExecutor deferredUncachingExecutor;
private final long revocationMs;
private final long revocationPollingMs;
/**
* The approximate amount of cache space in use.
*
@ -217,6 +231,24 @@ public class FsDatasetCache {
new LinkedBlockingQueue<Runnable>(),
workerFactory);
this.uncachingExecutor.allowCoreThreadTimeOut(true);
this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
1, workerFactory);
this.revocationMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
long confRevocationPollingMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
long minRevocationPollingMs = revocationMs / 2;
if (minRevocationPollingMs < confRevocationPollingMs) {
throw new RuntimeException("configured value " +
confRevocationPollingMs + " for " +
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
" is too high. It must not be more than half of the " +
"value of " + DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
}
/**
@ -262,13 +294,11 @@ public class FsDatasetCache {
synchronized void uncacheBlock(String bpid, long blockId) {
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
Value prevValue = mappableBlockMap.get(key);
boolean deferred = false;
if (!dataset.datanode.getShortCircuitRegistry().
processBlockMunlockRequest(key)) {
// TODO: we probably want to forcibly uncache the block (and close the
// shm) after a certain timeout has elapsed.
LOG.debug("{} is anchored, and can't be uncached now.", key);
return;
deferred = true;
}
if (prevValue == null) {
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@ -285,12 +315,19 @@ public class FsDatasetCache {
new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
break;
case CACHED:
LOG.debug(
"Block with id {}, pool {} has been scheduled for uncaching" + ".",
blockId, bpid);
mappableBlockMap.put(key,
new Value(prevValue.mappableBlock, State.UNCACHING));
uncachingExecutor.execute(new UncachingTask(key));
if (deferred) {
LOG.debug("{} is anchored, and can't be uncached now. Scheduling it " +
"for uncaching in {} ",
key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
deferredUncachingExecutor.schedule(
new UncachingTask(key, revocationMs),
revocationPollingMs, TimeUnit.MILLISECONDS);
} else {
LOG.debug("{} has been scheduled for immediate uncaching.", key);
uncachingExecutor.execute(new UncachingTask(key, 0));
}
break;
default:
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@ -403,22 +440,62 @@ public class FsDatasetCache {
private class UncachingTask implements Runnable {
private final ExtendedBlockId key;
private final long revocationTimeMs;
UncachingTask(ExtendedBlockId key) {
UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
this.key = key;
if (revocationDelayMs == 0) {
this.revocationTimeMs = 0;
} else {
this.revocationTimeMs = revocationDelayMs + Time.monotonicNow();
}
}
private boolean shouldDefer() {
/* If revocationTimeMs == 0, this is an immediate uncache request.
* No clients were anchored at the time we made the request. */
if (revocationTimeMs == 0) {
return false;
}
/* Let's check if any clients still have this block anchored. */
boolean anchored =
!dataset.datanode.getShortCircuitRegistry().
processBlockMunlockRequest(key);
if (!anchored) {
LOG.debug("Uncaching {} now that it is no longer in use " +
"by any clients.", key);
return false;
}
long delta = revocationTimeMs - Time.monotonicNow();
if (delta < 0) {
LOG.warn("Forcibly uncaching {} after {} " +
"because client(s) {} refused to stop using it.", key,
DurationFormatUtils.formatDurationHMS(revocationTimeMs),
dataset.datanode.getShortCircuitRegistry().getClientNames(key));
return false;
}
LOG.info("Replica {} still can't be uncached because some " +
"clients continue to use it. Will wait for {}", key,
DurationFormatUtils.formatDurationHMS(delta));
return true;
}
@Override
public void run() {
Value value;
if (shouldDefer()) {
deferredUncachingExecutor.schedule(
this, revocationPollingMs, TimeUnit.MILLISECONDS);
return;
}
synchronized (FsDatasetCache.this) {
value = mappableBlockMap.get(key);
}
Preconditions.checkNotNull(value);
Preconditions.checkArgument(value.state == State.UNCACHING);
// TODO: we will eventually need to do revocation here if any clients
// are reading via mmap with checksums enabled. See HDFS-5182.
IOUtils.closeQuietly(value.mappableBlock);
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
@ -427,7 +504,13 @@ public class FsDatasetCache {
usedBytesCount.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
if (revocationTimeMs != 0) {
LOG.debug("Uncaching of {} completed. usedBytes = {}",
key, newUsedBytes);
} else {
LOG.debug("Deferred uncaching of {} completed. usedBytes = {}",
key, newUsedBytes);
}
}
}

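To make the new revocation schedule above concrete: an uncache request for an anchored replica is re-queued on the deferred executor every revocationPollingMs until either the client releases the block or revocationMs elapses, at which point the replica is uncached forcibly. A back-of-the-envelope sketch with the shipped defaults (not part of the patch):

public class RevocationScheduleSketch {
  public static void main(String[] args) {
    long revocationMs = 900000L;     // dfs.datanode.cache.revocation.timeout.ms default
    long revocationPollingMs = 500L; // dfs.datanode.cache.revocation.polling.ms default
    // FsDatasetCache rejects polling intervals above half the revocation timeout.
    long maxAllowedPollingMs = revocationMs / 2;
    long pollsBeforeForcedUncache = revocationMs / revocationPollingMs;
    System.out.println("max allowed polling interval: " + maxAllowedPollingMs + " ms");
    System.out.println("re-checks before forcible uncache: ~" + pollsBeforeForcedUncache);
  }
}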

@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.PositionTrackingIn
/**
* OfflineImageViewer to dump the contents of an Hadoop image file to XML
* or the console. Main entry point into utility, either via the
* command line or programatically.
* command line or programmatically.
*/
@InterfaceAudience.Private
public class OfflineImageViewer {


@ -39,7 +39,7 @@ import org.apache.hadoop.net.NetUtils;
/**
* OfflineImageViewerPB to dump the contents of an Hadoop image file to XML or
* the console. Main entry point into utility, either via the command line or
* programatically.
* programmatically.
*/
@InterfaceAudience.Private
public class OfflineImageViewerPB {


@ -2117,4 +2117,25 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.revocation.timeout.ms</name>
<value>900000</value>
<description>When the DFSClient reads from a block file which the DataNode is
caching, the DFSClient can skip verifying checksums. The DataNode will
keep the block file in cache until the client is done. If the client takes
an unusually long time, though, the DataNode may need to evict the block
file from the cache anyway. This value controls how long the DataNode will
wait for the client to release a replica that it is reading without
checksums.
</description>
</property>
<property>
<name>dfs.datanode.cache.revocation.polling.ms</name>
<value>500</value>
<description>How often the DataNode should poll to see if the clients have
stopped using a replica that the DataNode wants to uncache.
</description>
</property>
</configuration>

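The two property descriptions above document the DataNode-side knobs for HDFS-6036. When tuning them, keep the polling interval at or below half of the revocation timeout, otherwise FsDatasetCache rejects the configuration at DataNode startup (see the check in the FsDatasetCache hunk). A hedged sketch of a programmatic override; the values are illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class RevocationOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Give no-checksum readers five minutes before forcible uncaching.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 300000L);
    // Re-check anchored replicas every ten seconds, well under the
    // timeout/2 ceiling enforced by FsDatasetCache.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 10000L);
    System.out.println(conf.get(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS));
  }
}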

@ -31,6 +31,9 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -60,6 +63,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
@ -69,6 +73,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
@ -961,4 +966,36 @@ public class TestDistributedFileSystem {
cluster.shutdown();
}
}
@Test(timeout=10000)
public void testDFSClientPeerTimeout() throws IOException {
final int timeout = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// use a dummy socket to ensure the read times out
ServerSocket socket = new ServerSocket(0);
Peer peer = dfs.getClient().newConnectedPeer(
(InetSocketAddress) socket.getLocalSocketAddress(), null, null);
long start = Time.now();
try {
peer.getInputStream().read();
Assert.fail("should timeout");
} catch (SocketTimeoutException ste) {
long delta = Time.now() - start;
Assert.assertTrue("timedout too soon", delta >= timeout*0.9);
Assert.assertTrue("timedout too late", delta <= timeout*1.1);
} catch (Throwable t) {
Assert.fail("wrong exception:"+t);
}
} finally {
cluster.shutdown();
}
}
}


@ -89,7 +89,7 @@ public class TestFsDatasetCache {
private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
// Most Linux installs allow a default of 64KB locked memory
private static final long CACHE_CAPACITY = 64 * 1024;
static final long CACHE_CAPACITY = 64 * 1024;
// mlock always locks the entire page. So we don't need to deal with this
// rounding, use the OS page size for the block size.
private static final long PAGE_SIZE =


@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestFsDatasetCacheRevocation {
private static final Logger LOG = LoggerFactory.getLogger(
TestFsDatasetCacheRevocation.class);
private static CacheManipulator prevCacheManipulator;
private static TemporarySocketDirectory sockDir;
private static final int BLOCK_SIZE = 4096;
@Before
public void setUp() throws Exception {
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
DomainSocket.disableBindPathValidation();
sockDir = new TemporarySocketDirectory();
}
@After
public void tearDown() throws Exception {
// Restore the original CacheManipulator
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
sockDir.close();
}
private static Configuration getDefaultConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 50);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 250);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
TestFsDatasetCache.CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "sock").getAbsolutePath());
return conf;
}
/**
* Test that when a client has a replica mmapped, we will not un-mlock that
* replica for a reasonable amount of time, even if an uncache request
* occurs.
*/
@Test(timeout=120000)
public void testPinning() throws Exception {
Configuration conf = getDefaultConf();
// Set a really long revocation timeout, so that we won't reach it during
// this test.
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
1800000L);
// Poll very often
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
MiniDFSCluster cluster = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
// Attempt to uncache file. The file should still be cached.
dfs.removeCacheDirective(cacheDirectiveId);
Thread.sleep(500);
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Un-mmap the file. The file should be uncached after this.
in.releaseBuffer(buf);
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
// Cleanup
in.close();
cluster.shutdown();
}
/**
* Test that when we have an uncache request, and the client refuses to release
* the replica for a long time, we will un-mlock it.
*/
@Test(timeout=120000)
public void testRevocation() throws Exception {
BlockReaderTestUtil.enableHdfsCachingTracing();
BlockReaderTestUtil.enableShortCircuitShmTracing();
Configuration conf = getDefaultConf();
// Set a really short revocation timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
// Poll very often
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
MiniDFSCluster cluster = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file2";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
// Attempt to uncache file. The file should get uncached.
LOG.info("removing cache directive {}", cacheDirectiveId);
dfs.removeCacheDirective(cacheDirectiveId);
LOG.info("finished removing cache directive {}", cacheDirectiveId);
Thread.sleep(1000);
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
// Cleanup
in.releaseBuffer(buf);
in.close();
cluster.shutdown();
}
}


@ -154,6 +154,9 @@ Trunk (Unreleased)
MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
MAPREDUCE-5972. Fix typo 'programatically' in job.xml (and a few other
places) (Akira AJISAKA via aw)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES


@ -1333,7 +1333,7 @@ MapReduce Application Master REST API's.
{
"value" : "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer",
"name" : "hadoop.http.filter.initializers"
"source" : ["programatically", "job.xml"]
"source" : ["programmatically", "job.xml"]
},
{
"value" : "/home/hadoop/tmp",
@ -1379,7 +1379,7 @@ MapReduce Application Master REST API's.
<property>
<name>hadoop.http.filter.initializers</name>
<value>org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer</value>
<source>programatically</source>
<source>programmatically</source>
<source>job.xml</source>
</property>
<property>


@ -1311,7 +1311,7 @@ MapReduce History Server REST API's.
{
"value" : "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer",
"name" : "hadoop.http.filter.initializers"
"source" : ["programatically", "job.xml"]
"source" : ["programmatically", "job.xml"]
},
{
"value" : "/home/hadoop/tmp",
@ -1357,7 +1357,7 @@ MapReduce History Server REST API's.
<property>
<name>hadoop.http.filter.initializers</name>
<value>org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer</value>
<source>programatically</source>
<source>programmatically</source>
<source>job.xml</source>
</property>
<property>


@ -1504,7 +1504,7 @@ setDocument = Sizzle.setDocument = function( node ) {
// Support: IE<10
// Check if getElementById returns elements by name
// The broken getElementById methods don't pick up programatically-set names,
// The broken getElementById methods don't pick up programmatically-set names,
// so use a roundabout getElementsByName test
support.getById = assert(function( div ) {
docElem.appendChild( div ).id = expando;