svn merge -c 1509389 FIXES: YARN-573. Shared data structures in Public Localizer and Private Localizer are not Thread safe. Contributed by Omkar Vinit Joshi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1509393 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-08-01 19:40:09 +00:00
parent d3946f980a
commit 83a09512a4
2 changed files with 50 additions and 50 deletions

View File

@ -768,6 +768,9 @@ Release 2.1.0-beta - 2013-08-06
YARN-945. Removed setting of AMRMToken's service from ResourceManager YARN-945. Removed setting of AMRMToken's service from ResourceManager
and changed client libraries do it all the time and correctly. (vinodkv) and changed client libraries do it all the time and correctly. (vinodkv)
YARN-573. Shared data structures in Public Localizer and Private Localizer
are not Thread safe. (Omkar Vinit Joshi via jlowe)
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh. YARN-158. Yarn creating package-info.java must not depend on sh.

View File

@ -29,6 +29,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -630,23 +631,16 @@ public class ResourceLocalizationService extends CompositeService
final Configuration conf; final Configuration conf;
final ExecutorService threadPool; final ExecutorService threadPool;
final CompletionService<Path> queue; final CompletionService<Path> queue;
// Its shared between public localizer and dispatcher thread.
final Map<Future<Path>,LocalizerResourceRequestEvent> pending; final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
PublicLocalizer(Configuration conf) { PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
super("Public Localizer"); super("Public Localizer");
this.lfs = lfs; this.lfs = getLocalFileContext(conf);
this.conf = conf; this.conf = conf;
this.pending = pending; this.pending =
new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
this.threadPool = threadPool; this.threadPool = createLocalizerExecutor(conf);
this.queue = new ExecutorCompletionService<Path>(threadPool); this.queue = new ExecutorCompletionService<Path>(threadPool);
} }
@ -748,6 +742,7 @@ public class ResourceLocalizationService extends CompositeService
final LocalizerContext context; final LocalizerContext context;
final String localizerId; final String localizerId;
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
// Its a shared list between Private Localizer and dispatcher thread.
final List<LocalizerResourceRequestEvent> pending; final List<LocalizerResourceRequestEvent> pending;
// TODO: threadsafe, use outer? // TODO: threadsafe, use outer?
@ -758,13 +753,14 @@ public class ResourceLocalizationService extends CompositeService
super("LocalizerRunner for " + localizerId); super("LocalizerRunner for " + localizerId);
this.context = context; this.context = context;
this.localizerId = localizerId; this.localizerId = localizerId;
this.pending = new ArrayList<LocalizerResourceRequestEvent>(); this.pending =
Collections
.synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
this.scheduled = this.scheduled =
new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>(); new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
} }
public void addResource(LocalizerResourceRequestEvent request) { public void addResource(LocalizerResourceRequestEvent request) {
// TDOO: Synchronization
pending.add(request); pending.add(request);
} }
@ -774,43 +770,44 @@ public class ResourceLocalizationService extends CompositeService
* @return * @return
*/ */
private LocalResource findNextResource() { private LocalResource findNextResource() {
// TODO: Synchronization synchronized (pending) {
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator(); for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
i.hasNext();) { i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next(); LocalizerResourceRequestEvent evt = i.next();
LocalizedResource nRsrc = evt.getResource(); LocalizedResource nRsrc = evt.getResource();
// Resource download should take place ONLY if resource is in // Resource download should take place ONLY if resource is in
// Downloading state // Downloading state
if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
i.remove(); i.remove();
continue; continue;
} }
/* /*
* Multiple containers will try to download the same resource. So the * Multiple containers will try to download the same resource. So the
* resource download should start only if * resource download should start only if
* 1) We can acquire a non blocking semaphore lock on resource * 1) We can acquire a non blocking semaphore lock on resource
* 2) Resource is still in DOWNLOADING state * 2) Resource is still in DOWNLOADING state
*/ */
if (nRsrc.tryAcquire()) { if (nRsrc.tryAcquire()) {
if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResourceRequest nextRsrc = nRsrc.getRequest(); LocalResourceRequest nextRsrc = nRsrc.getRequest();
LocalResource next = LocalResource next =
recordFactory.newRecordInstance(LocalResource.class); recordFactory.newRecordInstance(LocalResource.class);
next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
.getPath())); .getPath()));
next.setTimestamp(nextRsrc.getTimestamp()); next.setTimestamp(nextRsrc.getTimestamp());
next.setType(nextRsrc.getType()); next.setType(nextRsrc.getType());
next.setVisibility(evt.getVisibility()); next.setVisibility(evt.getVisibility());
next.setPattern(evt.getPattern()); next.setPattern(evt.getPattern());
scheduled.put(nextRsrc, evt); scheduled.put(nextRsrc, evt);
return next; return next;
} else { } else {
// Need to release acquired lock // Need to release acquired lock
nRsrc.unlock(); nRsrc.unlock();
} }
} }
}
return null;
} }
return null;
} }
LocalizerHeartbeatResponse update( LocalizerHeartbeatResponse update(