Merge -c 1188466 from trunk to branch-0.23 to complete fix for MAPREDUCE-3159.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-25 00:42:56 +00:00
parent 1527edc627
commit 88c24ca831
8 changed files with 57 additions and 27 deletions

View File

@ -1697,6 +1697,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
map output fits in spill buffer. (todd)
MAPREDUCE-3159. Ensure DefaultContainerExecutor doesn't delete application
directories during app-init. (todd via acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -340,12 +340,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (Path localDir : localDirs) {
Path fullAppDir = getApplicationDir(localDir, user, appId);
if (lfs.util().exists(fullAppDir)) {
// this will happen on a partial execution of localizeJob. Sometimes
// copying job.xml to the local disk succeeds but copying job.jar might
// throw out an exception. We should clean up and then try again.
lfs.delete(fullAppDir, true);
}
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);

View File

@ -205,6 +205,11 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
// typedef
}
/**
* Transition from INIT to DOWNLOADING.
* Sends a {@link LocalizerResourceRequestEvent} to the
* {@link ResourceLocalizationService}.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
private static class FetchResourceTransition extends ResourceTransition {
@Override

View File

@ -71,7 +71,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -320,6 +319,11 @@ public class ResourceLocalizationService extends CompositeService
app.getAppId()));
}
/**
* For each of the requested resources for a container, determines the
* appropriate {@link LocalResourcesTracker} and forwards a
* {@link LocalResourceRequest} to that tracker.
*/
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
@ -833,26 +837,7 @@ public class ResourceLocalizationService extends CompositeService
// 0) init queue, etc.
// 1) write credentials to private dir
DataOutputStream tokenOut = null;
try {
Credentials credentials = context.getCredentials();
FileContext lfs = getLocalFileContext(getConfig());
tokenOut =
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
LOG.info("Writing credentials to the nmPrivate file "
+ nmPrivateCTokensPath.toString() + ". Credentials list: ");
if (LOG.isDebugEnabled()) {
for (Token<? extends TokenIdentifier> tk : credentials
.getAllTokens()) {
LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
}
}
credentials.writeTokenStorageToStream(tokenOut);
} finally {
if (tokenOut != null) {
tokenOut.close();
}
}
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@ -876,6 +861,30 @@ public class ResourceLocalizationService extends CompositeService
}
}
private void writeCredentials(Path nmPrivateCTokensPath)
throws IOException {
DataOutputStream tokenOut = null;
try {
Credentials credentials = context.getCredentials();
FileContext lfs = getLocalFileContext(getConfig());
tokenOut =
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
LOG.info("Writing credentials to the nmPrivate file "
+ nmPrivateCTokensPath.toString() + ". Credentials list: ");
if (LOG.isDebugEnabled()) {
for (Token<? extends TokenIdentifier> tk : credentials
.getAllTokens()) {
LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
}
}
credentials.writeTokenStorageToStream(tokenOut);
} finally {
if (tokenOut != null) {
tokenOut.close();
}
}
}
}
static class CacheCleanup extends Thread {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
/**
* Events delivered to the {@link ResourceLocalizationService.LocalizerTracker}
*/
public class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
private final String localizerId;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
public enum LocalizerEventType {
/** See {@link LocalizerResourceRequestEvent} */
REQUEST_RESOURCE_LOCALIZATION,
ABORT_LOCALIZATION
}

View File

@ -20,8 +20,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizerContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Event indicating that the {@link ResourceLocalizationService.LocalizerTracker}
* should fetch this resource.
*/
public class LocalizerResourceRequestEvent extends LocalizerEvent {
private final LocalizerContext context;

View File

@ -17,8 +17,17 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
/**
* Events delivered to {@link LocalizedResource}. Each of these
* events is a subclass of {@link ResourceEvent}.
*/
public enum ResourceEventType {
/** See {@link ResourceRequestEvent} */
REQUEST,
/** See {@link ResourceLocalizedEvent} */
LOCALIZED,
/** See {@link ResourceReleaseEvent} */
RELEASE
}