Merge r1537584 through r1538407 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1538408 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-11-03 17:51:52 +00:00
commit 7e46025f91
36 changed files with 1432 additions and 765 deletions

View File

@ -428,6 +428,9 @@ Release 2.2.1 - UNRELEASED
HADOOP-10046. Print a log message when SSL is enabled.
(David S. Wang via wang)
HADOOP-10079. Log a warning message if group resolution takes too long.
(cmccabe)
OPTIMIZATIONS
BUG FIXES
@ -452,6 +455,11 @@ Release 2.2.1 - UNRELEASED
HADOOP-10072. TestNfsExports#testMultiMatchers fails due to non-deterministic
timing around cache expiry check. (cnauroth)
HADOOP-9898. Set SO_KEEPALIVE on all our sockets. (todd via wang)
HADOOP-9478. Fix race conditions during the initialization of Configuration
related to deprecatedKeyMap (cmccabe)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -55,6 +55,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@ -65,6 +67,7 @@ import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.collections.map.UnmodifiableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -87,6 +90,7 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
import com.google.common.base.Preconditions;
/**
@ -254,13 +258,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* warning message which can be logged whenever the deprecated key is used.
*/
private static class DeprecatedKeyInfo {
private String[] newKeys;
private String customMessage;
private boolean accessed;
private final String[] newKeys;
private final String customMessage;
private final AtomicBoolean accessed = new AtomicBoolean(false);
DeprecatedKeyInfo(String[] newKeys, String customMessage) {
this.newKeys = newKeys;
this.customMessage = customMessage;
accessed = false;
}
/**
@ -286,26 +290,170 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
else {
warningMessage = customMessage;
}
accessed = true;
return warningMessage;
}
boolean getAndSetAccessed() {
return accessed.getAndSet(true);
}
public void clearAccessed() {
accessed.set(false);
}
}
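The move from a plain boolean to an AtomicBoolean makes the warn-only-once bookkeeping safe without synchronization. As an illustrative sketch (not part of this patch; the class and method names are made up), the getAndSet idiom guarantees that at most one caller observes false and therefore logs:
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
class WarnOnce {
  private static final Log LOG = LogFactory.getLog(WarnOnce.class);
  private final AtomicBoolean warned = new AtomicBoolean(false);
  // Only the first thread to flip the flag emits the message, even if
  // many threads race into this method concurrently.
  void warn(String message) {
    if (!warned.getAndSet(true)) {
      LOG.info(message);
    }
  }
}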
/**
* Stores the deprecated keys, the new keys which replace the deprecated keys
* and custom message (if any is provided).
* A pending addition to the global set of deprecated keys.
*/
private static Map<String, DeprecatedKeyInfo> deprecatedKeyMap =
new HashMap<String, DeprecatedKeyInfo>();
/**
* Stores a mapping from superseding keys to the keys which they deprecate.
*/
private static Map<String, String> reverseDeprecatedKeyMap =
new HashMap<String, String>();
public static class DeprecationDelta {
private final String key;
private final String[] newKeys;
private final String customMessage;
DeprecationDelta(String key, String[] newKeys, String customMessage) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(newKeys);
Preconditions.checkArgument(newKeys.length > 0);
this.key = key;
this.newKeys = newKeys;
this.customMessage = customMessage;
}
public DeprecationDelta(String key, String newKey, String customMessage) {
this(key, new String[] { newKey }, customMessage);
}
public DeprecationDelta(String key, String newKey) {
this(key, new String[] { newKey }, null);
}
public String getKey() {
return key;
}
public String[] getNewKeys() {
return newKeys;
}
public String getCustomMessage() {
return customMessage;
}
}
/**
* Adds the deprecated key to the deprecation map.
* The set of all keys which are deprecated.
*
* DeprecationContext objects are immutable.
*/
private static class DeprecationContext {
/**
* Stores the deprecated keys, the new keys which replace the deprecated keys
* and custom message (if any is provided).
*/
private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;
/**
* Stores a mapping from superseding keys to the keys which they deprecate.
*/
private final Map<String, String> reverseDeprecatedKeyMap;
/**
* Create a new DeprecationContext by copying a previous DeprecationContext
* and adding some deltas.
*
* @param other The previous deprecation context to copy, or null to start
* from nothing.
* @param deltas The deltas to apply.
*/
@SuppressWarnings("unchecked")
DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {
HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap =
new HashMap<String, DeprecatedKeyInfo>();
HashMap<String, String> newReverseDeprecatedKeyMap =
new HashMap<String, String>();
if (other != null) {
for (Entry<String, DeprecatedKeyInfo> entry :
other.deprecatedKeyMap.entrySet()) {
newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
}
for (Entry<String, String> entry :
other.reverseDeprecatedKeyMap.entrySet()) {
newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
}
}
for (DeprecationDelta delta : deltas) {
if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {
DeprecatedKeyInfo newKeyInfo =
new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());
newDeprecatedKeyMap.put(delta.key, newKeyInfo);
for (String newKey : delta.getNewKeys()) {
newReverseDeprecatedKeyMap.put(newKey, delta.key);
}
}
}
this.deprecatedKeyMap =
UnmodifiableMap.decorate(newDeprecatedKeyMap);
this.reverseDeprecatedKeyMap =
UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);
}
Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {
return deprecatedKeyMap;
}
Map<String, String> getReverseDeprecatedKeyMap() {
return reverseDeprecatedKeyMap;
}
}
private static DeprecationDelta[] defaultDeprecations =
new DeprecationDelta[] {
new DeprecationDelta("topology.script.file.name",
CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY),
new DeprecationDelta("topology.script.number.args",
CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY),
new DeprecationDelta("hadoop.configured.node.mapping",
CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY),
new DeprecationDelta("topology.node.switch.mapping.impl",
CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY),
new DeprecationDelta("dfs.df.interval",
CommonConfigurationKeys.FS_DF_INTERVAL_KEY),
new DeprecationDelta("hadoop.native.lib",
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY),
new DeprecationDelta("fs.default.name",
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY),
new DeprecationDelta("dfs.umaskmode",
CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY)
};
/**
* The global DeprecationContext.
*/
private static AtomicReference<DeprecationContext> deprecationContext =
new AtomicReference<DeprecationContext>(
new DeprecationContext(null, defaultDeprecations));
/**
* Adds a set of deprecated keys to the global deprecations.
*
* This method is lockless. It works by creating a new DeprecationContext
* based on the old one and atomically swapping in the new context. If
* another thread updated the context between our read of the old context
* and the swap, we retry until we win the race.
*
* @param deltas The deprecations to add.
*/
public static void addDeprecations(DeprecationDelta[] deltas) {
DeprecationContext prev, next;
do {
prev = deprecationContext.get();
next = new DeprecationContext(prev, deltas);
} while (!deprecationContext.compareAndSet(prev, next));
}
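A minimal usage sketch of the new batch API, assuming hypothetical key names (real callers such as HdfsConfiguration and ConfigUtil below register their own deltas):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
public class MyComponentDeprecations {
  // Hypothetical keys, for illustration only.
  public static void register() {
    Configuration.addDeprecations(new DeprecationDelta[] {
        new DeprecationDelta("my.old.key", "my.new.key"),
        new DeprecationDelta("my.legacy.key", "my.replacement.key",
            "my.legacy.key is deprecated; use my.replacement.key instead")
    });
  }
}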
/**
* Adds the deprecated key to the global deprecation map.
* It does not override any existing entries in the deprecation map.
* This is to be used only by the developers in order to add deprecation of
* keys, and attempts to call this method after loading resources once,
@ -314,6 +462,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* If a key is deprecated in favor of multiple keys, they are all treated as
* aliases of each other, and setting any one of them resets all the others
* to the new value.
*
* If you have multiple deprecation entries to add, it is more efficient to
* use #addDeprecations(DeprecationDelta[] deltas) instead.
*
* @param key
* @param newKeys
@ -322,41 +473,35 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
String customMessage)} instead
*/
@Deprecated
public synchronized static void addDeprecation(String key, String[] newKeys,
public static void addDeprecation(String key, String[] newKeys,
String customMessage) {
if (key == null || key.length() == 0 ||
newKeys == null || newKeys.length == 0) {
throw new IllegalArgumentException();
}
if (!isDeprecated(key)) {
DeprecatedKeyInfo newKeyInfo;
newKeyInfo = new DeprecatedKeyInfo(newKeys, customMessage);
deprecatedKeyMap.put(key, newKeyInfo);
for (String newKey : newKeys) {
reverseDeprecatedKeyMap.put(newKey, key);
}
}
addDeprecations(new DeprecationDelta[] {
new DeprecationDelta(key, newKeys, customMessage)
});
}
/**
* Adds the deprecated key to the deprecation map.
* Adds the deprecated key to the global deprecation map.
* It does not override any existing entries in the deprecation map.
* This is to be used only by the developers in order to add deprecation of
* keys, and attempts to call this method after loading resources once,
* would lead to <tt>UnsupportedOperationException</tt>
*
* If you have multiple deprecation entries to add, it is more efficient to
* use #addDeprecations(DeprecationDelta[] deltas) instead.
*
* @param key
* @param newKey
* @param customMessage
*/
public synchronized static void addDeprecation(String key, String newKey,
public static void addDeprecation(String key, String newKey,
String customMessage) {
addDeprecation(key, new String[] {newKey}, customMessage);
}
/**
* Adds the deprecated key to the deprecation map when no custom message
* is provided.
* Adds the deprecated key to the global deprecation map when no custom
* message is provided.
* It does not override any existing entries in the deprecation map.
* This is to be used only by the developers in order to add deprecation of
* keys, and attempts to call this method after loading resources once,
@ -366,28 +511,34 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* aliases of each other, and setting any one of them resets all the others
* to the new value.
*
* If you have multiple deprecation entries to add, it is more efficient to
* use #addDeprecations(DeprecationDelta[] deltas) instead.
*
* @param key Key that is to be deprecated
* @param newKeys list of keys that take up the values of deprecated key
* @deprecated use {@link #addDeprecation(String key, String newKey)} instead
*/
@Deprecated
public synchronized static void addDeprecation(String key, String[] newKeys) {
public static void addDeprecation(String key, String[] newKeys) {
addDeprecation(key, newKeys, null);
}
/**
* Adds the deprecated key to the deprecation map when no custom message
* is provided.
* Adds the deprecated key to the global deprecation map when no custom
* message is provided.
* It does not override any existing entries in the deprecation map.
* This is to be used only by the developers in order to add deprecation of
* keys, and attempts to call this method after loading resources once,
* would lead to <tt>UnsupportedOperationException</tt>
*
* If you have multiple deprecation entries to add, it is more efficient to
* use #addDeprecations(DeprecationDelta[] deltas) instead.
*
* @param key Key that is to be deprecated
* @param newKey key that takes up the value of deprecated key
*/
public synchronized static void addDeprecation(String key, String newKey) {
addDeprecation(key, new String[] {newKey}, null);
public static void addDeprecation(String key, String newKey) {
addDeprecation(key, new String[] {newKey}, null);
}
/**
@ -398,7 +549,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* <code>false</code> otherwise.
*/
public static boolean isDeprecated(String key) {
return deprecatedKeyMap.containsKey(key);
return deprecationContext.get().getDeprecatedKeyMap().containsKey(key);
}
/**
@ -410,13 +561,14 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
*/
private String[] getAlternateNames(String name) {
String altNames[] = null;
DeprecatedKeyInfo keyInfo = deprecatedKeyMap.get(name);
DeprecationContext cur = deprecationContext.get();
DeprecatedKeyInfo keyInfo = cur.getDeprecatedKeyMap().get(name);
if (keyInfo == null) {
altNames = (reverseDeprecatedKeyMap.get(name) != null ) ?
new String [] {reverseDeprecatedKeyMap.get(name)} : null;
altNames = (cur.getReverseDeprecatedKeyMap().get(name) != null ) ?
new String [] {cur.getReverseDeprecatedKeyMap().get(name)} : null;
if(altNames != null && altNames.length > 0) {
//To help look for other new configs for this deprecated config
keyInfo = deprecatedKeyMap.get(altNames[0]);
keyInfo = cur.getDeprecatedKeyMap().get(altNames[0]);
}
}
if(keyInfo != null && keyInfo.newKeys.length > 0) {
@ -442,11 +594,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* @return the first property in the list of properties mapping
* the <code>name</code> or the <code>name</code> itself.
*/
private String[] handleDeprecation(String name) {
private String[] handleDeprecation(DeprecationContext deprecations,
String name) {
ArrayList<String > names = new ArrayList<String>();
if (isDeprecated(name)) {
DeprecatedKeyInfo keyInfo = deprecatedKeyMap.get(name);
warnOnceIfDeprecated(name);
DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
warnOnceIfDeprecated(deprecations, name);
for (String newKey : keyInfo.newKeys) {
if(newKey != null) {
names.add(newKey);
@ -457,7 +610,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
names.add(name);
}
for(String n : names) {
String deprecatedKey = reverseDeprecatedKeyMap.get(n);
String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);
if (deprecatedKey != null && !getOverlay().containsKey(n) &&
getOverlay().containsKey(deprecatedKey)) {
getProps().setProperty(n, getOverlay().getProperty(deprecatedKey));
@ -469,11 +622,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
private void handleDeprecation() {
LOG.debug("Handling deprecation for all properties in config...");
DeprecationContext deprecations = deprecationContext.get();
Set<Object> keys = new HashSet<Object>();
keys.addAll(getProps().keySet());
for (Object item: keys) {
LOG.debug("Handling deprecation for " + (String)item);
handleDeprecation((String)item);
handleDeprecation(deprecations, (String)item);
}
}
@ -492,13 +646,6 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
//Add code for managing deprecated key mapping
//for example
//addDeprecation("oldKey1",new String[]{"newkey1","newkey2"});
//adds deprecation for oldKey1 to two new keys(newkey1, newkey2).
//so get or set of oldKey1 will correctly populate/access values of
//newkey1 and newkey2
addDeprecatedKeys();
}
private Properties properties;
@ -721,7 +868,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* or null if no such property exists.
*/
public String get(String name) {
String[] names = handleDeprecation(name);
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
for(String n : names) {
result = substituteVars(getProps().getProperty(n));
@ -778,7 +925,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* its replacing property and null if no such property exists.
*/
public String getRaw(String name) {
String[] names = handleDeprecation(name);
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
for(String n : names) {
result = getProps().getProperty(n);
@ -816,7 +963,8 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
Preconditions.checkArgument(
value != null,
"Property value must not be null");
if (deprecatedKeyMap.isEmpty()) {
DeprecationContext deprecations = deprecationContext.get();
if (deprecations.getDeprecatedKeyMap().isEmpty()) {
getProps();
}
getOverlay().setProperty(name, value);
@ -837,12 +985,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
}
}
}
warnOnceIfDeprecated(name);
warnOnceIfDeprecated(deprecations, name);
}
private void warnOnceIfDeprecated(String name) {
DeprecatedKeyInfo keyInfo = deprecatedKeyMap.get(name);
if (keyInfo != null && !keyInfo.accessed) {
private void warnOnceIfDeprecated(DeprecationContext deprecations, String name) {
DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
if (keyInfo != null && !keyInfo.getAndSetAccessed()) {
LOG_DEPRECATION.info(keyInfo.getWarningMessage(name));
}
}
@ -893,7 +1041,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* doesn't exist.
*/
public String get(String name, String defaultValue) {
String[] names = handleDeprecation(name);
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
for(String n : names) {
result = substituteVars(getProps().getProperty(n, defaultValue));
@ -2100,6 +2248,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
if (!"configuration".equals(root.getTagName()))
LOG.fatal("bad conf file: top-level element not <configuration>");
NodeList props = root.getChildNodes();
DeprecationContext deprecations = deprecationContext.get();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element))
@ -2137,9 +2286,10 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
// Ignore this parameter if it has already been marked as 'final'
if (attr != null) {
if (deprecatedKeyMap.containsKey(attr)) {
DeprecatedKeyInfo keyInfo = deprecatedKeyMap.get(attr);
keyInfo.accessed = false;
if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
DeprecatedKeyInfo keyInfo =
deprecations.getDeprecatedKeyMap().get(attr);
keyInfo.clearAccessed();
for (String key:keyInfo.newKeys) {
// update new keys with deprecated key's value
loadProperty(toAddTo, name, key, value, finalParameter,
@ -2433,26 +2583,6 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return result;
}
//Load deprecated keys in common
private static void addDeprecatedKeys() {
Configuration.addDeprecation("topology.script.file.name",
new String[]{CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY});
Configuration.addDeprecation("topology.script.number.args",
new String[]{CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY});
Configuration.addDeprecation("hadoop.configured.node.mapping",
new String[]{CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY});
Configuration.addDeprecation("topology.node.switch.mapping.impl",
new String[]{CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY});
Configuration.addDeprecation("dfs.df.interval",
new String[]{CommonConfigurationKeys.FS_DF_INTERVAL_KEY});
Configuration.addDeprecation("hadoop.native.lib",
new String[]{CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY});
Configuration.addDeprecation("fs.default.name",
new String[]{CommonConfigurationKeys.FS_DEFAULT_NAME_KEY});
Configuration.addDeprecation("dfs.umaskmode",
new String[]{CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY});
}
/**
* A unique class which is used as a sentinel value in the caching
* for getClassByName. {@link Configuration#getClassByNameOrNull(String)}
@ -2460,7 +2590,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
private static abstract class NegativeCacheSentinel {}
public static void dumpDeprecatedKeys() {
for (Map.Entry<String, DeprecatedKeyInfo> entry : deprecatedKeyMap.entrySet()) {
DeprecationContext deprecations = deprecationContext.get();
for (Map.Entry<String, DeprecatedKeyInfo> entry :
deprecations.getDeprecatedKeyMap().entrySet()) {
StringBuilder newKeys = new StringBuilder();
for (String newKey : entry.getValue().newKeys) {
newKeys.append(newKey).append("\t");

View File

@ -240,6 +240,14 @@ public class CommonConfigurationKeysPublic {
public static final String HADOOP_SECURITY_GROUPS_CACHE_SECS =
"hadoop.security.groups.cache.secs";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final long HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT =
300;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS =
"hadoop.security.groups.cache.warn.after.ms";
public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT =
5000;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_SECURITY_AUTHENTICATION =
"hadoop.security.authentication";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */

View File

@ -575,6 +575,7 @@ public class Client {
try {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(true);
/*
* Bind the socket to the host specified in the principal name of the

View File

@ -762,6 +762,7 @@ public abstract class Server {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
Reader reader = getReader();
try {

View File

@ -50,6 +50,7 @@ public class Groups {
private final Map<String, CachedGroups> userToGroupsMap =
new ConcurrentHashMap<String, CachedGroups>();
private final long cacheTimeout;
private final long warningDeltaMs;
public Groups(Configuration conf) {
impl =
@ -60,11 +61,16 @@ public class Groups {
conf);
cacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 5*60) * 1000;
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
warningDeltaMs =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
"; cacheTimeout=" + cacheTimeout);
"; cacheTimeout=" + cacheTimeout + "; warningDeltaMs=" +
warningDeltaMs);
}
/**
@ -76,17 +82,24 @@ public class Groups {
public List<String> getGroups(String user) throws IOException {
// Return cached value if available
CachedGroups groups = userToGroupsMap.get(user);
long now = Time.now();
long startMs = Time.monotonicNow();
// if cache has a value and it hasn't expired
if (groups != null && (groups.getTimestamp() + cacheTimeout > now)) {
if (groups != null && (groups.getTimestamp() + cacheTimeout > startMs)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Returning cached groups for '" + user + "'");
}
return groups.getGroups();
}
// Create and cache user's groups
groups = new CachedGroups(impl.getGroups(user));
List<String> groupList = impl.getGroups(user);
long endMs = Time.monotonicNow();
long deltaMs = endMs - startMs;
if (deltaMs > warningDeltaMs) {
LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
"took " + deltaMs + " milliseconds.");
}
groups = new CachedGroups(groupList, endMs);
if (groups.getGroups().isEmpty()) {
throw new IOException("No groups found for user " + user);
}
@ -133,9 +146,9 @@ public class Groups {
/**
* Create and initialize group cache
*/
CachedGroups(List<String> groups) {
CachedGroups(List<String> groups, long timestamp) {
this.groups = groups;
this.timestamp = Time.now();
this.timestamp = timestamp;
}
/**

View File

@ -105,6 +105,15 @@
</description>
</property>
<property>
<name>hadoop.security.groups.cache.warn.after.ms</name>
<value>5000</value>
<description>
If looking up the groups for a single user takes longer than this number of
milliseconds, a warning message is logged.
</description>
</property>
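A hedged illustration of how this setting is consumed, mirroring the Groups change above (the 10-second threshold and the user name are placeholders):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.Groups;
public class SlowGroupLookupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Warn whenever a single group lookup takes longer than 10 seconds.
    conf.setLong(
        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
        10000);
    Groups groups = new Groups(conf);
    // Lookups slower than the threshold are logged by Groups#getGroups.
    System.out.println(groups.getGroups("someuser"));
  }
}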
<property>
<name>hadoop.security.group.mapping.ldap.url</name>
<value></value>

View File

@ -26,13 +26,28 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
public class TestConfigurationDeprecation {
private Configuration conf;
@ -320,4 +335,68 @@ public class TestConfigurationDeprecation {
assertNull(conf.get("nK"));
}
private static String getTestKeyName(int threadIndex, int testIndex) {
return "testConcurrentDeprecateAndManipulate.testKey." +
threadIndex + "." + testIndex;
}
/**
* Run a set of threads making changes to the deprecations
* concurrently with another set of threads calling get()
* and set() on Configuration objects.
*/
@SuppressWarnings("deprecation")
@Test(timeout=60000)
public void testConcurrentDeprecateAndManipulate() throws Exception {
final int NUM_THREAD_IDS = 10;
final int NUM_KEYS_PER_THREAD = 1000;
ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(2 * NUM_THREAD_IDS,
new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("testConcurrentDeprecateAndManipulate modification thread %d").
build());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger highestModificationThreadId = new AtomicInteger(1);
List<Future<Void>> futures = new LinkedList<Future<Void>>();
for (int i = 0; i < NUM_THREAD_IDS; i++) {
futures.add(executor.schedule(new Callable<Void>() {
@Override
public Void call() throws Exception {
latch.await();
int threadIndex = highestModificationThreadId.addAndGet(1);
for (int i = 0; i < NUM_KEYS_PER_THREAD; i++) {
String testKey = getTestKeyName(threadIndex, i);
String testNewKey = testKey + ".new";
Configuration.addDeprecations(
new DeprecationDelta[] {
new DeprecationDelta(testKey, testNewKey)
});
}
return null;
}
}, 0, TimeUnit.SECONDS));
}
final AtomicInteger highestAccessThreadId = new AtomicInteger(1);
for (int i = 0; i < NUM_THREAD_IDS; i++) {
futures.add(executor.schedule(new Callable<Void>() {
@Override
public Void call() throws Exception {
Configuration conf = new Configuration();
latch.await();
int threadIndex = highestAccessThreadId.addAndGet(1);
for (int i = 0; i < NUM_KEYS_PER_THREAD; i++) {
String testNewKey = getTestKeyName(threadIndex, i) + ".new";
String value = "value." + threadIndex + "." + i;
conf.set(testNewKey, value);
Assert.assertEquals(value, conf.get(testNewKey));
}
return null;
}
}, 0, TimeUnit.SECONDS));
}
latch.countDown(); // allow all threads to proceed
for (Future<Void> future : futures) {
Uninterruptibles.getUninterruptibly(future);
}
}
}

View File

@ -458,12 +458,10 @@ Release 2.3.0 - UNRELEASED
(Qus-Jiawei via kihwal)
BUG FIXES
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
Patrick McCabe)
HDFS-5035. getFileLinkStatus and rename do not correctly check permissions
of symlinks. (Andrew Wang via Colin Patrick McCabe)
HDFS-4816. transitionToActive blocks if the SBN is doing checkpoint image
transfer. (Andrew Wang)
@ -515,6 +513,8 @@ Release 2.2.1 - UNRELEASED
report to a configurable value. (Aaron T. Myers via Colin Patrick
McCabe)
HDFS-5344. Make LsSnapshottableDir as Tool interface implementation. (Sathish via umamahesh)
OPTIMIZATIONS
BUG FIXES
@ -574,6 +574,11 @@ Release 2.2.1 - UNRELEASED
HDFS-4633 TestDFSClientExcludedNodes fails sporadically if excluded nodes
cache expires too quickly (Chris Nauroth via Sanjay)
HDFS-5037. Active NN should trigger its own edit log rolls (wang)
HDFS-5035. getFileLinkStatus and rename do not correctly check permissions
of symlinks. (Andrew Wang via Colin Patrick McCabe)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -188,6 +188,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.classification.InterfaceAudience;
@ -62,48 +63,83 @@ public class HdfsConfiguration extends Configuration {
public static void init() {
}
private static void deprecate(String oldKey, String newKey) {
Configuration.addDeprecation(oldKey, newKey);
}
private static void addDeprecatedKeys() {
deprecate("dfs.backup.address", DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY);
deprecate("dfs.backup.http.address", DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY);
deprecate("dfs.balance.bandwidthPerSec", DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY);
deprecate("dfs.data.dir", DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
deprecate("dfs.http.address", DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
deprecate("dfs.https.address", DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
deprecate("dfs.max.objects", DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY);
deprecate("dfs.name.dir", DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
deprecate("dfs.name.dir.restore", DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY);
deprecate("dfs.name.edits.dir", DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
deprecate("dfs.read.prefetch.size", DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY);
deprecate("dfs.safemode.extension", DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY);
deprecate("dfs.safemode.threshold.pct", DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY);
deprecate("dfs.secondary.http.address", DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
deprecate("dfs.socket.timeout", DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY);
deprecate("fs.checkpoint.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
deprecate("fs.checkpoint.edits.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY);
deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
deprecate("dfs.https.client.keystore.resource", DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY);
deprecate("dfs.https.need.client.auth", DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY);
deprecate("slave.host.name", DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
deprecate("session.id", DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
deprecate("dfs.access.time.precision", DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY);
deprecate("dfs.replication.considerLoad", DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY);
deprecate("dfs.replication.interval", DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY);
deprecate("dfs.replication.min", DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY);
deprecate("dfs.replication.pending.timeout.sec", DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY);
deprecate("dfs.max-repl-streams", DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
deprecate("dfs.permissions", DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY);
deprecate("dfs.permissions.supergroup", DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY);
deprecate("dfs.write.packet.size", DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY);
deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY);
deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY);
deprecate("io.bytes.per.checksum", DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY);
deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("dfs.backup.address",
DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY),
new DeprecationDelta("dfs.backup.http.address",
DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY),
new DeprecationDelta("dfs.balance.bandwidthPerSec",
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY),
new DeprecationDelta("dfs.data.dir",
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY),
new DeprecationDelta("dfs.http.address",
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY),
new DeprecationDelta("dfs.https.address",
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY),
new DeprecationDelta("dfs.max.objects",
DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY),
new DeprecationDelta("dfs.name.dir",
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY),
new DeprecationDelta("dfs.name.dir.restore",
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY),
new DeprecationDelta("dfs.name.edits.dir",
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY),
new DeprecationDelta("dfs.read.prefetch.size",
DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY),
new DeprecationDelta("dfs.safemode.extension",
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY),
new DeprecationDelta("dfs.safemode.threshold.pct",
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY),
new DeprecationDelta("dfs.secondary.http.address",
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY),
new DeprecationDelta("dfs.socket.timeout",
DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY),
new DeprecationDelta("fs.checkpoint.dir",
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY),
new DeprecationDelta("fs.checkpoint.edits.dir",
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY),
new DeprecationDelta("fs.checkpoint.period",
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY),
new DeprecationDelta("heartbeat.recheck.interval",
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY),
new DeprecationDelta("dfs.https.client.keystore.resource",
DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY),
new DeprecationDelta("dfs.https.need.client.auth",
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY),
new DeprecationDelta("slave.host.name",
DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY),
new DeprecationDelta("session.id",
DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
new DeprecationDelta("dfs.access.time.precision",
DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY),
new DeprecationDelta("dfs.replication.considerLoad",
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY),
new DeprecationDelta("dfs.replication.interval",
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY),
new DeprecationDelta("dfs.replication.min",
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY),
new DeprecationDelta("dfs.replication.pending.timeout.sec",
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY),
new DeprecationDelta("dfs.max-repl-streams",
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY),
new DeprecationDelta("dfs.permissions",
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY),
new DeprecationDelta("dfs.permissions.supergroup",
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY),
new DeprecationDelta("dfs.write.packet.size",
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY),
new DeprecationDelta("dfs.block.size",
DFSConfigKeys.DFS_BLOCK_SIZE_KEY),
new DeprecationDelta("dfs.datanode.max.xcievers",
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
new DeprecationDelta("io.bytes.per.checksum",
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY),
new DeprecationDelta("dfs.federation.nameservices",
DFSConfigKeys.DFS_NAMESERVICES),
new DeprecationDelta("dfs.federation.nameservice.id",
DFSConfigKeys.DFS_NAMESERVICE_ID)
});
}
public static void main(String[] args) {

View File

@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@ -49,6 +51,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
@ -390,6 +396,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
/**
* Threshold, in number of edits, at which an active namenode rolls its own edit log
*/
private final long editLogRollerThreshold;
/**
* Check interval of an active namenode's edit log roller thread
*/
private final int editLogRollerInterval;
private volatile boolean hasResourcesAvailable = false;
private volatile boolean fsRunning = true;
@ -703,7 +719,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.standbyShouldCheckpoint = conf.getBoolean(
DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
// The edit log autoroll threshold is a multiple of the checkpoint transaction threshold
this.editLogRollerThreshold = (long)
(conf.getFloat(
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
conf.getLong(
DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
this.editLogRollerInterval = conf.getInt(
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
this.inodeId = new INodeId();
// For testing purposes, allow the DT secret manager to be started regardless
@ -978,6 +1004,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//ResourceMonitor required only at ActiveNN. See HDFS-2914
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
nnEditLogRoller = new Daemon(new NameNodeEditLogRoller(
editLogRollerThreshold, editLogRollerInterval));
nnEditLogRoller.start();
cacheManager.activate();
blockManager.getDatanodeManager().setSendCachingCommands(true);
} finally {
@ -1017,6 +1048,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
nnrmthread.interrupt();
}
if (nnEditLogRoller != null) {
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
nnEditLogRoller.interrupt();
}
if (dir != null && dir.fsImage != null) {
if (dir.fsImage.editLog != null) {
dir.fsImage.editLog.close();
@ -4159,7 +4194,48 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
shouldNNRmRun = false;
}
}
class NameNodeEditLogRoller implements Runnable {
private boolean shouldRun = true;
private final long rollThreshold;
private final long sleepIntervalMs;
public NameNodeEditLogRoller(long rollThreshold, int sleepIntervalMs) {
this.rollThreshold = rollThreshold;
this.sleepIntervalMs = sleepIntervalMs;
}
@Override
public void run() {
while (fsRunning && shouldRun) {
try {
FSEditLog editLog = getFSImage().getEditLog();
long numEdits =
editLog.getLastWrittenTxId() - editLog.getCurSegmentTxId();
if (numEdits > rollThreshold) {
FSNamesystem.LOG.info("NameNode rolling its own edit log because"
+ " number of edits in open segment exceeds threshold of "
+ rollThreshold);
rollEditLog();
}
Thread.sleep(sleepIntervalMs);
} catch (InterruptedException e) {
FSNamesystem.LOG.info(NameNodeEditLogRoller.class.getSimpleName()
+ " was interrupted, exiting");
break;
} catch (Exception e) {
FSNamesystem.LOG.error("Swallowing exception in "
+ NameNodeEditLogRoller.class.getSimpleName() + ":", e);
}
}
}
public void stop() {
shouldRun = false;
}
}
public FSImage getFSImage() {
return dir.fsImage;
}
@ -5176,7 +5252,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
checkOperation(OperationCategory.JOURNAL);
checkNameNodeSafeMode("Log not rolled");
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
if (Server.isRpcInvocation()) {
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
}
return getFSImage().rollEditLog();
} finally {
writeUnlock();

View File

@ -38,7 +38,7 @@ public class ActiveState extends HAState {
@Override
public void checkOperation(HAContext context, OperationCategory op) {
return; // Other than journal all operations are allowed in active state
return; // All operations are allowed in active state
}
@Override

View File

@ -21,9 +21,12 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A tool used to list all snapshottable directories that are owned by the
@ -31,23 +34,23 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
* is a super user.
*/
@InterfaceAudience.Private
public class LsSnapshottableDir {
public static void main(String[] argv) throws IOException {
public class LsSnapshottableDir extends Configured implements Tool {
@Override
public int run(String[] argv) throws Exception {
String description = "LsSnapshottableDir: \n" +
"\tGet the list of snapshottable directories that are owned by the current user.\n" +
"\tReturn all the snapshottable directories if the current user is a super user.\n";
if(argv.length != 0) {
System.err.println("Usage: \n" + description);
System.exit(1);
return 1;
}
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileSystem fs = FileSystem.get(getConf());
if (! (fs instanceof DistributedFileSystem)) {
System.err.println(
"LsSnapshottableDir can only be used in DistributedFileSystem");
System.exit(1);
return 1;
}
DistributedFileSystem dfs = (DistributedFileSystem) fs;
@ -57,7 +60,12 @@ public class LsSnapshottableDir {
} catch (IOException e) {
String[] content = e.getLocalizedMessage().split("\n");
System.err.println("lsSnapshottableDir: " + content[0]);
return 1;
}
return 0;
}
public static void main(String[] argv) throws Exception {
int rc = ToolRunner.run(new LsSnapshottableDir(), argv);
System.exit(rc);
}
}
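With the class now implementing Tool, generic Hadoop options are parsed before run() is invoked and the exit code is propagated instead of run() calling System.exit() itself. A hedged sketch of invoking it programmatically (the package path and the namenode URI are assumptions):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.snapshot.LsSnapshottableDir; // package assumed
import org.apache.hadoop.util.ToolRunner;
public class ListSnapshottableDirs {
  public static void main(String[] args) throws Exception {
    // -fs is a generic option handled by ToolRunner/GenericOptionsParser,
    // so run() still sees the empty argument list it expects.
    int rc = ToolRunner.run(new Configuration(), new LsSnapshottableDir(),
        new String[] { "-fs", "hdfs://namenode:8020" });
    System.exit(rc);
  }
}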

View File

@ -1543,4 +1543,29 @@
</description>
</property>
<property>
<name>dfs.namenode.edit.log.autoroll.multiplier.threshold</name>
<value>2.0</value>
<description>
Determines when an active namenode will roll its own edit log.
The actual threshold (in number of edits) is determined by multiplying
this value by dfs.namenode.checkpoint.txns.
This prevents extremely large edit files from accumulating on the active
namenode, which can cause timeouts during namenode startup and pose an
administrative hassle. This behavior is intended as a failsafe for when
the standby or secondary namenode fails to roll the edit log at the normal
checkpoint threshold.
</description>
</property>
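As a worked example, the new TestEditLogAutoroll below sets dfs.namenode.checkpoint.txns to 20 and this multiplier to 0.5, so the active namenode rolls once the open segment exceeds 0.5 * 20 = 10 edits; with the defaults shipped here, the autoroll threshold is simply twice dfs.namenode.checkpoint.txns.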
<property>
<name>dfs.namenode.edit.log.autoroll.check.interval.ms</name>
<value>300000</value>
<description>
How often an active namenode will check if it needs to roll its edit log,
in milliseconds.
</description>
</property>
</configuration>

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
public class TestEditLogAutoroll {
private Configuration conf;
private MiniDFSCluster cluster;
private NameNode nn0;
private FileSystem fs;
private FSEditLog editLog;
@Before
public void setUp() throws Exception {
conf = new Configuration();
// Stall the standby checkpointer in two ways
conf.setLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, Long.MAX_VALUE);
conf.setLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 20);
// Make it autoroll after 10 edits
conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f);
conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10061))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10062)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster.waitActive();
nn0 = cluster.getNameNode(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
cluster.transitionToActive(0);
fs = cluster.getFileSystem(0);
editLog = nn0.getNamesystem().getEditLog();
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
@Test(timeout=60000)
public void testEditLogAutoroll() throws Exception {
// Make some edits
final long startTxId = editLog.getCurSegmentTxId();
for (int i=0; i<11; i++) {
fs.mkdirs(new Path("testEditLogAutoroll-" + i));
}
// Wait for the NN to autoroll
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return editLog.getCurSegmentTxId() > startTxId;
}
}, 1000, 5000);
// Transition to standby and make sure the roller stopped
nn0.transitionToStandby();
GenericTestUtils.assertNoThreadsMatching(
".*" + NameNodeEditLogRoller.class.getSimpleName() + ".*");
}
}

View File

@ -171,6 +171,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5487. In task processes, JobConf is unnecessarily loaded again
in Limits (Sandy Ryza)
MAPREDUCE-5601. ShuffleHandler fadvises file regions as DONTNEED even when
fetch fails (Sandy Ryza)
BUG FIXES
MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
@ -237,6 +240,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5598. TestUserDefinedCounters.testMapReduceJob is flakey
(Robert Kanter via jlowe)
MAPREDUCE-5604. TestMRAMWithNonNormalizedCapabilities fails on Windows due to
exceeding max path length. (cnauroth)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -50,487 +51,483 @@ public class ConfigUtil {
*/
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Configuration.addDeprecation("mapred.temp.dir",
new String[] {MRConfig.TEMP_DIR});
Configuration.addDeprecation("mapred.local.dir",
new String[] {MRConfig.LOCAL_DIR});
Configuration.addDeprecation("mapred.cluster.map.memory.mb",
new String[] {MRConfig.MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.reduce.memory.mb",
new String[] {MRConfig.REDUCEMEMORY_MB});
Configuration.addDeprecation("mapred.acls.enabled",
new String[] {MRConfig.MR_ACLS_ENABLED});
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("mapred.temp.dir",
MRConfig.TEMP_DIR),
new DeprecationDelta("mapred.local.dir",
MRConfig.LOCAL_DIR),
new DeprecationDelta("mapred.cluster.map.memory.mb",
MRConfig.MAPMEMORY_MB),
new DeprecationDelta("mapred.cluster.reduce.memory.mb",
MRConfig.REDUCEMEMORY_MB),
new DeprecationDelta("mapred.acls.enabled",
MRConfig.MR_ACLS_ENABLED),
Configuration.addDeprecation("mapred.cluster.max.map.memory.mb",
new String[] {JTConfig.JT_MAX_MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.max.reduce.memory.mb",
new String[] {JTConfig.JT_MAX_REDUCEMEMORY_MB});
new DeprecationDelta("mapred.cluster.max.map.memory.mb",
JTConfig.JT_MAX_MAPMEMORY_MB),
new DeprecationDelta("mapred.cluster.max.reduce.memory.mb",
JTConfig.JT_MAX_REDUCEMEMORY_MB),
Configuration.addDeprecation("mapred.cluster.average.blacklist.threshold",
new String[] {JTConfig.JT_AVG_BLACKLIST_THRESHOLD});
Configuration.addDeprecation("hadoop.job.history.location",
new String[] {JTConfig.JT_JOBHISTORY_LOCATION});
Configuration.addDeprecation(
"mapred.job.tracker.history.completed.location",
new String[] {JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION});
Configuration.addDeprecation("mapred.jobtracker.job.history.block.size",
new String[] {JTConfig.JT_JOBHISTORY_BLOCK_SIZE});
Configuration.addDeprecation("mapred.job.tracker.jobhistory.lru.cache.size",
new String[] {JTConfig.JT_JOBHISTORY_CACHE_SIZE});
Configuration.addDeprecation("mapred.hosts",
new String[] {JTConfig.JT_HOSTS_FILENAME});
Configuration.addDeprecation("mapred.hosts.exclude",
new String[] {JTConfig.JT_HOSTS_EXCLUDE_FILENAME});
Configuration.addDeprecation("mapred.system.dir",
new String[] {JTConfig.JT_SYSTEM_DIR});
Configuration.addDeprecation("mapred.max.tracker.blacklists",
new String[] {JTConfig.JT_MAX_TRACKER_BLACKLISTS});
Configuration.addDeprecation("mapred.job.tracker",
new String[] {JTConfig.JT_IPC_ADDRESS});
Configuration.addDeprecation("mapred.job.tracker.http.address",
new String[] {JTConfig.JT_HTTP_ADDRESS});
Configuration.addDeprecation("mapred.job.tracker.handler.count",
new String[] {JTConfig.JT_IPC_HANDLER_COUNT});
Configuration.addDeprecation("mapred.jobtracker.restart.recover",
new String[] {JTConfig.JT_RESTART_ENABLED});
Configuration.addDeprecation("mapred.jobtracker.taskScheduler",
new String[] {JTConfig.JT_TASK_SCHEDULER});
Configuration.addDeprecation(
"mapred.jobtracker.taskScheduler.maxRunningTasksPerJob",
new String[] {JTConfig.JT_RUNNINGTASKS_PER_JOB});
Configuration.addDeprecation("mapred.jobtracker.instrumentation",
new String[] {JTConfig.JT_INSTRUMENTATION});
Configuration.addDeprecation("mapred.jobtracker.maxtasks.per.job",
new String[] {JTConfig.JT_TASKS_PER_JOB});
Configuration.addDeprecation("mapred.heartbeats.in.second",
new String[] {JTConfig.JT_HEARTBEATS_IN_SECOND});
Configuration.addDeprecation("mapred.job.tracker.persist.jobstatus.active",
new String[] {JTConfig.JT_PERSIST_JOBSTATUS});
Configuration.addDeprecation("mapred.job.tracker.persist.jobstatus.hours",
new String[] {JTConfig.JT_PERSIST_JOBSTATUS_HOURS});
Configuration.addDeprecation("mapred.job.tracker.persist.jobstatus.dir",
new String[] {JTConfig.JT_PERSIST_JOBSTATUS_DIR});
Configuration.addDeprecation("mapred.permissions.supergroup",
new String[] {MRConfig.MR_SUPERGROUP});
Configuration.addDeprecation("mapreduce.jobtracker.permissions.supergroup",
new String[] {MRConfig.MR_SUPERGROUP});
Configuration.addDeprecation("mapred.task.cache.levels",
new String[] {JTConfig.JT_TASKCACHE_LEVELS});
Configuration.addDeprecation("mapred.jobtracker.taskalloc.capacitypad",
new String[] {JTConfig.JT_TASK_ALLOC_PAD_FRACTION});
Configuration.addDeprecation("mapred.jobinit.threads",
new String[] {JTConfig.JT_JOBINIT_THREADS});
Configuration.addDeprecation("mapred.tasktracker.expiry.interval",
new String[] {JTConfig.JT_TRACKER_EXPIRY_INTERVAL});
Configuration.addDeprecation("mapred.job.tracker.retiredjobs.cache.size",
new String[] {JTConfig.JT_RETIREJOB_CACHE_SIZE});
Configuration.addDeprecation("mapred.job.tracker.retire.jobs",
new String[] {JTConfig.JT_RETIREJOBS});
Configuration.addDeprecation("mapred.healthChecker.interval",
new String[] {TTConfig.TT_HEALTH_CHECKER_INTERVAL});
Configuration.addDeprecation("mapred.healthChecker.script.args",
new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS});
Configuration.addDeprecation("mapred.healthChecker.script.path",
new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH});
Configuration.addDeprecation("mapred.healthChecker.script.timeout",
new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT});
Configuration.addDeprecation("mapred.local.dir.minspacekill",
new String[] {TTConfig.TT_LOCAL_DIR_MINSPACE_KILL});
Configuration.addDeprecation("mapred.local.dir.minspacestart",
new String[] {TTConfig.TT_LOCAL_DIR_MINSPACE_START});
Configuration.addDeprecation("mapred.task.tracker.http.address",
new String[] {TTConfig.TT_HTTP_ADDRESS});
Configuration.addDeprecation("mapred.task.tracker.report.address",
new String[] {TTConfig.TT_REPORT_ADDRESS});
Configuration.addDeprecation("mapred.task.tracker.task-controller",
new String[] {TTConfig.TT_TASK_CONTROLLER});
Configuration.addDeprecation("mapred.tasktracker.dns.interface",
new String[] {TTConfig.TT_DNS_INTERFACE});
Configuration.addDeprecation("mapred.tasktracker.dns.nameserver",
new String[] {TTConfig.TT_DNS_NAMESERVER});
Configuration.addDeprecation("mapred.tasktracker.events.batchsize",
new String[] {TTConfig.TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL});
Configuration.addDeprecation("mapred.tasktracker.indexcache.mb",
new String[] {TTConfig.TT_INDEX_CACHE});
Configuration.addDeprecation("mapred.tasktracker.instrumentation",
new String[] {TTConfig.TT_INSTRUMENTATION});
Configuration.addDeprecation("mapred.tasktracker.map.tasks.maximum",
new String[] {TTConfig.TT_MAP_SLOTS});
Configuration.addDeprecation("mapred.tasktracker.memory_calculator_plugin",
new String[] {TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN});
Configuration.addDeprecation("mapred.tasktracker.memorycalculatorplugin",
new String[] {TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN});
Configuration.addDeprecation("mapred.tasktracker.reduce.tasks.maximum",
new String[] {TTConfig.TT_REDUCE_SLOTS});
Configuration.addDeprecation(
"mapred.tasktracker.taskmemorymanager.monitoring-interval",
new String[] {TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL});
Configuration.addDeprecation(
"mapred.tasktracker.tasks.sleeptime-before-sigkill",
new String[] {TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL});
Configuration.addDeprecation("slave.host.name",
new String[] {TTConfig.TT_HOST_NAME});
Configuration.addDeprecation("tasktracker.http.threads",
new String[] {TTConfig.TT_HTTP_THREADS});
Configuration.addDeprecation("hadoop.net.static.resolutions",
new String[] {TTConfig.TT_STATIC_RESOLUTIONS});
Configuration.addDeprecation("local.cache.size",
new String[] {TTConfig.TT_LOCAL_CACHE_SIZE});
Configuration.addDeprecation("tasktracker.contention.tracking",
new String[] {TTConfig.TT_CONTENTION_TRACKING});
Configuration.addDeprecation("job.end.notification.url",
new String[] {MRJobConfig.MR_JOB_END_NOTIFICATION_URL});
Configuration.addDeprecation("job.end.retry.attempts",
new String[] {MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS});
Configuration.addDeprecation("job.end.retry.interval",
new String[] {MRJobConfig.MR_JOB_END_RETRY_INTERVAL});
Configuration.addDeprecation("mapred.committer.job.setup.cleanup.needed",
new String[] {MRJobConfig.SETUP_CLEANUP_NEEDED});
Configuration.addDeprecation("mapred.jar",
new String[] {MRJobConfig.JAR});
Configuration.addDeprecation("mapred.job.id",
new String[] {MRJobConfig.ID});
Configuration.addDeprecation("mapred.job.name",
new String[] {MRJobConfig.JOB_NAME});
Configuration.addDeprecation("mapred.job.priority",
new String[] {MRJobConfig.PRIORITY});
Configuration.addDeprecation("mapred.job.queue.name",
new String[] {MRJobConfig.QUEUE_NAME});
Configuration.addDeprecation("mapred.job.reuse.jvm.num.tasks",
new String[] {MRJobConfig.JVM_NUMTASKS_TORUN});
Configuration.addDeprecation("mapred.map.tasks",
new String[] {MRJobConfig.NUM_MAPS});
Configuration.addDeprecation("mapred.max.tracker.failures",
new String[] {MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER});
Configuration.addDeprecation("mapred.reduce.slowstart.completed.maps",
new String[] {MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART});
Configuration.addDeprecation("mapred.reduce.tasks",
new String[] {MRJobConfig.NUM_REDUCES});
Configuration.addDeprecation("mapred.skip.on",
new String[] {MRJobConfig.SKIP_RECORDS});
Configuration.addDeprecation("mapred.skip.out.dir",
new String[] {MRJobConfig.SKIP_OUTDIR});
Configuration.addDeprecation(
"mapred.speculative.execution.slowNodeThreshold",
new String[] {MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD});
Configuration.addDeprecation(
"mapred.speculative.execution.slowTaskThreshold",
new String[] {MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD});
Configuration.addDeprecation("mapred.speculative.execution.speculativeCap",
new String[] {MRJobConfig.SPECULATIVECAP});
Configuration.addDeprecation("job.local.dir",
new String[] {MRJobConfig.JOB_LOCAL_DIR});
Configuration.addDeprecation("mapreduce.inputformat.class",
new String[] {MRJobConfig.INPUT_FORMAT_CLASS_ATTR});
Configuration.addDeprecation("mapreduce.map.class",
new String[] {MRJobConfig.MAP_CLASS_ATTR});
Configuration.addDeprecation("mapreduce.combine.class",
new String[] {MRJobConfig.COMBINE_CLASS_ATTR});
Configuration.addDeprecation("mapreduce.reduce.class",
new String[] {MRJobConfig.REDUCE_CLASS_ATTR});
Configuration.addDeprecation("mapreduce.outputformat.class",
new String[] {MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR});
Configuration.addDeprecation("mapreduce.partitioner.class",
new String[] {MRJobConfig.PARTITIONER_CLASS_ATTR});
Configuration.addDeprecation("mapred.job.classpath.archives",
new String[] {MRJobConfig.CLASSPATH_ARCHIVES});
Configuration.addDeprecation("mapred.job.classpath.files",
new String[] {MRJobConfig.CLASSPATH_FILES});
Configuration.addDeprecation("mapred.cache.files",
new String[] {MRJobConfig.CACHE_FILES});
Configuration.addDeprecation("mapred.cache.archives",
new String[] {MRJobConfig.CACHE_ARCHIVES});
Configuration.addDeprecation("mapred.cache.localFiles",
new String[] {MRJobConfig.CACHE_LOCALFILES});
Configuration.addDeprecation("mapred.cache.localArchives",
new String[] {MRJobConfig.CACHE_LOCALARCHIVES});
Configuration.addDeprecation("mapred.cache.files.filesizes",
new String[] {MRJobConfig.CACHE_FILES_SIZES});
Configuration.addDeprecation("mapred.cache.archives.filesizes",
new String[] {MRJobConfig.CACHE_ARCHIVES_SIZES});
Configuration.addDeprecation("mapred.cache.files.timestamps",
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
Configuration.addDeprecation("mapred.cache.archives.timestamps",
new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
Configuration.addDeprecation("mapred.working.dir",
new String[] {MRJobConfig.WORKING_DIR});
Configuration.addDeprecation("user.name",
new String[] {MRJobConfig.USER_NAME});
Configuration.addDeprecation("mapred.output.key.class",
new String[] {MRJobConfig.OUTPUT_KEY_CLASS});
Configuration.addDeprecation("mapred.output.value.class",
new String[] {MRJobConfig.OUTPUT_VALUE_CLASS});
Configuration.addDeprecation("mapred.output.value.groupfn.class",
new String[] {MRJobConfig.GROUP_COMPARATOR_CLASS});
Configuration.addDeprecation("mapred.output.key.comparator.class",
new String[] {MRJobConfig.KEY_COMPARATOR});
Configuration.addDeprecation("io.sort.factor",
new String[] {MRJobConfig.IO_SORT_FACTOR});
Configuration.addDeprecation("io.sort.mb",
new String[] {MRJobConfig.IO_SORT_MB});
Configuration.addDeprecation("keep.failed.task.files",
new String[] {MRJobConfig.PRESERVE_FAILED_TASK_FILES});
Configuration.addDeprecation("keep.task.files.pattern",
new String[] {MRJobConfig.PRESERVE_FILES_PATTERN});
Configuration.addDeprecation("mapred.child.tmp",
new String[] {MRJobConfig.TASK_TEMP_DIR});
Configuration.addDeprecation("mapred.debug.out.lines",
new String[] {MRJobConfig.TASK_DEBUGOUT_LINES});
Configuration.addDeprecation("mapred.merge.recordsBeforeProgress",
new String[] {MRJobConfig.RECORDS_BEFORE_PROGRESS});
Configuration.addDeprecation("mapred.skip.attempts.to.start.skipping",
new String[] {MRJobConfig.SKIP_START_ATTEMPTS});
Configuration.addDeprecation("mapred.task.id",
new String[] {MRJobConfig.TASK_ATTEMPT_ID});
Configuration.addDeprecation("mapred.task.is.map",
new String[] {MRJobConfig.TASK_ISMAP});
Configuration.addDeprecation("mapred.task.partition",
new String[] {MRJobConfig.TASK_PARTITION});
Configuration.addDeprecation("mapred.task.profile",
new String[] {MRJobConfig.TASK_PROFILE});
Configuration.addDeprecation("mapred.task.profile.maps",
new String[] {MRJobConfig.NUM_MAP_PROFILES});
Configuration.addDeprecation("mapred.task.profile.reduces",
new String[] {MRJobConfig.NUM_REDUCE_PROFILES});
Configuration.addDeprecation("mapred.task.timeout",
new String[] {MRJobConfig.TASK_TIMEOUT});
Configuration.addDeprecation("mapred.tip.id",
new String[] {MRJobConfig.TASK_ID});
Configuration.addDeprecation("mapred.work.output.dir",
new String[] {MRJobConfig.TASK_OUTPUT_DIR});
Configuration.addDeprecation("mapred.userlog.limit.kb",
new String[] {MRJobConfig.TASK_USERLOG_LIMIT});
Configuration.addDeprecation("mapred.userlog.retain.hours",
new String[] {MRJobConfig.USER_LOG_RETAIN_HOURS});
Configuration.addDeprecation("mapred.task.profile.params",
new String[] {MRJobConfig.TASK_PROFILE_PARAMS});
Configuration.addDeprecation("io.sort.spill.percent",
new String[] {MRJobConfig.MAP_SORT_SPILL_PERCENT});
Configuration.addDeprecation("map.input.file",
new String[] {MRJobConfig.MAP_INPUT_FILE});
Configuration.addDeprecation("map.input.length",
new String[] {MRJobConfig.MAP_INPUT_PATH});
Configuration.addDeprecation("map.input.start",
new String[] {MRJobConfig.MAP_INPUT_START});
Configuration.addDeprecation("mapred.job.map.memory.mb",
new String[] {MRJobConfig.MAP_MEMORY_MB});
Configuration.addDeprecation("mapred.map.child.env",
new String[] {MRJobConfig.MAP_ENV});
Configuration.addDeprecation("mapred.map.child.java.opts",
new String[] {MRJobConfig.MAP_JAVA_OPTS});
Configuration.addDeprecation("mapred.map.max.attempts",
new String[] {MRJobConfig.MAP_MAX_ATTEMPTS});
Configuration.addDeprecation("mapred.map.task.debug.script",
new String[] {MRJobConfig.MAP_DEBUG_SCRIPT});
Configuration.addDeprecation("mapred.map.tasks.speculative.execution",
new String[] {MRJobConfig.MAP_SPECULATIVE});
Configuration.addDeprecation("mapred.max.map.failures.percent",
new String[] {MRJobConfig.MAP_FAILURES_MAX_PERCENT});
Configuration.addDeprecation("mapred.skip.map.auto.incr.proc.count",
new String[] {MRJobConfig.MAP_SKIP_INCR_PROC_COUNT});
Configuration.addDeprecation("mapred.skip.map.max.skip.records",
new String[] {MRJobConfig.MAP_SKIP_MAX_RECORDS});
Configuration.addDeprecation("min.num.spills.for.combine",
new String[] {MRJobConfig.MAP_COMBINE_MIN_SPILLS});
Configuration.addDeprecation("mapred.compress.map.output",
new String[] {MRJobConfig.MAP_OUTPUT_COMPRESS});
Configuration.addDeprecation("mapred.map.output.compression.codec",
new String[] {MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC});
Configuration.addDeprecation("mapred.mapoutput.key.class",
new String[] {MRJobConfig.MAP_OUTPUT_KEY_CLASS});
Configuration.addDeprecation("mapred.mapoutput.value.class",
new String[] {MRJobConfig.MAP_OUTPUT_VALUE_CLASS});
Configuration.addDeprecation("map.output.key.field.separator",
new String[] {MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR});
Configuration.addDeprecation("mapred.map.child.log.level",
new String[] {MRJobConfig.MAP_LOG_LEVEL});
Configuration.addDeprecation("mapred.inmem.merge.threshold",
new String[] {MRJobConfig.REDUCE_MERGE_INMEM_THRESHOLD});
Configuration.addDeprecation("mapred.job.reduce.input.buffer.percent",
new String[] {MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.reduce.markreset.buffer.percent",
new String[] {MRJobConfig.REDUCE_MARKRESET_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.reduce.memory.mb",
new String[] {MRJobConfig.REDUCE_MEMORY_MB});
Configuration.addDeprecation("mapred.job.reduce.total.mem.bytes",
new String[] {MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES});
Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent",
new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.shuffle.merge.percent",
new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
Configuration.addDeprecation("mapred.max.reduce.failures.percent",
new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
Configuration.addDeprecation("mapred.reduce.child.env",
new String[] {MRJobConfig.REDUCE_ENV});
Configuration.addDeprecation("mapred.reduce.child.java.opts",
new String[] {MRJobConfig.REDUCE_JAVA_OPTS});
Configuration.addDeprecation("mapred.reduce.max.attempts",
new String[] {MRJobConfig.REDUCE_MAX_ATTEMPTS});
Configuration.addDeprecation("mapred.reduce.parallel.copies",
new String[] {MRJobConfig.SHUFFLE_PARALLEL_COPIES});
Configuration.addDeprecation("mapred.reduce.task.debug.script",
new String[] {MRJobConfig.REDUCE_DEBUG_SCRIPT});
Configuration.addDeprecation("mapred.reduce.tasks.speculative.execution",
new String[] {MRJobConfig.REDUCE_SPECULATIVE});
Configuration.addDeprecation("mapred.shuffle.connect.timeout",
new String[] {MRJobConfig.SHUFFLE_CONNECT_TIMEOUT});
Configuration.addDeprecation("mapred.shuffle.read.timeout",
new String[] {MRJobConfig.SHUFFLE_READ_TIMEOUT});
Configuration.addDeprecation("mapred.skip.reduce.auto.incr.proc.count",
new String[] {MRJobConfig.REDUCE_SKIP_INCR_PROC_COUNT});
Configuration.addDeprecation("mapred.skip.reduce.max.skip.groups",
new String[] {MRJobConfig.REDUCE_SKIP_MAXGROUPS});
Configuration.addDeprecation("mapred.reduce.child.log.level",
new String[] {MRJobConfig.REDUCE_LOG_LEVEL});
Configuration.addDeprecation("mapreduce.job.counters.limit",
new String[] {MRJobConfig.COUNTERS_MAX_KEY});
Configuration.addDeprecation("jobclient.completion.poll.interval",
new String[] {Job.COMPLETION_POLL_INTERVAL_KEY});
Configuration.addDeprecation("jobclient.progress.monitor.poll.interval",
new String[] {Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY});
Configuration.addDeprecation("jobclient.output.filter",
new String[] {Job.OUTPUT_FILTER});
Configuration.addDeprecation("mapred.submit.replication",
new String[] {Job.SUBMIT_REPLICATION});
Configuration.addDeprecation("mapred.used.genericoptionsparser",
new String[] {Job.USED_GENERIC_PARSER});
Configuration.addDeprecation("mapred.input.dir",
new String[] {
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR});
Configuration.addDeprecation("mapred.input.pathFilter.class",
new String[] {org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.PATHFILTER_CLASS});
Configuration.addDeprecation("mapred.max.split.size",
new String[] {org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MAXSIZE});
Configuration.addDeprecation("mapred.min.split.size",
new String[] {org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE});
Configuration.addDeprecation("mapred.output.compress",
new String[] {org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS});
Configuration.addDeprecation("mapred.output.compression.codec",
new String[] {org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS_CODEC});
Configuration.addDeprecation("mapred.output.compression.type",
new String[] {org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS_TYPE});
Configuration.addDeprecation("mapred.output.dir",
new String[] {org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.OUTDIR});
Configuration.addDeprecation("mapred.seqbinary.output.key.class",
new String[] {org.apache.hadoop.mapreduce.lib.output.
SequenceFileAsBinaryOutputFormat.KEY_CLASS});
Configuration.addDeprecation("mapred.seqbinary.output.value.class",
new String[] {org.apache.hadoop.mapreduce.lib.output.
SequenceFileAsBinaryOutputFormat.VALUE_CLASS});
Configuration.addDeprecation("sequencefile.filter.class",
new String[] {org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_CLASS});
Configuration.addDeprecation("sequencefile.filter.regex",
new String[] {org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_REGEX});
Configuration.addDeprecation("sequencefile.filter.frequency",
new String[] {org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_FREQUENCY});
Configuration.addDeprecation("mapred.input.dir.mappers",
new String[] {org.apache.hadoop.mapreduce.lib.input.
MultipleInputs.DIR_MAPPERS});
Configuration.addDeprecation("mapred.input.dir.formats",
new String[] {org.apache.hadoop.mapreduce.lib.input.
MultipleInputs.DIR_FORMATS});
Configuration.addDeprecation("mapred.line.input.format.linespermap",
new String[] {org.apache.hadoop.mapreduce.lib.input.
NLineInputFormat.LINES_PER_MAP});
Configuration.addDeprecation("mapred.binary.partitioner.left.offset",
new String[] {org.apache.hadoop.mapreduce.lib.partition.
BinaryPartitioner.LEFT_OFFSET_PROPERTY_NAME});
Configuration.addDeprecation("mapred.binary.partitioner.right.offset",
new String[] {org.apache.hadoop.mapreduce.lib.partition.
BinaryPartitioner.RIGHT_OFFSET_PROPERTY_NAME});
Configuration.addDeprecation("mapred.text.key.comparator.options",
new String[] {org.apache.hadoop.mapreduce.lib.partition.
KeyFieldBasedComparator.COMPARATOR_OPTIONS});
Configuration.addDeprecation("mapred.text.key.partitioner.options",
new String[] {org.apache.hadoop.mapreduce.lib.partition.
KeyFieldBasedPartitioner.PARTITIONER_OPTIONS});
Configuration.addDeprecation("mapred.mapper.regex.group",
new String[] {org.apache.hadoop.mapreduce.lib.map.RegexMapper.GROUP});
Configuration.addDeprecation("mapred.mapper.regex",
new String[] {org.apache.hadoop.mapreduce.lib.map.RegexMapper.PATTERN});
Configuration.addDeprecation("create.empty.dir.if.nonexist",
new String[] {org.apache.hadoop.mapreduce.lib.jobcontrol.
ControlledJob.CREATE_DIR});
Configuration.addDeprecation("mapred.data.field.separator",
new String[] {org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.DATA_FIELD_SEPERATOR});
Configuration.addDeprecation("map.output.key.value.fields.spec",
new String[] {org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC});
Configuration.addDeprecation("reduce.output.key.value.fields.spec",
new String[] {org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC});
Configuration.addDeprecation("mapred.min.split.size.per.node",
new String[] {org.apache.hadoop.mapreduce.lib.input.
CombineFileInputFormat.SPLIT_MINSIZE_PERNODE});
Configuration.addDeprecation("mapred.min.split.size.per.rack",
new String[] {org.apache.hadoop.mapreduce.lib.input.
CombineFileInputFormat.SPLIT_MINSIZE_PERRACK});
Configuration.addDeprecation("key.value.separator.in.input.line",
new String[] {org.apache.hadoop.mapreduce.lib.input.
KeyValueLineRecordReader.KEY_VALUE_SEPERATOR});
Configuration.addDeprecation("mapred.linerecordreader.maxlength",
new String[] {org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH});
Configuration.addDeprecation("mapred.lazy.output.format",
new String[] {org.apache.hadoop.mapreduce.lib.output.
LazyOutputFormat.OUTPUT_FORMAT});
Configuration.addDeprecation("mapred.textoutputformat.separator",
new String[] {org.apache.hadoop.mapreduce.lib.output.
TextOutputFormat.SEPERATOR});
Configuration.addDeprecation("mapred.join.expr",
new String[] {org.apache.hadoop.mapreduce.lib.join.
CompositeInputFormat.JOIN_EXPR});
Configuration.addDeprecation("mapred.join.keycomparator",
new String[] {org.apache.hadoop.mapreduce.lib.join.
CompositeInputFormat.JOIN_COMPARATOR});
Configuration.addDeprecation("hadoop.pipes.command-file.keep",
new String[] {org.apache.hadoop.mapred.pipes.
Submitter.PRESERVE_COMMANDFILE});
Configuration.addDeprecation("hadoop.pipes.executable",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.EXECUTABLE});
Configuration.addDeprecation("hadoop.pipes.executable.interpretor",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.INTERPRETOR});
Configuration.addDeprecation("hadoop.pipes.java.mapper",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_MAP});
Configuration.addDeprecation("hadoop.pipes.java.recordreader",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_RR});
Configuration.addDeprecation("hadoop.pipes.java.recordwriter",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_RW});
Configuration.addDeprecation("hadoop.pipes.java.reducer",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_REDUCE});
Configuration.addDeprecation("hadoop.pipes.partitioner",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.PARTITIONER});
Configuration.addDeprecation("mapred.pipes.user.inputformat",
new String[] {org.apache.hadoop.mapred.pipes.Submitter.INPUT_FORMAT});
Configuration.addDeprecation("webinterface.private.actions",
new String[]{JTConfig.PRIVATE_ACTIONS_KEY});
Configuration.addDeprecation("security.task.umbilical.protocol.acl",
new String[] {
MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL
new DeprecationDelta("mapred.cluster.average.blacklist.threshold",
JTConfig.JT_AVG_BLACKLIST_THRESHOLD),
new DeprecationDelta("hadoop.job.history.location",
JTConfig.JT_JOBHISTORY_LOCATION),
new DeprecationDelta(
"mapred.job.tracker.history.completed.location",
JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION),
new DeprecationDelta("mapred.jobtracker.job.history.block.size",
JTConfig.JT_JOBHISTORY_BLOCK_SIZE),
new DeprecationDelta("mapred.job.tracker.jobhistory.lru.cache.size",
JTConfig.JT_JOBHISTORY_CACHE_SIZE),
new DeprecationDelta("mapred.hosts",
JTConfig.JT_HOSTS_FILENAME),
new DeprecationDelta("mapred.hosts.exclude",
JTConfig.JT_HOSTS_EXCLUDE_FILENAME),
new DeprecationDelta("mapred.system.dir",
JTConfig.JT_SYSTEM_DIR),
new DeprecationDelta("mapred.max.tracker.blacklists",
JTConfig.JT_MAX_TRACKER_BLACKLISTS),
new DeprecationDelta("mapred.job.tracker",
JTConfig.JT_IPC_ADDRESS),
new DeprecationDelta("mapred.job.tracker.http.address",
JTConfig.JT_HTTP_ADDRESS),
new DeprecationDelta("mapred.job.tracker.handler.count",
JTConfig.JT_IPC_HANDLER_COUNT),
new DeprecationDelta("mapred.jobtracker.restart.recover",
JTConfig.JT_RESTART_ENABLED),
new DeprecationDelta("mapred.jobtracker.taskScheduler",
JTConfig.JT_TASK_SCHEDULER),
new DeprecationDelta(
"mapred.jobtracker.taskScheduler.maxRunningTasksPerJob",
JTConfig.JT_RUNNINGTASKS_PER_JOB),
new DeprecationDelta("mapred.jobtracker.instrumentation",
JTConfig.JT_INSTRUMENTATION),
new DeprecationDelta("mapred.jobtracker.maxtasks.per.job",
JTConfig.JT_TASKS_PER_JOB),
new DeprecationDelta("mapred.heartbeats.in.second",
JTConfig.JT_HEARTBEATS_IN_SECOND),
new DeprecationDelta("mapred.job.tracker.persist.jobstatus.active",
JTConfig.JT_PERSIST_JOBSTATUS),
new DeprecationDelta("mapred.job.tracker.persist.jobstatus.hours",
JTConfig.JT_PERSIST_JOBSTATUS_HOURS),
new DeprecationDelta("mapred.job.tracker.persist.jobstatus.dir",
JTConfig.JT_PERSIST_JOBSTATUS_DIR),
new DeprecationDelta("mapred.permissions.supergroup",
MRConfig.MR_SUPERGROUP),
new DeprecationDelta("mapreduce.jobtracker.permissions.supergroup",
MRConfig.MR_SUPERGROUP),
new DeprecationDelta("mapred.task.cache.levels",
JTConfig.JT_TASKCACHE_LEVELS),
new DeprecationDelta("mapred.jobtracker.taskalloc.capacitypad",
JTConfig.JT_TASK_ALLOC_PAD_FRACTION),
new DeprecationDelta("mapred.jobinit.threads",
JTConfig.JT_JOBINIT_THREADS),
new DeprecationDelta("mapred.tasktracker.expiry.interval",
JTConfig.JT_TRACKER_EXPIRY_INTERVAL),
new DeprecationDelta("mapred.job.tracker.retiredjobs.cache.size",
JTConfig.JT_RETIREJOB_CACHE_SIZE),
new DeprecationDelta("mapred.job.tracker.retire.jobs",
JTConfig.JT_RETIREJOBS),
new DeprecationDelta("mapred.healthChecker.interval",
TTConfig.TT_HEALTH_CHECKER_INTERVAL),
new DeprecationDelta("mapred.healthChecker.script.args",
TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS),
new DeprecationDelta("mapred.healthChecker.script.path",
TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH),
new DeprecationDelta("mapred.healthChecker.script.timeout",
TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT),
new DeprecationDelta("mapred.local.dir.minspacekill",
TTConfig.TT_LOCAL_DIR_MINSPACE_KILL),
new DeprecationDelta("mapred.local.dir.minspacestart",
TTConfig.TT_LOCAL_DIR_MINSPACE_START),
new DeprecationDelta("mapred.task.tracker.http.address",
TTConfig.TT_HTTP_ADDRESS),
new DeprecationDelta("mapred.task.tracker.report.address",
TTConfig.TT_REPORT_ADDRESS),
new DeprecationDelta("mapred.task.tracker.task-controller",
TTConfig.TT_TASK_CONTROLLER),
new DeprecationDelta("mapred.tasktracker.dns.interface",
TTConfig.TT_DNS_INTERFACE),
new DeprecationDelta("mapred.tasktracker.dns.nameserver",
TTConfig.TT_DNS_NAMESERVER),
new DeprecationDelta("mapred.tasktracker.events.batchsize",
TTConfig.TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL),
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
TTConfig.TT_INDEX_CACHE),
new DeprecationDelta("mapred.tasktracker.instrumentation",
TTConfig.TT_INSTRUMENTATION),
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
TTConfig.TT_MAP_SLOTS),
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN),
new DeprecationDelta("mapred.tasktracker.memorycalculatorplugin",
TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN),
new DeprecationDelta("mapred.tasktracker.reduce.tasks.maximum",
TTConfig.TT_REDUCE_SLOTS),
new DeprecationDelta(
"mapred.tasktracker.taskmemorymanager.monitoring-interval",
TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL),
new DeprecationDelta(
"mapred.tasktracker.tasks.sleeptime-before-sigkill",
TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL),
new DeprecationDelta("slave.host.name",
TTConfig.TT_HOST_NAME),
new DeprecationDelta("tasktracker.http.threads",
TTConfig.TT_HTTP_THREADS),
new DeprecationDelta("hadoop.net.static.resolutions",
TTConfig.TT_STATIC_RESOLUTIONS),
new DeprecationDelta("local.cache.size",
TTConfig.TT_LOCAL_CACHE_SIZE),
new DeprecationDelta("tasktracker.contention.tracking",
TTConfig.TT_CONTENTION_TRACKING),
new DeprecationDelta("job.end.notification.url",
MRJobConfig.MR_JOB_END_NOTIFICATION_URL),
new DeprecationDelta("job.end.retry.attempts",
MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS),
new DeprecationDelta("job.end.retry.interval",
MRJobConfig.MR_JOB_END_RETRY_INTERVAL),
new DeprecationDelta("mapred.committer.job.setup.cleanup.needed",
MRJobConfig.SETUP_CLEANUP_NEEDED),
new DeprecationDelta("mapred.jar",
MRJobConfig.JAR),
new DeprecationDelta("mapred.job.id",
MRJobConfig.ID),
new DeprecationDelta("mapred.job.name",
MRJobConfig.JOB_NAME),
new DeprecationDelta("mapred.job.priority",
MRJobConfig.PRIORITY),
new DeprecationDelta("mapred.job.queue.name",
MRJobConfig.QUEUE_NAME),
new DeprecationDelta("mapred.job.reuse.jvm.num.tasks",
MRJobConfig.JVM_NUMTASKS_TORUN),
new DeprecationDelta("mapred.map.tasks",
MRJobConfig.NUM_MAPS),
new DeprecationDelta("mapred.max.tracker.failures",
MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER),
new DeprecationDelta("mapred.reduce.slowstart.completed.maps",
MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART),
new DeprecationDelta("mapred.reduce.tasks",
MRJobConfig.NUM_REDUCES),
new DeprecationDelta("mapred.skip.on",
MRJobConfig.SKIP_RECORDS),
new DeprecationDelta("mapred.skip.out.dir",
MRJobConfig.SKIP_OUTDIR),
new DeprecationDelta("mapred.speculative.execution.slowNodeThreshold",
MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD),
new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold",
MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD),
new DeprecationDelta("mapred.speculative.execution.speculativeCap",
MRJobConfig.SPECULATIVECAP),
new DeprecationDelta("job.local.dir",
MRJobConfig.JOB_LOCAL_DIR),
new DeprecationDelta("mapreduce.inputformat.class",
MRJobConfig.INPUT_FORMAT_CLASS_ATTR),
new DeprecationDelta("mapreduce.map.class",
MRJobConfig.MAP_CLASS_ATTR),
new DeprecationDelta("mapreduce.combine.class",
MRJobConfig.COMBINE_CLASS_ATTR),
new DeprecationDelta("mapreduce.reduce.class",
MRJobConfig.REDUCE_CLASS_ATTR),
new DeprecationDelta("mapreduce.outputformat.class",
MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR),
new DeprecationDelta("mapreduce.partitioner.class",
MRJobConfig.PARTITIONER_CLASS_ATTR),
new DeprecationDelta("mapred.job.classpath.archives",
MRJobConfig.CLASSPATH_ARCHIVES),
new DeprecationDelta("mapred.job.classpath.files",
MRJobConfig.CLASSPATH_FILES),
new DeprecationDelta("mapred.cache.files",
MRJobConfig.CACHE_FILES),
new DeprecationDelta("mapred.cache.archives",
MRJobConfig.CACHE_ARCHIVES),
new DeprecationDelta("mapred.cache.localFiles",
MRJobConfig.CACHE_LOCALFILES),
new DeprecationDelta("mapred.cache.localArchives",
MRJobConfig.CACHE_LOCALARCHIVES),
new DeprecationDelta("mapred.cache.files.filesizes",
MRJobConfig.CACHE_FILES_SIZES),
new DeprecationDelta("mapred.cache.archives.filesizes",
MRJobConfig.CACHE_ARCHIVES_SIZES),
new DeprecationDelta("mapred.cache.files.timestamps",
MRJobConfig.CACHE_FILE_TIMESTAMPS),
new DeprecationDelta("mapred.cache.archives.timestamps",
MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS),
new DeprecationDelta("mapred.working.dir",
MRJobConfig.WORKING_DIR),
new DeprecationDelta("user.name",
MRJobConfig.USER_NAME),
new DeprecationDelta("mapred.output.key.class",
MRJobConfig.OUTPUT_KEY_CLASS),
new DeprecationDelta("mapred.output.value.class",
MRJobConfig.OUTPUT_VALUE_CLASS),
new DeprecationDelta("mapred.output.value.groupfn.class",
MRJobConfig.GROUP_COMPARATOR_CLASS),
new DeprecationDelta("mapred.output.key.comparator.class",
MRJobConfig.KEY_COMPARATOR),
new DeprecationDelta("io.sort.factor",
MRJobConfig.IO_SORT_FACTOR),
new DeprecationDelta("io.sort.mb",
MRJobConfig.IO_SORT_MB),
new DeprecationDelta("keep.failed.task.files",
MRJobConfig.PRESERVE_FAILED_TASK_FILES),
new DeprecationDelta("keep.task.files.pattern",
MRJobConfig.PRESERVE_FILES_PATTERN),
new DeprecationDelta("mapred.child.tmp",
MRJobConfig.TASK_TEMP_DIR),
new DeprecationDelta("mapred.debug.out.lines",
MRJobConfig.TASK_DEBUGOUT_LINES),
new DeprecationDelta("mapred.merge.recordsBeforeProgress",
MRJobConfig.RECORDS_BEFORE_PROGRESS),
new DeprecationDelta("mapred.skip.attempts.to.start.skipping",
MRJobConfig.SKIP_START_ATTEMPTS),
new DeprecationDelta("mapred.task.id",
MRJobConfig.TASK_ATTEMPT_ID),
new DeprecationDelta("mapred.task.is.map",
MRJobConfig.TASK_ISMAP),
new DeprecationDelta("mapred.task.partition",
MRJobConfig.TASK_PARTITION),
new DeprecationDelta("mapred.task.profile",
MRJobConfig.TASK_PROFILE),
new DeprecationDelta("mapred.task.profile.maps",
MRJobConfig.NUM_MAP_PROFILES),
new DeprecationDelta("mapred.task.profile.reduces",
MRJobConfig.NUM_REDUCE_PROFILES),
new DeprecationDelta("mapred.task.timeout",
MRJobConfig.TASK_TIMEOUT),
new DeprecationDelta("mapred.tip.id",
MRJobConfig.TASK_ID),
new DeprecationDelta("mapred.work.output.dir",
MRJobConfig.TASK_OUTPUT_DIR),
new DeprecationDelta("mapred.userlog.limit.kb",
MRJobConfig.TASK_USERLOG_LIMIT),
new DeprecationDelta("mapred.userlog.retain.hours",
MRJobConfig.USER_LOG_RETAIN_HOURS),
new DeprecationDelta("mapred.task.profile.params",
MRJobConfig.TASK_PROFILE_PARAMS),
new DeprecationDelta("io.sort.spill.percent",
MRJobConfig.MAP_SORT_SPILL_PERCENT),
new DeprecationDelta("map.input.file",
MRJobConfig.MAP_INPUT_FILE),
new DeprecationDelta("map.input.length",
MRJobConfig.MAP_INPUT_PATH),
new DeprecationDelta("map.input.start",
MRJobConfig.MAP_INPUT_START),
new DeprecationDelta("mapred.job.map.memory.mb",
MRJobConfig.MAP_MEMORY_MB),
new DeprecationDelta("mapred.map.child.env",
MRJobConfig.MAP_ENV),
new DeprecationDelta("mapred.map.child.java.opts",
MRJobConfig.MAP_JAVA_OPTS),
new DeprecationDelta("mapred.map.max.attempts",
MRJobConfig.MAP_MAX_ATTEMPTS),
new DeprecationDelta("mapred.map.task.debug.script",
MRJobConfig.MAP_DEBUG_SCRIPT),
new DeprecationDelta("mapred.map.tasks.speculative.execution",
MRJobConfig.MAP_SPECULATIVE),
new DeprecationDelta("mapred.max.map.failures.percent",
MRJobConfig.MAP_FAILURES_MAX_PERCENT),
new DeprecationDelta("mapred.skip.map.auto.incr.proc.count",
MRJobConfig.MAP_SKIP_INCR_PROC_COUNT),
new DeprecationDelta("mapred.skip.map.max.skip.records",
MRJobConfig.MAP_SKIP_MAX_RECORDS),
new DeprecationDelta("min.num.spills.for.combine",
MRJobConfig.MAP_COMBINE_MIN_SPILLS),
new DeprecationDelta("mapred.compress.map.output",
MRJobConfig.MAP_OUTPUT_COMPRESS),
new DeprecationDelta("mapred.map.output.compression.codec",
MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC),
new DeprecationDelta("mapred.mapoutput.key.class",
MRJobConfig.MAP_OUTPUT_KEY_CLASS),
new DeprecationDelta("mapred.mapoutput.value.class",
MRJobConfig.MAP_OUTPUT_VALUE_CLASS),
new DeprecationDelta("map.output.key.field.separator",
MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR),
new DeprecationDelta("mapred.map.child.log.level",
MRJobConfig.MAP_LOG_LEVEL),
new DeprecationDelta("mapred.inmem.merge.threshold",
MRJobConfig.REDUCE_MERGE_INMEM_THRESHOLD),
new DeprecationDelta("mapred.job.reduce.input.buffer.percent",
MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT),
new DeprecationDelta("mapred.job.reduce.markreset.buffer.percent",
MRJobConfig.REDUCE_MARKRESET_BUFFER_PERCENT),
new DeprecationDelta("mapred.job.reduce.memory.mb",
MRJobConfig.REDUCE_MEMORY_MB),
new DeprecationDelta("mapred.job.reduce.total.mem.bytes",
MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES),
new DeprecationDelta("mapred.job.shuffle.input.buffer.percent",
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT),
new DeprecationDelta("mapred.job.shuffle.merge.percent",
MRJobConfig.SHUFFLE_MERGE_PERCENT),
new DeprecationDelta("mapred.max.reduce.failures.percent",
MRJobConfig.REDUCE_FAILURES_MAXPERCENT),
new DeprecationDelta("mapred.reduce.child.env",
MRJobConfig.REDUCE_ENV),
new DeprecationDelta("mapred.reduce.child.java.opts",
MRJobConfig.REDUCE_JAVA_OPTS),
new DeprecationDelta("mapred.reduce.max.attempts",
MRJobConfig.REDUCE_MAX_ATTEMPTS),
new DeprecationDelta("mapred.reduce.parallel.copies",
MRJobConfig.SHUFFLE_PARALLEL_COPIES),
new DeprecationDelta("mapred.reduce.task.debug.script",
MRJobConfig.REDUCE_DEBUG_SCRIPT),
new DeprecationDelta("mapred.reduce.tasks.speculative.execution",
MRJobConfig.REDUCE_SPECULATIVE),
new DeprecationDelta("mapred.shuffle.connect.timeout",
MRJobConfig.SHUFFLE_CONNECT_TIMEOUT),
new DeprecationDelta("mapred.shuffle.read.timeout",
MRJobConfig.SHUFFLE_READ_TIMEOUT),
new DeprecationDelta("mapred.skip.reduce.auto.incr.proc.count",
MRJobConfig.REDUCE_SKIP_INCR_PROC_COUNT),
new DeprecationDelta("mapred.skip.reduce.max.skip.groups",
MRJobConfig.REDUCE_SKIP_MAXGROUPS),
new DeprecationDelta("mapred.reduce.child.log.level",
MRJobConfig.REDUCE_LOG_LEVEL),
new DeprecationDelta("mapreduce.job.counters.limit",
MRJobConfig.COUNTERS_MAX_KEY),
new DeprecationDelta("jobclient.completion.poll.interval",
Job.COMPLETION_POLL_INTERVAL_KEY),
new DeprecationDelta("jobclient.progress.monitor.poll.interval",
Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY),
new DeprecationDelta("jobclient.output.filter",
Job.OUTPUT_FILTER),
new DeprecationDelta("mapred.submit.replication",
Job.SUBMIT_REPLICATION),
new DeprecationDelta("mapred.used.genericoptionsparser",
Job.USED_GENERIC_PARSER),
new DeprecationDelta("mapred.input.dir",
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR),
new DeprecationDelta("mapred.input.pathFilter.class",
org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.PATHFILTER_CLASS),
new DeprecationDelta("mapred.max.split.size",
org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MAXSIZE),
new DeprecationDelta("mapred.min.split.size",
org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE),
new DeprecationDelta("mapred.output.compress",
org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS),
new DeprecationDelta("mapred.output.compression.codec",
org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS_CODEC),
new DeprecationDelta("mapred.output.compression.type",
org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.COMPRESS_TYPE),
new DeprecationDelta("mapred.output.dir",
org.apache.hadoop.mapreduce.lib.output.
FileOutputFormat.OUTDIR),
new DeprecationDelta("mapred.seqbinary.output.key.class",
org.apache.hadoop.mapreduce.lib.output.
SequenceFileAsBinaryOutputFormat.KEY_CLASS),
new DeprecationDelta("mapred.seqbinary.output.value.class",
org.apache.hadoop.mapreduce.lib.output.
SequenceFileAsBinaryOutputFormat.VALUE_CLASS),
new DeprecationDelta("sequencefile.filter.class",
org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_CLASS),
new DeprecationDelta("sequencefile.filter.regex",
org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_REGEX),
new DeprecationDelta("sequencefile.filter.frequency",
org.apache.hadoop.mapreduce.lib.input.
SequenceFileInputFilter.FILTER_FREQUENCY),
new DeprecationDelta("mapred.input.dir.mappers",
org.apache.hadoop.mapreduce.lib.input.
MultipleInputs.DIR_MAPPERS),
new DeprecationDelta("mapred.input.dir.formats",
org.apache.hadoop.mapreduce.lib.input.
MultipleInputs.DIR_FORMATS),
new DeprecationDelta("mapred.line.input.format.linespermap",
org.apache.hadoop.mapreduce.lib.input.
NLineInputFormat.LINES_PER_MAP),
new DeprecationDelta("mapred.binary.partitioner.left.offset",
org.apache.hadoop.mapreduce.lib.partition.
BinaryPartitioner.LEFT_OFFSET_PROPERTY_NAME),
new DeprecationDelta("mapred.binary.partitioner.right.offset",
org.apache.hadoop.mapreduce.lib.partition.
BinaryPartitioner.RIGHT_OFFSET_PROPERTY_NAME),
new DeprecationDelta("mapred.text.key.comparator.options",
org.apache.hadoop.mapreduce.lib.partition.
KeyFieldBasedComparator.COMPARATOR_OPTIONS),
new DeprecationDelta("mapred.text.key.partitioner.options",
org.apache.hadoop.mapreduce.lib.partition.
KeyFieldBasedPartitioner.PARTITIONER_OPTIONS),
new DeprecationDelta("mapred.mapper.regex.group",
org.apache.hadoop.mapreduce.lib.map.RegexMapper.GROUP),
new DeprecationDelta("mapred.mapper.regex",
org.apache.hadoop.mapreduce.lib.map.RegexMapper.PATTERN),
new DeprecationDelta("create.empty.dir.if.nonexist",
org.apache.hadoop.mapreduce.lib.jobcontrol.
ControlledJob.CREATE_DIR),
new DeprecationDelta("mapred.data.field.separator",
org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.DATA_FIELD_SEPERATOR),
new DeprecationDelta("map.output.key.value.fields.spec",
org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC),
new DeprecationDelta("reduce.output.key.value.fields.spec",
org.apache.hadoop.mapreduce.lib.fieldsel.
FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC),
new DeprecationDelta("mapred.min.split.size.per.node",
org.apache.hadoop.mapreduce.lib.input.
CombineFileInputFormat.SPLIT_MINSIZE_PERNODE),
new DeprecationDelta("mapred.min.split.size.per.rack",
org.apache.hadoop.mapreduce.lib.input.
CombineFileInputFormat.SPLIT_MINSIZE_PERRACK),
new DeprecationDelta("key.value.separator.in.input.line",
org.apache.hadoop.mapreduce.lib.input.
KeyValueLineRecordReader.KEY_VALUE_SEPERATOR),
new DeprecationDelta("mapred.linerecordreader.maxlength",
org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH),
new DeprecationDelta("mapred.lazy.output.format",
org.apache.hadoop.mapreduce.lib.output.
LazyOutputFormat.OUTPUT_FORMAT),
new DeprecationDelta("mapred.textoutputformat.separator",
org.apache.hadoop.mapreduce.lib.output.
TextOutputFormat.SEPERATOR),
new DeprecationDelta("mapred.join.expr",
org.apache.hadoop.mapreduce.lib.join.
CompositeInputFormat.JOIN_EXPR),
new DeprecationDelta("mapred.join.keycomparator",
org.apache.hadoop.mapreduce.lib.join.
CompositeInputFormat.JOIN_COMPARATOR),
new DeprecationDelta("hadoop.pipes.command-file.keep",
org.apache.hadoop.mapred.pipes.
Submitter.PRESERVE_COMMANDFILE),
new DeprecationDelta("hadoop.pipes.executable",
org.apache.hadoop.mapred.pipes.Submitter.EXECUTABLE),
new DeprecationDelta("hadoop.pipes.executable.interpretor",
org.apache.hadoop.mapred.pipes.Submitter.INTERPRETOR),
new DeprecationDelta("hadoop.pipes.java.mapper",
org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_MAP),
new DeprecationDelta("hadoop.pipes.java.recordreader",
org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_RR),
new DeprecationDelta("hadoop.pipes.java.recordwriter",
org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_RW),
new DeprecationDelta("hadoop.pipes.java.reducer",
org.apache.hadoop.mapred.pipes.Submitter.IS_JAVA_REDUCE),
new DeprecationDelta("hadoop.pipes.partitioner",
org.apache.hadoop.mapred.pipes.Submitter.PARTITIONER),
new DeprecationDelta("mapred.pipes.user.inputformat",
org.apache.hadoop.mapred.pipes.Submitter.INPUT_FORMAT),
new DeprecationDelta("webinterface.private.actions",
JTConfig.PRIVATE_ACTIONS_KEY),
new DeprecationDelta("security.task.umbilical.protocol.acl",
MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL),
new DeprecationDelta("security.job.submission.protocol.acl",
MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT),
new DeprecationDelta("mapreduce.user.classpath.first",
MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST ),
new DeprecationDelta(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
MRJobConfig.SPLIT_METAINFO_MAXSIZE),
new DeprecationDelta("mapred.input.dir.recursive",
FileInputFormat.INPUT_DIR_RECURSIVE)
});
Configuration.addDeprecation("security.job.submission.protocol.acl",
new String[] {
MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT
});
Configuration.addDeprecation("mapreduce.user.classpath.first",
MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
MRJobConfig.SPLIT_METAINFO_MAXSIZE);
Configuration.addDeprecation("mapred.input.dir.recursive",
FileInputFormat.INPUT_DIR_RECURSIVE);
}
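
A minimal usage sketch of what the registrations closed above provide at runtime, assuming they have been loaded (constructing a JobConf triggers ConfigUtil's static initialization); the class name and the timeout value below are only for illustration. A value written through the current key can still be read back through its deprecated alias, with a one-time warning logged for the old name.

import org.apache.hadoop.mapred.JobConf;

public class DeprecatedKeyLookupSketch {
  public static void main(String[] args) {
    // Constructing a JobConf loads ConfigUtil, which registers the mappings above.
    JobConf conf = new JobConf();
    // Write through the current key...
    conf.set("mapreduce.task.timeout", "600000");
    // ...and read back through the deprecated alias from the table above.
    // Expected output: 600000 (plus a deprecation warning for the old key).
    System.out.println(conf.get("mapred.task.timeout"));
  }
}
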
public static void main(String[] args) {

View File

@ -67,7 +67,7 @@ public class TestMRAMWithNonNormalizedCapabilities {
}
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(getClass().getName());
mrCluster = new MiniMRYarnCluster(getClass().getSimpleName());
mrCluster.init(new Configuration());
mrCluster.start();
}

View File

@ -69,6 +69,14 @@ public class FadvisedFileRegion extends DefaultFileRegion {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
super.releaseExternalResources();
}
/**
* Call when the transfer completes successfully so we can advise the OS that
* we don't need the region to be cached anymore.
*/
public void transferSuccessful() {
if (manageOsCache && getCount() > 0) {
try {
NativeIO.POSIX.posixFadviseIfPossible(identifier,
@ -78,6 +86,5 @@ public class FadvisedFileRegion extends DefaultFileRegion {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
}
super.releaseExternalResources();
}
}
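
The fadvise call above is truncated by the hunk; as a rough sketch, the advise step presumably passes the transferred byte range and the DONTNEED flag, along the lines of the hypothetical helper below. Only the identifier argument is visible in the diff, so the trailing arguments, the flag constant, and the helper name are assumptions.

import java.io.FileDescriptor;

import org.apache.hadoop.io.nativeio.NativeIO;

public class FadviseDontNeedSketch {
  // Hypothetical helper mirroring what transferSuccessful() appears to do:
  // tell the kernel that the already-transferred byte range will not be read
  // again, so its pages can be dropped from the cache.
  static void dropFromPageCache(String identifier, FileDescriptor fd,
      long position, long count) throws Exception {
    if (count > 0) {
      NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, position, count,
          NativeIO.POSIX.POSIX_FADV_DONTNEED);
    }
  }
}
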

View File

@ -625,6 +625,9 @@ public class ShuffleHandler extends AuxiliaryService {
// attribute to appropriate spill output
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
partition.transferSuccessful();
}
partition.releaseExternalResources();
}
});

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@ -71,11 +72,12 @@ public class Logalyzer {
"logalizer.logcomparator.column.separator";
static {
Configuration.addDeprecation("mapred.reducer.sort",
new String[] {SORT_COLUMNS});
Configuration.addDeprecation("mapred.reducer.separator",
new String[] {COLUMN_SEPARATOR});
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("mapred.reducer.sort", SORT_COLUMNS),
new DeprecationDelta("mapred.reducer.separator", COLUMN_SEPARATOR)
});
}
/** A {@link Mapper} that extracts text matching a regular expression. */
public static class LogRegexMapper<K extends WritableComparable>
extends MapReduceBase

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@ -118,10 +119,12 @@ class DistributedCacheEmulator {
{
// Need to handle deprecation of these MapReduce-internal configuration
// properties as MapReduce doesn't handle their deprecation.
Configuration.addDeprecation("mapred.cache.files.filesizes",
new String[] {MRJobConfig.CACHE_FILES_SIZES});
Configuration.addDeprecation("mapred.cache.files.visibilities",
new String[] {MRJobConfig.CACHE_FILE_VISIBILITIES});
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("mapred.cache.files.filesizes",
MRJobConfig.CACHE_FILES_SIZES),
new DeprecationDelta("mapred.cache.files.visibilities",
MRJobConfig.CACHE_FILE_VISIBILITIES)
});
}
/**

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -860,5 +861,11 @@ public class ResourceSchedulerWrapper implements ResourceScheduler,
QueueACL acl, String queueName) {
return scheduler.checkAccess(callerUGI, acl, queueName);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
return scheduler.getAppResourceUsageReport(appAttemptId);
}
}

View File

@ -77,6 +77,11 @@ Release 2.3.0 - UNRELEASED
applications so that clients can get information about them post RM-restart.
(Jian He via vinodkv)
YARN-1290. Let continuous scheduling achieve more balanced task assignment
(Wei Yan via Sandy Ryza)
YARN-786. Expose application resource usage in RM REST API (Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
@ -178,6 +183,9 @@ Release 2.2.1 - UNRELEASED
YARN-1343. NodeManagers additions/restarts are not reported as node updates
in AllocateResponse responses to AMs. (tucu)
YARN-1381. Same relaxLocality appears twice in exception message of
AMRMClientImpl#checkLocalityRelaxationConflict() (Ted Yu via Sandy Ryza)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -499,13 +499,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
for (String location : locations) {
TreeMap<Resource, ResourceRequestInfo> reqs =
remoteRequests.get(location);
if (reqs != null && !reqs.isEmpty()
&& reqs.values().iterator().next().remoteRequest.getRelaxLocality()
!= relaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location " + location
+ " with locality relaxation " + relaxLocality + " when it has "
+ "already been requested with locality relaxation " + relaxLocality);
if (reqs != null && !reqs.isEmpty()) {
boolean existingRelaxLocality =
reqs.values().iterator().next().remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location " + location
+ " with locality relaxation " + relaxLocality + " when it has "
+ "already been requested with locality relaxation " + existingRelaxLocality);
}
}
}
}
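
For context, a rough client-side sketch of the conflict the corrected message above reports, using the public AMRMClient API; the capability, host name, and priority values are invented for illustration, and no ResourceManager interaction is involved in the calls shown.

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RelaxLocalityConflictSketch {
  public static void main(String[] args) {
    AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient();
    amClient.init(new YarnConfiguration());
    Resource capability = Resource.newInstance(1024, 1);
    String[] hosts = { "host1" };
    // First request for host1 with locality relaxation enabled.
    amClient.addContainerRequest(new ContainerRequest(
        capability, hosts, null, Priority.newInstance(1), true));
    // A second request for the same host with relaxation disabled conflicts
    // with the first; it fails with InvalidContainerRequestException, whose
    // message now reports the setting already on record (true) instead of
    // repeating the new one.
    amClient.addContainerRequest(new ContainerRequest(
        capability, hosts, null, Priority.newInstance(1), false));
  }
}
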

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@ -54,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -84,9 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@ -98,7 +94,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
@ -673,38 +668,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
this.readLock.lock();
try {
int numUsedContainers = 0;
int numReservedContainers = 0;
Resource currentConsumption = Resources.createResource(0, 0);
Resource reservedResources = Resources.createResource(0, 0);
SchedulerAppReport schedApp =
scheduler.getSchedulerAppInfo(this.getAppAttemptId());
Collection<RMContainer> liveContainers;
Collection<RMContainer> reservedContainers;
if (schedApp != null) {
liveContainers = schedApp.getLiveContainers();
reservedContainers = schedApp.getReservedContainers();
if (liveContainers != null) {
numUsedContainers = liveContainers.size();
for (RMContainer lc : liveContainers) {
Resources.addTo(currentConsumption, lc.getContainer().getResource());
}
}
if (reservedContainers != null) {
numReservedContainers = reservedContainers.size();
for (RMContainer rc : reservedContainers) {
Resources.addTo(reservedResources, rc.getContainer().getResource());
}
}
}
return BuilderUtils.newApplicationResourceUsageReport(
numUsedContainers, numReservedContainers,
currentConsumption, reservedResources,
Resources.add(currentConsumption, reservedResources));
return scheduler.getAppResourceUsageReport(this.getAppAttemptId());
} finally {
this.readLock.unlock();
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -397,5 +398,12 @@ public abstract class SchedulerApplication {
lastScheduledContainer.put(priority, currentTimeMs);
schedulingOpportunities.setCount(priority, 0);
}
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), Resources.clone(currentConsumption),
Resources.clone(currentReservation),
Resources.add(currentConsumption, currentReservation));
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -129,6 +130,16 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
@Stable
SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
/**
* Get a resource usage report from a given app attempt ID.
* @param appAttemptId the id of the application attempt
* @return resource usage report for this given attempt
*/
@LimitedPrivate("yarn")
@Evolving
ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId);
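
A small sketch of how a caller might consume the report returned by the method declared above; the helper and class names are hypothetical, while the accessors used are the existing ApplicationResourceUsageReport and Resource getters.

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

public class UsageReportSketch {
  // Hypothetical helper: summarize an attempt's current allocation, or return
  // null when the scheduler no longer tracks the attempt.
  static String summarize(YarnScheduler scheduler,
      ApplicationAttemptId attemptId) {
    ApplicationResourceUsageReport report =
        scheduler.getAppResourceUsageReport(attemptId);
    if (report == null) {
      return null;
    }
    Resource used = report.getUsedResources();
    return report.getNumUsedContainers() + " containers, "
        + used.getMemory() + " MB, " + used.getVirtualCores() + " vcores";
  }
}
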
/**
* Get the root queue for the scheduler.
* @return the root queue for the scheduler.

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -856,6 +857,13 @@ public class CapacityScheduler
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@Lock(Lock.NoLock.class)
FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -181,6 +182,8 @@ public class FairScheduler implements ResourceScheduler {
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
private Comparator nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
@ -948,14 +951,22 @@ public class FairScheduler implements ResourceScheduler {
private void continuousScheduling() {
while (true) {
for (FSSchedulerNode node : nodes.values()) {
try {
if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
attemptScheduling(node);
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
if (nodes.containsKey(nodeId)) {
FSSchedulerNode node = nodes.get(nodeId);
try {
if (Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.warn("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex);
}
} catch (Throwable ex) {
LOG.warn("Error while attempting scheduling for node " + node + ": " +
ex.toString(), ex);
}
}
try {
@ -966,6 +977,17 @@ public class FairScheduler implements ResourceScheduler {
}
}
}
/** Sort nodes by available resource */
private class NodeAvailableResourceComparator implements Comparator<NodeId> {
@Override
public int compare(NodeId n1, NodeId n2) {
return RESOURCE_CALCULATOR.compare(clusterCapacity,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
}
}
private synchronized void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
@ -1030,6 +1052,17 @@ public class FairScheduler implements ResourceScheduler {
return new SchedulerAppReport(applications.get(appAttemptId));
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
FSSchedulerApp app = applications.get(appAttemptId);
if (app == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
return app.getResourceUsageReport();
}
/**
* Subqueue metrics might be a little out of date because fair shares are
* recalculated at the update interval, but the root queue metrics needs to

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -327,6 +328,13 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}

View File

@ -26,8 +26,10 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -71,6 +73,9 @@ public class AppInfo {
protected long elapsedTime;
protected String amContainerLogs;
protected String amHostHttpAddress;
protected int allocatedMB;
protected int allocatedVCores;
protected int runningContainers;
public AppInfo() {
} // JAXB needs this
@ -132,6 +137,15 @@ public class AppInfo {
this.amContainerLogs = url;
this.amHostHttpAddress = masterContainer.getNodeHttpAddress();
}
ApplicationResourceUsageReport resourceReport = attempt
.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
allocatedMB = usedResources.getMemory();
allocatedVCores = usedResources.getVirtualCores();
runningContainers = resourceReport.getNumUsedContainers();
}
}
}
}
@ -224,5 +238,17 @@ public class AppInfo {
public String getApplicationType() {
return this.applicationType;
}
public int getRunningContainers() {
return this.runningContainers;
}
public int getAllocatedMB() {
return this.allocatedMB;
}
public int getAllocatedVCores() {
return this.allocatedVCores;
}
}

View File

@ -33,6 +33,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -2348,7 +2352,7 @@ public class TestFairScheduler {
fs.applications, FSSchedulerApp.class);
}
@Test (timeout = 5000)
@Test (timeout = 10000)
public void testContinuousScheduling() throws Exception {
// set continuous scheduling enabled
FairScheduler fs = new FairScheduler();
@ -2359,16 +2363,21 @@ public class TestFairScheduler {
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
// Add one node
// Add two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
fs.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
fs.handle(nodeEvent2);
// available resource
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
// send application request
ApplicationAttemptId appAttemptId =
@ -2387,10 +2396,32 @@ public class TestFairScheduler {
FSSchedulerApp app = fs.applications.get(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
// check consumption
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
// another request
request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
ask.clear();
ask.add(request);
fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// Wait until app gets resources
while (app.getCurrentConsumption()
.equals(Resources.createResource(1024, 1))) { }
Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
// 2 containers should be assigned to 2 nodes
Set<NodeId> nodes = new HashSet<NodeId>();
Iterator<RMContainer> it = app.getLiveContainers().iterator();
while (it.hasNext()) {
nodes.add(it.next().getContainer().getNodeId());
}
Assert.assertEquals(2, nodes.size());
}

View File

@ -78,6 +78,8 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
public class TestRMWebServicesApps extends JerseyTest {
private static MockRM rm;
private static final int CONTAINER_MB = 1024;
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
@ -126,7 +128,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testApps() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testAppsHelper("apps", app1, MediaType.APPLICATION_JSON);
rm.stop();
@ -136,7 +138,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsSlash() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testAppsHelper("apps/", app1, MediaType.APPLICATION_JSON);
rm.stop();
@ -146,7 +148,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsDefault() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testAppsHelper("apps/", app1, "");
rm.stop();
@ -156,7 +158,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsXML() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
@ -181,7 +183,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsXMLMulti() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024, "testwordcount", "user1");
rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
rm.submitApp(2048, "testwordcount2", "user1");
amNodeManager.nodeHeartbeat(true);
@ -225,7 +227,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryState() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -248,8 +250,8 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStates() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
RMApp killedApp = rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
RMApp killedApp = rm.submitApp(CONTAINER_MB);
rm.killApp(killedApp.getApplicationId());
amNodeManager.nodeHeartbeat(true);
@ -297,8 +299,8 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStatesComma() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
RMApp killedApp = rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
RMApp killedApp = rm.submitApp(CONTAINER_MB);
rm.killApp(killedApp.getApplicationId());
amNodeManager.nodeHeartbeat(true);
@ -346,7 +348,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStatesNone() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -365,7 +367,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStateNone() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -384,7 +386,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStatesInvalid() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -421,7 +423,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStateInvalid() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -458,7 +460,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryFinalStatus() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -481,7 +483,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryFinalStatusNone() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -499,7 +501,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryFinalStatusInvalid() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -537,8 +539,8 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryUser() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -565,8 +567,8 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryQueue() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -588,9 +590,9 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryLimit() throws JSONException, Exception {
rm.start();
rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").queryParam("limit", "2")
@ -611,9 +613,9 @@ public class TestRMWebServicesApps extends JerseyTest {
long start = System.currentTimeMillis();
Thread.sleep(1);
rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").queryParam("startedTimeBegin", String.valueOf(start))
@ -632,11 +634,11 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryStartBeginSome() throws JSONException, Exception {
rm.start();
rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
long start = System.currentTimeMillis();
Thread.sleep(1);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").queryParam("startedTimeBegin", String.valueOf(start))
@ -657,9 +659,9 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.registerNode("127.0.0.1:1234", 2048);
long end = System.currentTimeMillis();
Thread.sleep(1);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").queryParam("startedTimeEnd", String.valueOf(end))
@ -677,11 +679,11 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.registerNode("127.0.0.1:1234", 2048);
long start = System.currentTimeMillis();
Thread.sleep(1);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
long end = System.currentTimeMillis();
Thread.sleep(1);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").queryParam("startedTimeBegin", String.valueOf(start))
@ -703,7 +705,7 @@ public class TestRMWebServicesApps extends JerseyTest {
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
long start = System.currentTimeMillis();
Thread.sleep(1);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
@ -712,8 +714,8 @@ public class TestRMWebServicesApps extends JerseyTest {
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
@ -733,7 +735,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppsQueryFinishEnd() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
@ -743,8 +745,8 @@ public class TestRMWebServicesApps extends JerseyTest {
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
long end = System.currentTimeMillis();
WebResource r = resource();
@ -767,7 +769,7 @@ public class TestRMWebServicesApps extends JerseyTest {
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
long start = System.currentTimeMillis();
Thread.sleep(1);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
@ -777,8 +779,8 @@ public class TestRMWebServicesApps extends JerseyTest {
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.submitApp(1024);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
long end = System.currentTimeMillis();
WebResource r = resource();
@ -801,7 +803,7 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
Thread.sleep(1);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
@ -811,9 +813,9 @@ public class TestRMWebServicesApps extends JerseyTest {
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.submitApp(1024, "", UserGroupInformation.getCurrentUser()
rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
rm.submitApp(1024, "", UserGroupInformation.getCurrentUser()
rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "NON-YARN");
WebResource r = resource();
@ -987,7 +989,7 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 4096);
Thread.sleep(1);
RMApp app1 = rm.submitApp(1024, "", UserGroupInformation.getCurrentUser()
RMApp app1 = rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
amNodeManager.nodeHeartbeat(true);
// finish App
@ -998,9 +1000,9 @@ public class TestRMWebServicesApps extends JerseyTest {
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.submitApp(1024, "", UserGroupInformation.getCurrentUser()
rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
rm.submitApp(1024, "", UserGroupInformation.getCurrentUser()
rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "OTHER");
// zero type, zero state
@ -1148,7 +1150,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testSingleApp() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
testSingleAppsHelper(app1.getApplicationId().toString(), app1,
MediaType.APPLICATION_JSON);
@ -1159,7 +1161,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testSingleAppsSlash() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testSingleAppsHelper(app1.getApplicationId().toString() + "/", app1,
MediaType.APPLICATION_JSON);
@ -1170,7 +1172,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testSingleAppsDefault() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testSingleAppsHelper(app1.getApplicationId().toString() + "/", app1, "");
rm.stop();
@ -1180,7 +1182,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testInvalidApp() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -1216,7 +1218,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testNonexistApp() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024, "testwordcount", "user1");
rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -1265,7 +1267,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testSingleAppsXML() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
@ -1307,7 +1309,10 @@ public class TestRMWebServicesApps extends JerseyTest {
WebServicesTestUtils.getXmlLong(element, "finishedTime"),
WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
WebServicesTestUtils.getXmlString(element, "amHostHttpAddress"),
WebServicesTestUtils.getXmlString(element, "amContainerLogs"));
WebServicesTestUtils.getXmlString(element, "amContainerLogs"),
WebServicesTestUtils.getXmlInt(element, "allocatedMB"),
WebServicesTestUtils.getXmlInt(element, "allocatedVCores"),
WebServicesTestUtils.getXmlInt(element, "runningContainers"));
}
}
@ -1315,7 +1320,7 @@ public class TestRMWebServicesApps extends JerseyTest {
Exception {
// trackingUrl is not assigned yet, so it is not counted here
assertEquals("incorrect number of elements", 16, info.length());
assertEquals("incorrect number of elements", 19, info.length());
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
info.getString("name"), info.getString("applicationType"), info.getString("queue"),
@ -1324,14 +1329,16 @@ public class TestRMWebServicesApps extends JerseyTest {
info.getString("diagnostics"), info.getLong("clusterId"),
info.getLong("startedTime"), info.getLong("finishedTime"),
info.getLong("elapsedTime"), info.getString("amHostHttpAddress"),
info.getString("amContainerLogs"));
info.getString("amContainerLogs"), info.getInt("allocatedMB"),
info.getInt("allocatedVCores"), info.getInt("runningContainers"));
}
public void verifyAppInfoGeneric(RMApp app, String id, String user,
String name, String applicationType, String queue, String state, String finalStatus,
float progress, String trackingUI, String diagnostics, long clusterId,
long startedTime, long finishedTime, long elapsedTime,
String amHostHttpAddress, String amContainerLogs) throws JSONException,
String amHostHttpAddress, String amContainerLogs, int allocatedMB,
int allocatedVCores, int numContainers) throws JSONException,
Exception {
WebServicesTestUtils.checkStringMatch("id", app.getApplicationId()
@ -1363,13 +1370,16 @@ public class TestRMWebServicesApps extends JerseyTest {
amContainerLogs.startsWith("http://"));
assertTrue("amContainerLogs doesn't contain user info",
amContainerLogs.endsWith("/" + app.getUser()));
assertEquals("allocatedMB doesn't match", 1024, allocatedMB);
assertEquals("allocatedVCores doesn't match", 1, allocatedVCores);
assertEquals("numContainers doesn't match", 1, numContainers);
}
@Test
public void testAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString(), app1,
MediaType.APPLICATION_JSON);
@ -1380,7 +1390,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testMultipleAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
int maxAppAttempts = rm.getConfig().getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -1406,7 +1416,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppAttemptsSlash() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString() + "/", app1,
MediaType.APPLICATION_JSON);
@ -1417,7 +1427,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testAppAttemtpsDefault() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
testAppAttemptsHelper(app1.getApplicationId().toString() + "/", app1, "");
rm.stop();
@ -1427,7 +1437,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testInvalidAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024);
rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -1463,7 +1473,7 @@ public class TestRMWebServicesApps extends JerseyTest {
public void testNonexistAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
rm.submitApp(1024, "testwordcount", "user1");
rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
@ -1526,7 +1536,7 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.start();
String user = "user1";
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", user);
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", user);
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")

View File

@ -1177,7 +1177,10 @@ ResourceManager REST API's.
"elapsedTime" : 25196,
"diagnostics" : "",
"trackingUrl" : "http://host.domain.com:8088/proxy/application_1326815542473_0001/jobhistory/job/job_1326815542473_1_1",
"queue" : "default"
"queue" : "default",
"allocatedMB" : 0,
"allocatedVCores" : 0,
"runningContainers" : 0
},
{
"finishedTime" : 1326815789546,
@ -1195,7 +1198,10 @@ ResourceManager REST API's.
"elapsedTime" : 148166,
"diagnostics" : "",
"trackingUrl" : "http://host.domain.com:8088/proxy/application_1326815542473_0002/jobhistory/job/job_1326815542473_2_2",
"queue" : "default"
"queue" : "default",
"allocatedMB" : 0,
"allocatedVCores" : 0,
"runningContainers" : 1
}
]
}
@ -1245,6 +1251,9 @@ ResourceManager REST API's.
<amContainerLogs>http://host.domain.com:8042/node/containerlogs/container_1326815542473_0001
_01_000001</amContainerLogs>
<amHostHttpAddress>host.domain.com:8042</amHostHttpAddress>
<allocatedMB>0</allocatedMB>
<allocatedVCores>0</allocatedVCores>
<runningContainers>0</runningContainers>
</app>
<app>
<id>application_1326815542473_0002</id>
@ -1264,6 +1273,9 @@ _01_000001</amContainerLogs>
<elapsedTime>148166</elapsedTime>
<amContainerLogs>http://host.domain.com:8042/node/containerlogs/container_1326815542473_0002_01_000001</amContainerLogs>
<amHostHttpAddress>host.domain.com:8042</amHostHttpAddress>
<allocatedMB>0</allocatedMB>
<allocatedVCores>0</allocatedVCores>
<runningContainers>0</runningContainers>
</app>
</apps>
@ -1457,6 +1469,12 @@ _01_000001</amContainerLogs>
*---------------+--------------+--------------------------------+
| amHostHttpAddress | string | The node's http address of the application master |
*---------------+--------------+--------------------------------+
| allocatedMB | int | The sum of memory in MB allocated to the application's running containers |
*---------------+--------------+--------------------------------+
| allocatedVCores | int | The sum of virtual cores allocated to the application's running containers |
*---------------+--------------+--------------------------------+
| runningContainers | int | The number of containers currently running for the application |
*---------------+--------------+--------------------------------+
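As a consumption sketch, not part of this patch, the new fields can be read from the cluster apps resource over HTTP. The ResourceManager address below is a placeholder, and the string scan only checks that the fields are present where a real client would use a JSON parser:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class AppsUsageCheck {
  public static void main(String[] args) throws Exception {
    // Placeholder RM web address; point this at a real cluster.
    URL url = new URL(
        "http://rm-host.example.com:8088/ws/v1/cluster/apps?states=RUNNING");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");

    // Read the whole JSON response body.
    StringBuilder body = new StringBuilder();
    BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"));
    try {
      String line;
      while ((line = in.readLine()) != null) {
        body.append(line);
      }
    } finally {
      in.close();
    }

    // Crude presence check for the fields added by this change.
    for (String field : new String[] {
        "allocatedMB", "allocatedVCores", "runningContainers"}) {
      boolean present = body.indexOf("\"" + field + "\"") >= 0;
      System.out.println(field + (present ? " present" : " missing"));
    }
  }
}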
** Response Examples