HADOOP-11209. Configuration#updatingResource/finalParameters are not thread-safe. Contributed by Varun Saxena.
This commit is contained in:
parent
5712c9f96a
commit
786dbdfad8
|
@ -741,6 +741,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
architecture because it is slower there (Suman Somasundar via Colin P.
|
architecture because it is slower there (Suman Somasundar via Colin P.
|
||||||
McCabe)
|
McCabe)
|
||||||
|
|
||||||
|
HADOOP-11209. Configuration#updatingResource/finalParameters are not
|
||||||
|
thread-safe. (Varun Saxena via ozawa)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -52,6 +52,7 @@ import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
import java.util.WeakHashMap;
|
import java.util.WeakHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -228,7 +229,8 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
/**
|
/**
|
||||||
* List of configuration parameters marked <b>final</b>.
|
* List of configuration parameters marked <b>final</b>.
|
||||||
*/
|
*/
|
||||||
private Set<String> finalParameters = new HashSet<String>();
|
private Set<String> finalParameters = Collections.newSetFromMap(
|
||||||
|
new ConcurrentHashMap<String, Boolean>());
|
||||||
|
|
||||||
private boolean loadDefaults = true;
|
private boolean loadDefaults = true;
|
||||||
|
|
||||||
|
@ -258,7 +260,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
* Stores the mapping of key to the resource which modifies or loads
|
* Stores the mapping of key to the resource which modifies or loads
|
||||||
* the key most recently
|
* the key most recently
|
||||||
*/
|
*/
|
||||||
private HashMap<String, String[]> updatingResource;
|
private Map<String, String[]> updatingResource;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to keep the information about the keys which replace the deprecated
|
* Class to keep the information about the keys which replace the deprecated
|
||||||
|
@ -685,7 +687,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
*/
|
*/
|
||||||
public Configuration(boolean loadDefaults) {
|
public Configuration(boolean loadDefaults) {
|
||||||
this.loadDefaults = loadDefaults;
|
this.loadDefaults = loadDefaults;
|
||||||
updatingResource = new HashMap<String, String[]>();
|
updatingResource = new ConcurrentHashMap<String, String[]>();
|
||||||
synchronized(Configuration.class) {
|
synchronized(Configuration.class) {
|
||||||
REGISTRY.put(this, null);
|
REGISTRY.put(this, null);
|
||||||
}
|
}
|
||||||
|
@ -708,8 +710,11 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
this.overlay = (Properties)other.overlay.clone();
|
this.overlay = (Properties)other.overlay.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.updatingResource = new HashMap<String, String[]>(other.updatingResource);
|
this.updatingResource = new ConcurrentHashMap<String, String[]>(
|
||||||
this.finalParameters = new HashSet<String>(other.finalParameters);
|
other.updatingResource);
|
||||||
|
this.finalParameters = Collections.newSetFromMap(
|
||||||
|
new ConcurrentHashMap<String, Boolean>());
|
||||||
|
this.finalParameters.addAll(other.finalParameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized(Configuration.class) {
|
synchronized(Configuration.class) {
|
||||||
|
@ -2314,20 +2319,27 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
* @return final parameter set.
|
* @return final parameter set.
|
||||||
*/
|
*/
|
||||||
public Set<String> getFinalParameters() {
|
public Set<String> getFinalParameters() {
|
||||||
return new HashSet<String>(finalParameters);
|
Set<String> setFinalParams = Collections.newSetFromMap(
|
||||||
|
new ConcurrentHashMap<String, Boolean>());
|
||||||
|
setFinalParams.addAll(finalParameters);
|
||||||
|
return setFinalParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized Properties getProps() {
|
protected synchronized Properties getProps() {
|
||||||
if (properties == null) {
|
if (properties == null) {
|
||||||
properties = new Properties();
|
properties = new Properties();
|
||||||
HashMap<String, String[]> backup =
|
Map<String, String[]> backup =
|
||||||
new HashMap<String, String[]>(updatingResource);
|
new ConcurrentHashMap<String, String[]>(updatingResource);
|
||||||
loadResources(properties, resources, quietmode);
|
loadResources(properties, resources, quietmode);
|
||||||
if (overlay!= null) {
|
|
||||||
|
if (overlay != null) {
|
||||||
properties.putAll(overlay);
|
properties.putAll(overlay);
|
||||||
for (Map.Entry<Object,Object> item: overlay.entrySet()) {
|
for (Map.Entry<Object,Object> item: overlay.entrySet()) {
|
||||||
String key = (String)item.getKey();
|
String key = (String)item.getKey();
|
||||||
updatingResource.put(key, backup.get(key));
|
String[] source = backup.get(key);
|
||||||
|
if(source != null) {
|
||||||
|
updatingResource.put(key, source);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2576,16 +2588,18 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
if (value != null || allowNullValueProperties) {
|
if (value != null || allowNullValueProperties) {
|
||||||
if (!finalParameters.contains(attr)) {
|
if (!finalParameters.contains(attr)) {
|
||||||
if (value==null && allowNullValueProperties) {
|
if (value==null && allowNullValueProperties) {
|
||||||
value = DEFAULT_STRING_CHECK;
|
value = DEFAULT_STRING_CHECK;
|
||||||
}
|
}
|
||||||
properties.setProperty(attr, value);
|
properties.setProperty(attr, value);
|
||||||
updatingResource.put(attr, source);
|
if(source != null) {
|
||||||
|
updatingResource.put(attr, source);
|
||||||
|
}
|
||||||
} else if (!value.equals(properties.getProperty(attr))) {
|
} else if (!value.equals(properties.getProperty(attr))) {
|
||||||
LOG.warn(name+":an attempt to override final parameter: "+attr
|
LOG.warn(name+":an attempt to override final parameter: "+attr
|
||||||
+"; Ignoring.");
|
+"; Ignoring.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (finalParameter) {
|
if (finalParameter && attr != null) {
|
||||||
finalParameters.add(attr);
|
finalParameters.add(attr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2788,7 +2802,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
String value = org.apache.hadoop.io.Text.readString(in);
|
String value = org.apache.hadoop.io.Text.readString(in);
|
||||||
set(key, value);
|
set(key, value);
|
||||||
String sources[] = WritableUtils.readCompressedStringArray(in);
|
String sources[] = WritableUtils.readCompressedStringArray(in);
|
||||||
updatingResource.put(key, sources);
|
if(sources != null) {
|
||||||
|
updatingResource.put(key, sources);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1308,6 +1308,52 @@ public class TestConfiguration extends TestCase {
|
||||||
assertTrue("my.var is not final", finalParameters.contains("my.var"));
|
assertTrue("my.var is not final", finalParameters.contains("my.var"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A test to check whether this thread goes into infinite loop because of
|
||||||
|
* destruction of data structure by resize of Map. This problem was reported
|
||||||
|
* by SPARK-2546.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testConcurrentAccesses() throws Exception {
|
||||||
|
out = new BufferedWriter(new FileWriter(CONFIG));
|
||||||
|
startConfig();
|
||||||
|
declareProperty("some.config", "xyz", "xyz", false);
|
||||||
|
endConfig();
|
||||||
|
Path fileResource = new Path(CONFIG);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.addResource(fileResource);
|
||||||
|
|
||||||
|
class ConfigModifyThread extends Thread {
|
||||||
|
final private Configuration config;
|
||||||
|
final private String prefix;
|
||||||
|
|
||||||
|
public ConfigModifyThread(Configuration conf, String prefix) {
|
||||||
|
config = conf;
|
||||||
|
this.prefix = prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < 100000; i++) {
|
||||||
|
config.set("some.config.value-" + prefix + i, "value");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrayList<ConfigModifyThread> threads = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
threads.add(new ConfigModifyThread(conf, String.valueOf(i)));
|
||||||
|
}
|
||||||
|
for (Thread t: threads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
for (Thread t: threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
// If this test completes without going into infinite loop,
|
||||||
|
// it's expected behaviour.
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] argv) throws Exception {
|
public static void main(String[] argv) throws Exception {
|
||||||
junit.textui.TestRunner.main(new String[]{
|
junit.textui.TestRunner.main(new String[]{
|
||||||
TestConfiguration.class.getName()
|
TestConfiguration.class.getName()
|
||||||
|
|
Loading…
Reference in New Issue