HADOOP-11209. Configuration#updatingResource/finalParameters are not thread-safe. Contributed by Varun Saxena.

(cherry picked from commit 786dbdfad8)
This commit is contained in:
Tsuyoshi Ozawa 2015-01-22 14:15:59 +09:00
parent 316eedb55f
commit 1435b2002b
3 changed files with 80 additions and 15 deletions

View File

@ -382,6 +382,9 @@ Release 2.7.0 - UNRELEASED
architecture because it is slower there (Suman Somasundar via Colin P. architecture because it is slower there (Suman Somasundar via Colin P.
McCabe) McCabe)
HADOOP-11209. Configuration#updatingResource/finalParameters are not
thread-safe. (Varun Saxena via ozawa)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -52,6 +52,7 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -228,7 +229,8 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
/** /**
* List of configuration parameters marked <b>final</b>. * List of configuration parameters marked <b>final</b>.
*/ */
private Set<String> finalParameters = new HashSet<String>(); private Set<String> finalParameters = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
private boolean loadDefaults = true; private boolean loadDefaults = true;
@ -258,7 +260,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* Stores the mapping of key to the resource which modifies or loads * Stores the mapping of key to the resource which modifies or loads
* the key most recently * the key most recently
*/ */
private HashMap<String, String[]> updatingResource; private Map<String, String[]> updatingResource;
/** /**
* Class to keep the information about the keys which replace the deprecated * Class to keep the information about the keys which replace the deprecated
@ -685,7 +687,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
*/ */
public Configuration(boolean loadDefaults) { public Configuration(boolean loadDefaults) {
this.loadDefaults = loadDefaults; this.loadDefaults = loadDefaults;
updatingResource = new HashMap<String, String[]>(); updatingResource = new ConcurrentHashMap<String, String[]>();
synchronized(Configuration.class) { synchronized(Configuration.class) {
REGISTRY.put(this, null); REGISTRY.put(this, null);
} }
@ -708,8 +710,11 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
this.overlay = (Properties)other.overlay.clone(); this.overlay = (Properties)other.overlay.clone();
} }
this.updatingResource = new HashMap<String, String[]>(other.updatingResource); this.updatingResource = new ConcurrentHashMap<String, String[]>(
this.finalParameters = new HashSet<String>(other.finalParameters); other.updatingResource);
this.finalParameters = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
this.finalParameters.addAll(other.finalParameters);
} }
synchronized(Configuration.class) { synchronized(Configuration.class) {
@ -2306,20 +2311,27 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* @return final parameter set. * @return final parameter set.
*/ */
public Set<String> getFinalParameters() { public Set<String> getFinalParameters() {
return new HashSet<String>(finalParameters); Set<String> setFinalParams = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
setFinalParams.addAll(finalParameters);
return setFinalParams;
} }
protected synchronized Properties getProps() { protected synchronized Properties getProps() {
if (properties == null) { if (properties == null) {
properties = new Properties(); properties = new Properties();
HashMap<String, String[]> backup = Map<String, String[]> backup =
new HashMap<String, String[]>(updatingResource); new ConcurrentHashMap<String, String[]>(updatingResource);
loadResources(properties, resources, quietmode); loadResources(properties, resources, quietmode);
if (overlay!= null) {
if (overlay != null) {
properties.putAll(overlay); properties.putAll(overlay);
for (Map.Entry<Object,Object> item: overlay.entrySet()) { for (Map.Entry<Object,Object> item: overlay.entrySet()) {
String key = (String)item.getKey(); String key = (String)item.getKey();
updatingResource.put(key, backup.get(key)); String[] source = backup.get(key);
if(source != null) {
updatingResource.put(key, source);
}
} }
} }
} }
@ -2568,16 +2580,18 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
if (value != null || allowNullValueProperties) { if (value != null || allowNullValueProperties) {
if (!finalParameters.contains(attr)) { if (!finalParameters.contains(attr)) {
if (value==null && allowNullValueProperties) { if (value==null && allowNullValueProperties) {
value = DEFAULT_STRING_CHECK; value = DEFAULT_STRING_CHECK;
} }
properties.setProperty(attr, value); properties.setProperty(attr, value);
updatingResource.put(attr, source); if(source != null) {
updatingResource.put(attr, source);
}
} else if (!value.equals(properties.getProperty(attr))) { } else if (!value.equals(properties.getProperty(attr))) {
LOG.warn(name+":an attempt to override final parameter: "+attr LOG.warn(name+":an attempt to override final parameter: "+attr
+"; Ignoring."); +"; Ignoring.");
} }
} }
if (finalParameter) { if (finalParameter && attr != null) {
finalParameters.add(attr); finalParameters.add(attr);
} }
} }
@ -2780,7 +2794,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
String value = org.apache.hadoop.io.Text.readString(in); String value = org.apache.hadoop.io.Text.readString(in);
set(key, value); set(key, value);
String sources[] = WritableUtils.readCompressedStringArray(in); String sources[] = WritableUtils.readCompressedStringArray(in);
updatingResource.put(key, sources); if(sources != null) {
updatingResource.put(key, sources);
}
} }
} }

View File

@ -1216,6 +1216,52 @@ public class TestConfiguration extends TestCase {
assertTrue("my.var is not final", finalParameters.contains("my.var")); assertTrue("my.var is not final", finalParameters.contains("my.var"));
} }
/**
* A test to check whether this thread goes into infinite loop because of
* destruction of data structure by resize of Map. This problem was reported
* by SPARK-2546.
* @throws Exception
*/
public void testConcurrentAccesses() throws Exception {
out = new BufferedWriter(new FileWriter(CONFIG));
startConfig();
declareProperty("some.config", "xyz", "xyz", false);
endConfig();
Path fileResource = new Path(CONFIG);
Configuration conf = new Configuration();
conf.addResource(fileResource);
class ConfigModifyThread extends Thread {
final private Configuration config;
final private String prefix;
public ConfigModifyThread(Configuration conf, String prefix) {
config = conf;
this.prefix = prefix;
}
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
config.set("some.config.value-" + prefix + i, "value");
}
}
}
ArrayList<ConfigModifyThread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
threads.add(new ConfigModifyThread(conf, String.valueOf(i)));
}
for (Thread t: threads) {
t.start();
}
for (Thread t: threads) {
t.join();
}
// If this test completes without going into infinite loop,
// it's expected behaviour.
}
public static void main(String[] argv) throws Exception { public static void main(String[] argv) throws Exception {
junit.textui.TestRunner.main(new String[]{ junit.textui.TestRunner.main(new String[]{
TestConfiguration.class.getName() TestConfiguration.class.getName()