YARN-573. Shared data structures in Public Localizer and Private Localizer are not Thread safe. Contributed by Omkar Vinit Joshi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509389 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-08-01 19:34:55 +00:00
parent 60be5fb9ed
commit 4757963d32
2 changed files with 50 additions and 50 deletions

View File

@ -783,6 +783,9 @@ Release 2.1.0-beta - 2013-08-06
YARN-945. Removed setting of AMRMToken's service from ResourceManager YARN-945. Removed setting of AMRMToken's service from ResourceManager
and changed client libraries do it all the time and correctly. (vinodkv) and changed client libraries do it all the time and correctly. (vinodkv)
YARN-573. Shared data structures in Public Localizer and Private Localizer
are not Thread safe. (Omkar Vinit Joshi via jlowe)
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh. YARN-158. Yarn creating package-info.java must not depend on sh.

View File

@ -29,6 +29,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -630,23 +631,16 @@ class PublicLocalizer extends Thread {
final Configuration conf; final Configuration conf;
final ExecutorService threadPool; final ExecutorService threadPool;
final CompletionService<Path> queue; final CompletionService<Path> queue;
// Its shared between public localizer and dispatcher thread.
final Map<Future<Path>,LocalizerResourceRequestEvent> pending; final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
PublicLocalizer(Configuration conf) { PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
super("Public Localizer"); super("Public Localizer");
this.lfs = lfs; this.lfs = getLocalFileContext(conf);
this.conf = conf; this.conf = conf;
this.pending = pending; this.pending =
new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
this.threadPool = threadPool; this.threadPool = createLocalizerExecutor(conf);
this.queue = new ExecutorCompletionService<Path>(threadPool); this.queue = new ExecutorCompletionService<Path>(threadPool);
} }
@ -748,6 +742,7 @@ class LocalizerRunner extends Thread {
final LocalizerContext context; final LocalizerContext context;
final String localizerId; final String localizerId;
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
// Its a shared list between Private Localizer and dispatcher thread.
final List<LocalizerResourceRequestEvent> pending; final List<LocalizerResourceRequestEvent> pending;
// TODO: threadsafe, use outer? // TODO: threadsafe, use outer?
@ -758,13 +753,14 @@ class LocalizerRunner extends Thread {
super("LocalizerRunner for " + localizerId); super("LocalizerRunner for " + localizerId);
this.context = context; this.context = context;
this.localizerId = localizerId; this.localizerId = localizerId;
this.pending = new ArrayList<LocalizerResourceRequestEvent>(); this.pending =
Collections
.synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
this.scheduled = this.scheduled =
new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>(); new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
} }
public void addResource(LocalizerResourceRequestEvent request) { public void addResource(LocalizerResourceRequestEvent request) {
// TDOO: Synchronization
pending.add(request); pending.add(request);
} }
@ -774,7 +770,7 @@ public void addResource(LocalizerResourceRequestEvent request) {
* @return * @return
*/ */
private LocalResource findNextResource() { private LocalResource findNextResource() {
// TODO: Synchronization synchronized (pending) {
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator(); for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
i.hasNext();) { i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next(); LocalizerResourceRequestEvent evt = i.next();
@ -812,6 +808,7 @@ private LocalResource findNextResource() {
} }
return null; return null;
} }
}
LocalizerHeartbeatResponse update( LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) { List<LocalResourceStatus> remoteResourceStatuses) {