MAPREDUCE-3159. Ensure DefaultContainerExecutor doesn't delete application directories during app-init. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4846ca7863
commit
6217e54718
|
@ -1753,6 +1753,9 @@ Release 0.23.0 - Unreleased
|
||||||
MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
|
MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
|
||||||
map output fits in spill buffer. (todd)
|
map output fits in spill buffer. (todd)
|
||||||
|
|
||||||
|
MAPREDUCE-3159. Ensure DefaultContainerExecutor doesn't delete application
|
||||||
|
directories during app-init. (todd via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -340,12 +340,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
FsPermission appperms = new FsPermission(APPDIR_PERM);
|
FsPermission appperms = new FsPermission(APPDIR_PERM);
|
||||||
for (Path localDir : localDirs) {
|
for (Path localDir : localDirs) {
|
||||||
Path fullAppDir = getApplicationDir(localDir, user, appId);
|
Path fullAppDir = getApplicationDir(localDir, user, appId);
|
||||||
if (lfs.util().exists(fullAppDir)) {
|
|
||||||
// this will happen on a partial execution of localizeJob. Sometimes
|
|
||||||
// copying job.xml to the local disk succeeds but copying job.jar might
|
|
||||||
// throw out an exception. We should clean up and then try again.
|
|
||||||
lfs.delete(fullAppDir, true);
|
|
||||||
}
|
|
||||||
// create $local.dir/usercache/$user/appcache/$appId
|
// create $local.dir/usercache/$user/appcache/$appId
|
||||||
try {
|
try {
|
||||||
lfs.mkdir(fullAppDir, appperms, true);
|
lfs.mkdir(fullAppDir, appperms, true);
|
||||||
|
|
|
@ -205,6 +205,11 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
|
||||||
// typedef
|
// typedef
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transition from INIT to DOWNLOADING.
|
||||||
|
* Sends a {@link LocalizerResourceRequestEvent} to the
|
||||||
|
* {@link ResourceLocalizationService}.
|
||||||
|
*/
|
||||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||||
private static class FetchResourceTransition extends ResourceTransition {
|
private static class FetchResourceTransition extends ResourceTransition {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -71,7 +71,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityInfo;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -320,6 +319,11 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
app.getAppId()));
|
app.getAppId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For each of the requested resources for a container, determines the
|
||||||
|
* appropriate {@link LocalResourcesTracker} and forwards a
|
||||||
|
* {@link LocalResourceRequest} to that tracker.
|
||||||
|
*/
|
||||||
private void handleInitContainerResources(
|
private void handleInitContainerResources(
|
||||||
ContainerLocalizationRequestEvent rsrcReqs) {
|
ContainerLocalizationRequestEvent rsrcReqs) {
|
||||||
Container c = rsrcReqs.getContainer();
|
Container c = rsrcReqs.getContainer();
|
||||||
|
@ -833,26 +837,7 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
|
|
||||||
// 0) init queue, etc.
|
// 0) init queue, etc.
|
||||||
// 1) write credentials to private dir
|
// 1) write credentials to private dir
|
||||||
DataOutputStream tokenOut = null;
|
writeCredentials(nmPrivateCTokensPath);
|
||||||
try {
|
|
||||||
Credentials credentials = context.getCredentials();
|
|
||||||
FileContext lfs = getLocalFileContext(getConfig());
|
|
||||||
tokenOut =
|
|
||||||
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
|
||||||
LOG.info("Writing credentials to the nmPrivate file "
|
|
||||||
+ nmPrivateCTokensPath.toString() + ". Credentials list: ");
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
for (Token<? extends TokenIdentifier> tk : credentials
|
|
||||||
.getAllTokens()) {
|
|
||||||
LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
credentials.writeTokenStorageToStream(tokenOut);
|
|
||||||
} finally {
|
|
||||||
if (tokenOut != null) {
|
|
||||||
tokenOut.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 2) exec initApplication and wait
|
// 2) exec initApplication and wait
|
||||||
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
||||||
context.getUser(),
|
context.getUser(),
|
||||||
|
@ -876,6 +861,30 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeCredentials(Path nmPrivateCTokensPath)
|
||||||
|
throws IOException {
|
||||||
|
DataOutputStream tokenOut = null;
|
||||||
|
try {
|
||||||
|
Credentials credentials = context.getCredentials();
|
||||||
|
FileContext lfs = getLocalFileContext(getConfig());
|
||||||
|
tokenOut =
|
||||||
|
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
LOG.info("Writing credentials to the nmPrivate file "
|
||||||
|
+ nmPrivateCTokensPath.toString() + ". Credentials list: ");
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
for (Token<? extends TokenIdentifier> tk : credentials
|
||||||
|
.getAllTokens()) {
|
||||||
|
LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
credentials.writeTokenStorageToStream(tokenOut);
|
||||||
|
} finally {
|
||||||
|
if (tokenOut != null) {
|
||||||
|
tokenOut.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class CacheCleanup extends Thread {
|
static class CacheCleanup extends Thread {
|
||||||
|
|
|
@ -18,7 +18,11 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Events delivered to the {@link ResourceLocalizationService.LocalizerTracker}
|
||||||
|
*/
|
||||||
public class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
|
public class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
|
||||||
|
|
||||||
private final String localizerId;
|
private final String localizerId;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
||||||
|
|
||||||
public enum LocalizerEventType {
|
public enum LocalizerEventType {
|
||||||
|
/** See {@link LocalizerResourceRequestEvent} */
|
||||||
REQUEST_RESOURCE_LOCALIZATION,
|
REQUEST_RESOURCE_LOCALIZATION,
|
||||||
ABORT_LOCALIZATION
|
ABORT_LOCALIZATION
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizerContext;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizerContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event indicating that the {@link ResourceLocalizationService.LocalizerTracker}
|
||||||
|
* should fetch this resource.
|
||||||
|
*/
|
||||||
public class LocalizerResourceRequestEvent extends LocalizerEvent {
|
public class LocalizerResourceRequestEvent extends LocalizerEvent {
|
||||||
|
|
||||||
private final LocalizerContext context;
|
private final LocalizerContext context;
|
||||||
|
|
|
@ -17,8 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Events delivered to {@link LocalizedResource}. Each of these
|
||||||
|
* events is a subclass of {@link ResourceEvent}.
|
||||||
|
*/
|
||||||
public enum ResourceEventType {
|
public enum ResourceEventType {
|
||||||
|
/** See {@link ResourceRequestEvent} */
|
||||||
REQUEST,
|
REQUEST,
|
||||||
|
/** See {@link ResourceLocalizedEvent} */
|
||||||
LOCALIZED,
|
LOCALIZED,
|
||||||
|
/** See {@link ResourceReleaseEvent} */
|
||||||
RELEASE
|
RELEASE
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue