svn merge -c 1371390 FIXES: YARN-14. Symlinks to peer distributed cache files no longer work (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1371395 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-08-09 19:18:37 +00:00
parent 4705652915
commit 2b97a4cce4
7 changed files with 76 additions and 41 deletions

View File

@ -36,3 +36,16 @@ Release 2.1.0-alpha - Unreleased
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
Release 0.23.3 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
BUG FIXES
YARN-14. Symlinks to peer distributed cache files no longer work
(Jason Lowe via bobby)

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
@ -38,7 +39,7 @@ public interface Container extends EventHandler<ContainerEvent> {
Credentials getCredentials();
Map<Path,String> getLocalizedResources();
Map<Path,List<String>> getLocalizedResources();
ContainerStatus cloneAndGetContainerStatus();

View File

@ -84,10 +84,10 @@ public class ContainerImpl implements Container {
private static final Log LOG = LogFactory.getLog(Container.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final Map<LocalResourceRequest,String> pendingResources =
new HashMap<LocalResourceRequest,String>();
private final Map<Path,String> localizedResources =
new HashMap<Path,String>();
private final Map<LocalResourceRequest,List<String>> pendingResources =
new HashMap<LocalResourceRequest,List<String>>();
private final Map<Path,List<String>> localizedResources =
new HashMap<Path,List<String>>();
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs =
@ -327,7 +327,7 @@ public String getUser() {
}
@Override
public Map<Path,String> getLocalizedResources() {
public Map<Path,List<String>> getLocalizedResources() {
this.readLock.lock();
try {
assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
@ -496,20 +496,25 @@ public ContainerState transition(ContainerImpl container,
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue());
container.pendingResources.put(req, rsrc.getKey());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
break;
case PRIVATE:
container.privateRsrcs.add(req);
break;
case APPLICATION:
container.appRsrcs.add(req);
break;
}
LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue());
List<String> links = container.pendingResources.get(req);
if (links == null) {
links = new ArrayList<String>();
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
break;
case PRIVATE:
container.privateRsrcs.add(req);
break;
case APPLICATION:
container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
LOG.info("Got exception parsing " + rsrc.getKey()
+ " and value " + rsrc.getValue());
@ -560,15 +565,16 @@ static class LocalizedTransition implements
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
String sym = container.pendingResources.remove(rsrcEvent.getResource());
if (null == sym) {
List<String> syms =
container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.getContainerID());
assert false;
// fail container?
return ContainerState.LOCALIZING;
}
container.localizedResources.put(rsrcEvent.getLocation(), sym);
container.localizedResources.put(rsrcEvent.getLocation(), syms);
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
@ -728,15 +734,16 @@ static class LocalizedResourceDuringKillTransition implements
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
String sym = container.pendingResources.remove(rsrcEvent.getResource());
if (null == sym) {
List<String> syms =
container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.getContainerID());
assert false;
// fail container?
return;
}
container.localizedResources.put(rsrcEvent.getLocation(), sym);
container.localizedResources.put(rsrcEvent.getLocation(), syms);
}
}

View File

@ -111,7 +111,8 @@ public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
@SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,String> localResources = container.getLocalizedResources();
final Map<Path,List<String>> localResources =
container.getLocalizedResources();
ContainerId containerID = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser();
@ -533,7 +534,7 @@ public void sanitizeEnv(Map<String, String> environment,
}
static void writeLaunchEnv(OutputStream out,
Map<String,String> environment, Map<Path,String> resources,
Map<String,String> environment, Map<Path,List<String>> resources,
List<String> command)
throws IOException {
ShellScriptBuilder sb = new ShellScriptBuilder();
@ -543,8 +544,10 @@ static void writeLaunchEnv(OutputStream out,
}
}
if (resources != null) {
for (Map.Entry<Path,String> link : resources.entrySet()) {
sb.symlink(link.getKey(), link.getValue());
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), linkName);
}
}
}

View File

@ -29,12 +29,15 @@
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@ -111,11 +114,12 @@ public void testLocalizationLaunch() throws Exception {
wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.initContainer();
Map<Path, String> localPaths = wc.localizeResources();
Map<Path, List<String>> localPaths = wc.localizeResources();
// all resources should be localized
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
.entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
}
assertTrue(localPaths.isEmpty());
@ -578,10 +582,12 @@ public void initContainer() {
// Localize resources
// Skip some resources so as to consider them failed
public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
int skipRsrcCount) throws URISyntaxException {
public Map<Path, List<String>> doLocalizeResources(
boolean checkLocalizingState, int skipRsrcCount)
throws URISyntaxException {
Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>();
Map<Path, List<String>> localPaths =
new HashMap<Path, List<String>>();
int counter = 0;
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
if (counter++ < skipRsrcCount) {
@ -592,7 +598,7 @@ public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
}
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
localPaths.put(p, Arrays.asList(rsrc.getKey()));
// rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
req, p));
@ -602,7 +608,8 @@ public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
}
public Map<Path, String> localizeResources() throws URISyntaxException {
public Map<Path, List<String>> localizeResources()
throws URISyntaxException {
return doLocalizeResources(true, 0);
}

View File

@ -28,6 +28,7 @@
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -95,9 +96,10 @@ public void testSpecialCharSymlinks() throws IOException {
writer.println(timeoutCommand);
writer.close();
Map<Path, String> resources = new HashMap<Path, String>();
Map<Path, List<String>> resources =
new HashMap<Path, List<String>>();
Path path = new Path(shellFile.getAbsolutePath());
resources.put(path, badSymlink);
resources.put(path, Arrays.asList(badSymlink));
FileOutputStream fos = new FileOutputStream(tempFile);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@ -43,7 +44,8 @@ public class MockContainer implements Container {
private ContainerState state;
private String user;
private ContainerLaunchContext launchContext;
private final Map<Path, String> resource = new HashMap<Path, String>();
private final Map<Path, List<String>> resource =
new HashMap<Path, List<String>>();
private RecordFactory recordFactory;
public MockContainer(ApplicationAttemptId appAttemptId,
@ -92,7 +94,7 @@ public Credentials getCredentials() {
}
@Override
public Map<Path, String> getLocalizedResources() {
public Map<Path, List<String>> getLocalizedResources() {
return resource;
}