YARN-9180. Port YARN-7033 NM recovery of assigned resources to branch-2
This commit is contained in:
parent
56259bcecb
commit
631dfc7277
|
@ -98,4 +98,11 @@ public interface Container extends EventHandler<ContainerEvent> {
|
||||||
void sendPauseEvent(String description);
|
void sendPauseEvent(String description);
|
||||||
|
|
||||||
Priority getPriority();
|
Priority getPriority();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get assigned resource mappings to the container.
|
||||||
|
*
|
||||||
|
* @return Resource Mappings of the container
|
||||||
|
*/
|
||||||
|
ResourceMappings getResourceMappings();
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,6 +188,7 @@ public class ContainerImpl implements Container {
|
||||||
private boolean recoveredAsKilled = false;
|
private boolean recoveredAsKilled = false;
|
||||||
private Context context;
|
private Context context;
|
||||||
private ResourceSet resourceSet;
|
private ResourceSet resourceSet;
|
||||||
|
private ResourceMappings resourceMappings;
|
||||||
|
|
||||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||||
ContainerLaunchContext launchContext, Credentials creds,
|
ContainerLaunchContext launchContext, Credentials creds,
|
||||||
|
@ -245,6 +246,7 @@ public class ContainerImpl implements Container {
|
||||||
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
|
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
|
||||||
context.getContainerStateTransitionListener());
|
context.getContainerStateTransitionListener());
|
||||||
this.resourceSet = new ResourceSet();
|
this.resourceSet = new ResourceSet();
|
||||||
|
this.resourceMappings = new ResourceMappings();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ContainerRetryContext configureRetryContext(
|
private static ContainerRetryContext configureRetryContext(
|
||||||
|
@ -285,6 +287,7 @@ public class ContainerImpl implements Container {
|
||||||
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
|
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
|
||||||
this.workDir = rcs.getWorkDir();
|
this.workDir = rcs.getWorkDir();
|
||||||
this.logDir = rcs.getLogDir();
|
this.logDir = rcs.getLogDir();
|
||||||
|
this.resourceMappings = rcs.getResourceMappings();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
|
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
|
||||||
|
@ -2172,4 +2175,14 @@ public class ContainerImpl implements Container {
|
||||||
public Priority getPriority() {
|
public Priority getPriority() {
|
||||||
return containerTokenIdentifier.getPriority();
|
return containerTokenIdentifier.getPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get assigned resource mappings to the container.
|
||||||
|
*
|
||||||
|
* @return Resource Mappings of the container
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ResourceMappings getResourceMappings() {
|
||||||
|
return resourceMappings;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used to store assigned resource to a single container by
|
||||||
|
* resource types.
|
||||||
|
*
|
||||||
|
* Assigned resource could be list of String
|
||||||
|
*
|
||||||
|
* For example, we can assign container to:
|
||||||
|
* "numa": ["numa0"]
|
||||||
|
* "gpu": ["0", "1", "2", "3"]
|
||||||
|
* "fpga": ["1", "3"]
|
||||||
|
*
|
||||||
|
* This will be used for NM restart container recovery.
|
||||||
|
*/
|
||||||
|
public class ResourceMappings {
|
||||||
|
|
||||||
|
private Map<String, AssignedResources> assignedResourcesMap = new HashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all resource mappings.
|
||||||
|
* @param resourceType resourceType
|
||||||
|
* @return map of resource mapping
|
||||||
|
*/
|
||||||
|
public List<Serializable> getAssignedResources(String resourceType) {
|
||||||
|
AssignedResources ar = assignedResourcesMap.get(resourceType);
|
||||||
|
if (null == ar) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
return ar.getAssignedResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the resources for a given resource type.
|
||||||
|
*
|
||||||
|
* @param resourceType Resource Type
|
||||||
|
* @param assigned Assigned resources to add
|
||||||
|
*/
|
||||||
|
public void addAssignedResources(String resourceType,
|
||||||
|
AssignedResources assigned) {
|
||||||
|
assignedResourcesMap.put(resourceType, assigned);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores resources assigned to a container for a given resource type.
|
||||||
|
*/
|
||||||
|
public static class AssignedResources implements Serializable {
|
||||||
|
private static final long serialVersionUID = -1059491941955757926L;
|
||||||
|
private List<Serializable> resources = Collections.emptyList();
|
||||||
|
|
||||||
|
public List<Serializable> getAssignedResources() {
|
||||||
|
return Collections.unmodifiableList(resources);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateAssignedResources(List<Serializable> list) {
|
||||||
|
this.resources = new ArrayList<>(list);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static AssignedResources fromBytes(byte[] bytes)
|
||||||
|
throws IOException {
|
||||||
|
ObjectInputStream ois = null;
|
||||||
|
List<Serializable> resources;
|
||||||
|
try {
|
||||||
|
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
|
||||||
|
ois = new ObjectInputStream(bis);
|
||||||
|
resources = (List<Serializable>) ois.readObject();
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeQuietly(ois);
|
||||||
|
}
|
||||||
|
AssignedResources ar = new AssignedResources();
|
||||||
|
ar.updateAssignedResources(resources);
|
||||||
|
return ar;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] toBytes() throws IOException {
|
||||||
|
ObjectOutputStream oos = null;
|
||||||
|
byte[] bytes;
|
||||||
|
try {
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
oos = new ObjectOutputStream(bos);
|
||||||
|
oos.writeObject(resources);
|
||||||
|
bytes = bos.toByteArray();
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeQuietly(oos);
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
|
||||||
|
@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
|
||||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -148,6 +151,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
|
|
||||||
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
|
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
|
||||||
|
|
||||||
|
private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
|
||||||
|
"/assignedResources_";
|
||||||
|
|
||||||
private static final byte[] EMPTY_VALUE = new byte[0];
|
private static final byte[] EMPTY_VALUE = new byte[0];
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
|
@ -309,6 +315,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
rcs.setWorkDir(asString(entry.getValue()));
|
rcs.setWorkDir(asString(entry.getValue()));
|
||||||
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
|
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
|
||||||
rcs.setLogDir(asString(entry.getValue()));
|
rcs.setLogDir(asString(entry.getValue()));
|
||||||
|
} else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) {
|
||||||
|
String resourceType = suffix.substring(
|
||||||
|
CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length());
|
||||||
|
ResourceMappings.AssignedResources assignedResources =
|
||||||
|
ResourceMappings.AssignedResources.fromBytes(entry.getValue());
|
||||||
|
rcs.getResourceMappings().addAssignedResources(resourceType,
|
||||||
|
assignedResources);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("the container " + containerId
|
LOG.warn("the container " + containerId
|
||||||
+ " will be killed because of the unknown key " + key
|
+ " will be killed because of the unknown key " + key
|
||||||
|
@ -1166,6 +1179,35 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeAssignedResources(ContainerId containerId,
|
||||||
|
String resourceType, List<Serializable> assignedResources)
|
||||||
|
throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("storeAssignedResources: containerId=" + containerId
|
||||||
|
+ ", assignedResources=" + StringUtils.join(",", assignedResources));
|
||||||
|
}
|
||||||
|
|
||||||
|
String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||||
|
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
|
||||||
|
try {
|
||||||
|
WriteBatch batch = db.createWriteBatch();
|
||||||
|
try {
|
||||||
|
ResourceMappings.AssignedResources res =
|
||||||
|
new ResourceMappings.AssignedResources();
|
||||||
|
res.updateAssignedResources(assignedResources);
|
||||||
|
|
||||||
|
// New value will overwrite old values for the same key
|
||||||
|
batch.put(bytes(keyResChng), res.toBytes());
|
||||||
|
db.write(batch);
|
||||||
|
} finally {
|
||||||
|
batch.close();
|
||||||
|
}
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void cleanupDeprecatedFinishedApps() {
|
private void cleanupDeprecatedFinishedApps() {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -266,6 +267,12 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeAssignedResources(ContainerId containerId,
|
||||||
|
String resourceType, List<Serializable> assignedResources)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initStorage(Configuration conf) throws IOException {
|
protected void initStorage(Configuration conf) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -90,6 +92,7 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
private RecoveredContainerType recoveryType =
|
private RecoveredContainerType recoveryType =
|
||||||
RecoveredContainerType.RECOVER;
|
RecoveredContainerType.RECOVER;
|
||||||
private long startTime;
|
private long startTime;
|
||||||
|
private ResourceMappings resMappings = new ResourceMappings();
|
||||||
|
|
||||||
public RecoveredContainerStatus getStatus() {
|
public RecoveredContainerStatus getStatus() {
|
||||||
return status;
|
return status;
|
||||||
|
@ -174,6 +177,14 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
public void setRecoveryType(RecoveredContainerType recoveryType) {
|
public void setRecoveryType(RecoveredContainerType recoveryType) {
|
||||||
this.recoveryType = recoveryType;
|
this.recoveryType = recoveryType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ResourceMappings getResourceMappings() {
|
||||||
|
return resMappings;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResourceMappings(ResourceMappings mappings) {
|
||||||
|
this.resMappings = mappings;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class LocalResourceTrackerState {
|
public static class LocalResourceTrackerState {
|
||||||
|
@ -718,6 +729,18 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
|
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the assigned resources to a container.
|
||||||
|
*
|
||||||
|
* @param containerId Container Id
|
||||||
|
* @param resourceType Resource Type
|
||||||
|
* @param assignedResources Assigned resources
|
||||||
|
* @throws IOException if fails
|
||||||
|
*/
|
||||||
|
public abstract void storeAssignedResources(ContainerId containerId,
|
||||||
|
String resourceType, List<Serializable> assignedResources)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
protected abstract void initStorage(Configuration conf) throws IOException;
|
protected abstract void initStorage(Configuration conf) throws IOException;
|
||||||
|
|
||||||
protected abstract void startStorage() throws IOException;
|
protected abstract void startStorage() throws IOException;
|
||||||
|
|
|
@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -91,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
@ -110,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerIn
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
|
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -457,7 +460,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||||
stateStore.init(conf);
|
stateStore.init(conf);
|
||||||
stateStore.start();
|
stateStore.start();
|
||||||
Context context = createContext(conf, stateStore);
|
context = createContext(conf, stateStore);
|
||||||
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||||
((NMContext) context).setContainerManager(cm);
|
((NMContext) context).setContainerManager(cm);
|
||||||
cm.init(conf);
|
cm.init(conf);
|
||||||
|
@ -467,55 +470,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
ApplicationAttemptId attemptId =
|
ApplicationAttemptId attemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||||
Map<String, String> containerEnv = new HashMap<>();
|
|
||||||
setFlowContext(containerEnv, "app_name1", appId);
|
commonLaunchContainer(appId, cid, cm);
|
||||||
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
|
||||||
Credentials containerCreds = new Credentials();
|
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
|
||||||
containerCreds.writeTokenStorageToStream(dob);
|
|
||||||
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
|
|
||||||
dob.getLength());
|
|
||||||
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
|
|
||||||
File tmpDir = new File("target",
|
|
||||||
this.getClass().getSimpleName() + "-tmpDir");
|
|
||||||
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
|
|
||||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
|
||||||
if (Shell.WINDOWS) {
|
|
||||||
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
|
|
||||||
} else {
|
|
||||||
fileWriter.write("\numask 0");
|
|
||||||
fileWriter.write("\nexec sleep 100");
|
|
||||||
}
|
|
||||||
fileWriter.close();
|
|
||||||
FileContext localFS = FileContext.getLocalFSFileContext();
|
|
||||||
URL resource_alpha =
|
|
||||||
URL.fromPath(localFS
|
|
||||||
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
|
||||||
LocalResource rsrc_alpha = RecordFactoryProvider
|
|
||||||
.getRecordFactory(null).newRecordInstance(LocalResource.class);
|
|
||||||
rsrc_alpha.setResource(resource_alpha);
|
|
||||||
rsrc_alpha.setSize(-1);
|
|
||||||
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
||||||
rsrc_alpha.setType(LocalResourceType.FILE);
|
|
||||||
rsrc_alpha.setTimestamp(scriptFile.lastModified());
|
|
||||||
String destinationFile = "dest_file";
|
|
||||||
Map<String, LocalResource> localResources = new HashMap<>();
|
|
||||||
localResources.put(destinationFile, rsrc_alpha);
|
|
||||||
List<String> commands =
|
|
||||||
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
|
||||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
|
||||||
localResources, containerEnv, commands, serviceData,
|
|
||||||
containerTokens, acls);
|
|
||||||
StartContainersResponse startResponse = startContainer(
|
|
||||||
context, cm, cid, clc, null);
|
|
||||||
assertTrue(startResponse.getFailedRequests().isEmpty());
|
|
||||||
assertEquals(1, context.getApplications().size());
|
|
||||||
Application app = context.getApplications().get(appId);
|
Application app = context.getApplications().get(appId);
|
||||||
assertNotNull(app);
|
assertNotNull(app);
|
||||||
// make sure the container reaches RUNNING state
|
|
||||||
waitForNMContainerState(cm, cid,
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager
|
|
||||||
.containermanager.container.ContainerState.RUNNING);
|
|
||||||
Resource targetResource = Resource.newInstance(2048, 2);
|
Resource targetResource = Resource.newInstance(2048, 2);
|
||||||
ContainerUpdateResponse updateResponse =
|
ContainerUpdateResponse updateResponse =
|
||||||
updateContainers(context, cm, cid, targetResource);
|
updateContainers(context, cm, cid, targetResource);
|
||||||
|
@ -538,6 +498,62 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
assertEquals(targetResource, containerStatus.getCapability());
|
assertEquals(targetResource, containerStatus.getCapability());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceMappingRecoveryForContainer() throws Exception {
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
||||||
|
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||||
|
stateStore.init(conf);
|
||||||
|
stateStore.start();
|
||||||
|
context = createContext(conf, stateStore);
|
||||||
|
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||||
|
((NMContext) context).setContainerManager(cm);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
|
||||||
|
// add an application by starting a container
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||||
|
|
||||||
|
commonLaunchContainer(appId, cid, cm);
|
||||||
|
|
||||||
|
Application app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
|
||||||
|
// store resource mapping of the container
|
||||||
|
List<Serializable> gpuResources =
|
||||||
|
Arrays.<Serializable>asList("1", "2", "3");
|
||||||
|
stateStore.storeAssignedResources(cid, "gpu", gpuResources);
|
||||||
|
List<Serializable> numaResources = Arrays.<Serializable>asList("numa1");
|
||||||
|
stateStore.storeAssignedResources(cid, "numa", numaResources);
|
||||||
|
List<Serializable> fpgaResources =
|
||||||
|
Arrays.<Serializable>asList("fpga1", "fpga2");
|
||||||
|
stateStore.storeAssignedResources(cid, "fpga", fpgaResources);
|
||||||
|
|
||||||
|
cm.stop();
|
||||||
|
context = createContext(conf, stateStore);
|
||||||
|
cm = createContainerManager(context);
|
||||||
|
((NMContext) context).setContainerManager(cm);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
|
||||||
|
Container nmContainer = context.getContainers().get(cid);
|
||||||
|
Assert.assertNotNull(nmContainer);
|
||||||
|
ResourceMappings resourceMappings = nmContainer.getResourceMappings();
|
||||||
|
List<Serializable> assignedResource = resourceMappings
|
||||||
|
.getAssignedResources("gpu");
|
||||||
|
Assert.assertTrue(assignedResource.equals(gpuResources));
|
||||||
|
Assert.assertTrue(
|
||||||
|
resourceMappings.getAssignedResources("numa").equals(numaResources));
|
||||||
|
Assert.assertTrue(
|
||||||
|
resourceMappings.getAssignedResources("fpga").equals(fpgaResources));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerCleanupOnShutdown() throws Exception {
|
public void testContainerCleanupOnShutdown() throws Exception {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
@ -610,6 +626,57 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
|
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
|
||||||
|
ContainerManagerImpl cm) throws Exception {
|
||||||
|
Map<String, String> containerEnv = new HashMap<>();
|
||||||
|
setFlowContext(containerEnv, "app_name1", appId);
|
||||||
|
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
||||||
|
Credentials containerCreds = new Credentials();
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
containerCreds.writeTokenStorageToStream(dob);
|
||||||
|
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
|
||||||
|
dob.getLength());
|
||||||
|
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
|
||||||
|
File tmpDir = new File("target",
|
||||||
|
this.getClass().getSimpleName() + "-tmpDir");
|
||||||
|
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
|
||||||
|
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
|
||||||
|
} else {
|
||||||
|
fileWriter.write("\numask 0");
|
||||||
|
fileWriter.write("\nexec sleep 100");
|
||||||
|
}
|
||||||
|
fileWriter.close();
|
||||||
|
FileContext localFS = FileContext.getLocalFSFileContext();
|
||||||
|
URL resource_alpha =
|
||||||
|
URL.fromPath(localFS
|
||||||
|
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||||
|
LocalResource rsrc_alpha = RecordFactoryProvider
|
||||||
|
.getRecordFactory(null).newRecordInstance(LocalResource.class);
|
||||||
|
rsrc_alpha.setResource(resource_alpha);
|
||||||
|
rsrc_alpha.setSize(-1);
|
||||||
|
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
|
rsrc_alpha.setType(LocalResourceType.FILE);
|
||||||
|
rsrc_alpha.setTimestamp(scriptFile.lastModified());
|
||||||
|
String destinationFile = "dest_file";
|
||||||
|
Map<String, LocalResource> localResources = new HashMap<>();
|
||||||
|
localResources.put(destinationFile, rsrc_alpha);
|
||||||
|
List<String> commands =
|
||||||
|
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||||
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, containerEnv, commands, serviceData,
|
||||||
|
containerTokens, acls);
|
||||||
|
StartContainersResponse startResponse = startContainer(
|
||||||
|
context, cm, cid, clc, null);
|
||||||
|
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
// make sure the container reaches RUNNING state
|
||||||
|
waitForNMContainerState(cm, cid,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager
|
||||||
|
.containermanager.container.ContainerState.RUNNING);
|
||||||
|
}
|
||||||
|
|
||||||
private ContainerManagerImpl createContainerManager(Context context,
|
private ContainerManagerImpl createContainerManager(Context context,
|
||||||
DeletionService delSrvc) {
|
DeletionService delSrvc) {
|
||||||
return new ContainerManagerImpl(context, exec, delSrvc,
|
return new ContainerManagerImpl(context, exec, delSrvc,
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -124,6 +126,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
|
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
|
||||||
rcsCopy.setWorkDir(rcs.getWorkDir());
|
rcsCopy.setWorkDir(rcs.getWorkDir());
|
||||||
rcsCopy.setLogDir(rcs.getLogDir());
|
rcsCopy.setLogDir(rcs.getLogDir());
|
||||||
|
rcsCopy.setResourceMappings(rcs.getResourceMappings());
|
||||||
result.add(rcsCopy);
|
result.add(rcsCopy);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
@ -511,6 +514,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
amrmProxyState.getAppContexts().remove(attempt);
|
amrmProxyState.getAppContexts().remove(attempt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeAssignedResources(ContainerId containerId,
|
||||||
|
String resourceType, List<Serializable> assignedResources)
|
||||||
|
throws IOException {
|
||||||
|
ResourceMappings.AssignedResources ar =
|
||||||
|
new ResourceMappings.AssignedResources();
|
||||||
|
ar.updateAssignedResources(assignedResources);
|
||||||
|
containerStates.get(containerId).getResourceMappings()
|
||||||
|
.addAssignedResources(resourceType, ar);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TrackerState {
|
private static class TrackerState {
|
||||||
Map<Path, LocalResourceProto> inProgressMap =
|
Map<Path, LocalResourceProto> inProgressMap =
|
||||||
new HashMap<Path, LocalResourceProto>();
|
new HashMap<Path, LocalResourceProto>();
|
||||||
|
|
|
@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -1003,46 +1004,12 @@ public class TestNMLeveldbStateStoreService {
|
||||||
.loadContainersState();
|
.loadContainersState();
|
||||||
assertTrue(recoveredContainers.isEmpty());
|
assertTrue(recoveredContainers.isEmpty());
|
||||||
|
|
||||||
// create a container request
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
4);
|
4);
|
||||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||||
LocalResource lrsrc = LocalResource.newInstance(
|
StartContainerRequest startContainerRequest = storeMockContainer(
|
||||||
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
containerId);
|
||||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
|
||||||
1234567890L);
|
|
||||||
Map<String, LocalResource> localResources =
|
|
||||||
new HashMap<String, LocalResource>();
|
|
||||||
localResources.put("rsrc", lrsrc);
|
|
||||||
Map<String, String> env = new HashMap<String, String>();
|
|
||||||
env.put("somevar", "someval");
|
|
||||||
List<String> containerCmds = new ArrayList<String>();
|
|
||||||
containerCmds.add("somecmd");
|
|
||||||
containerCmds.add("somearg");
|
|
||||||
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
||||||
serviceData.put("someservice",
|
|
||||||
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
|
|
||||||
ByteBuffer containerTokens = ByteBuffer
|
|
||||||
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
|
|
||||||
Map<ApplicationAccessType, String> acls =
|
|
||||||
new HashMap<ApplicationAccessType, String>();
|
|
||||||
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
|
|
||||||
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
|
|
||||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
|
||||||
localResources, env, containerCmds,
|
|
||||||
serviceData, containerTokens, acls);
|
|
||||||
Resource containerRsrc = Resource.newInstance(1357, 3);
|
|
||||||
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
|
|
||||||
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
|
|
||||||
Priority.newInstance(7), 13579);
|
|
||||||
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
|
|
||||||
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
|
|
||||||
"tokenservice");
|
|
||||||
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
|
|
||||||
containerToken);
|
|
||||||
|
|
||||||
stateStore.storeContainer(containerId, 0, 0, containerReq);
|
|
||||||
|
|
||||||
// add a invalid key
|
// add a invalid key
|
||||||
byte[] invalidKey = ("ContainerManager/containers/"
|
byte[] invalidKey = ("ContainerManager/containers/"
|
||||||
|
@ -1055,7 +1022,7 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
||||||
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||||
assertEquals(false, rcs.getKilled());
|
assertEquals(false, rcs.getKilled());
|
||||||
assertEquals(containerReq, rcs.getStartRequest());
|
assertEquals(startContainerRequest, rcs.getStartRequest());
|
||||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||||
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
|
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
|
||||||
// assert unknown keys are cleaned up finally
|
// assert unknown keys are cleaned up finally
|
||||||
|
@ -1163,6 +1130,87 @@ public class TestNMLeveldbStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStateStoreForResourceMapping() throws IOException {
|
||||||
|
// test empty when no state
|
||||||
|
List<RecoveredContainerState> recoveredContainers = stateStore
|
||||||
|
.loadContainersState();
|
||||||
|
assertTrue(recoveredContainers.isEmpty());
|
||||||
|
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
|
4);
|
||||||
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||||
|
storeMockContainer(containerId);
|
||||||
|
|
||||||
|
// Store ResourceMapping
|
||||||
|
stateStore.storeAssignedResources(containerId, "gpu",
|
||||||
|
Arrays.<Serializable>asList("1", "2", "3"));
|
||||||
|
// This will overwrite above
|
||||||
|
List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4");
|
||||||
|
stateStore.storeAssignedResources(containerId, "gpu", gpuRes1);
|
||||||
|
List<Serializable> fpgaRes =
|
||||||
|
Arrays.<Serializable>asList("3", "4", "5", "6");
|
||||||
|
stateStore.storeAssignedResources(containerId, "fpga", fpgaRes);
|
||||||
|
List<Serializable> numaRes = Arrays.<Serializable>asList("numa1");
|
||||||
|
stateStore.storeAssignedResources(containerId, "numa", numaRes);
|
||||||
|
|
||||||
|
// add a invalid key
|
||||||
|
restartStateStore();
|
||||||
|
recoveredContainers = stateStore.loadContainersState();
|
||||||
|
assertEquals(1, recoveredContainers.size());
|
||||||
|
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||||
|
List<Serializable> res = rcs.getResourceMappings()
|
||||||
|
.getAssignedResources("gpu");
|
||||||
|
Assert.assertTrue(res.equals(gpuRes1));
|
||||||
|
|
||||||
|
res = rcs.getResourceMappings().getAssignedResources("fpga");
|
||||||
|
Assert.assertTrue(res.equals(fpgaRes));
|
||||||
|
|
||||||
|
res = rcs.getResourceMappings().getAssignedResources("numa");
|
||||||
|
Assert.assertTrue(res.equals(numaRes));
|
||||||
|
}
|
||||||
|
|
||||||
|
private StartContainerRequest storeMockContainer(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
// create a container request
|
||||||
|
LocalResource lrsrc = LocalResource.newInstance(
|
||||||
|
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
||||||
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
||||||
|
1234567890L);
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new HashMap<String, LocalResource>();
|
||||||
|
localResources.put("rsrc", lrsrc);
|
||||||
|
Map<String, String> env = new HashMap<String, String>();
|
||||||
|
env.put("somevar", "someval");
|
||||||
|
List<String> containerCmds = new ArrayList<String>();
|
||||||
|
containerCmds.add("somecmd");
|
||||||
|
containerCmds.add("somearg");
|
||||||
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
||||||
|
serviceData.put("someservice",
|
||||||
|
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
|
||||||
|
ByteBuffer containerTokens = ByteBuffer
|
||||||
|
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
|
||||||
|
Map<ApplicationAccessType, String> acls =
|
||||||
|
new HashMap<ApplicationAccessType, String>();
|
||||||
|
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
|
||||||
|
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
|
||||||
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, env, containerCmds,
|
||||||
|
serviceData, containerTokens, acls);
|
||||||
|
Resource containerRsrc = Resource.newInstance(1357, 3);
|
||||||
|
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
|
||||||
|
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
|
||||||
|
Priority.newInstance(7), 13579);
|
||||||
|
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
|
||||||
|
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
|
||||||
|
"tokenservice");
|
||||||
|
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
|
||||||
|
containerToken);
|
||||||
|
stateStore.storeContainer(containerId, 0, 0, containerReq);
|
||||||
|
return containerReq;
|
||||||
|
}
|
||||||
|
|
||||||
private static class NMTokenSecretManagerForTest extends
|
private static class NMTokenSecretManagerForTest extends
|
||||||
BaseNMTokenSecretManager {
|
BaseNMTokenSecretManager {
|
||||||
public MasterKey generateKey() {
|
public MasterKey generateKey() {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
|
@ -242,4 +243,9 @@ public class MockContainer implements Container {
|
||||||
public long getContainerStartTime() {
|
public long getContainerStartTime() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceMappings getResourceMappings() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue