YARN-6966. NodeManager metrics may return wrong negative values when NM restart. (Szilard Nemeth via Haibo Chen)
(cherry picked from commit 9d3c39e9dd
)
This commit is contained in:
parent
4488fd8295
commit
7e7792dd7b
|
@ -496,7 +496,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||||
launchContext, credentials, metrics, token, context, rcs);
|
launchContext, credentials, metrics, token, context, rcs);
|
||||||
context.getContainers().put(token.getContainerID(), container);
|
context.getContainers().put(token.getContainerID(), container);
|
||||||
containerScheduler.recoverActiveContainer(container, rcs.getStatus());
|
containerScheduler.recoverActiveContainer(container, rcs);
|
||||||
app.handle(new ApplicationContainerInitEvent(container));
|
app.handle(new ApplicationContainerInitEvent(container));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
|
||||||
|
.RecoveredContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -229,11 +232,11 @@ public class ContainerScheduler extends AbstractService implements
|
||||||
* @param rcs Recovered Container status
|
* @param rcs Recovered Container status
|
||||||
*/
|
*/
|
||||||
public void recoverActiveContainer(Container container,
|
public void recoverActiveContainer(Container container,
|
||||||
RecoveredContainerStatus rcs) {
|
RecoveredContainerState rcs) {
|
||||||
ExecutionType execType =
|
ExecutionType execType =
|
||||||
container.getContainerTokenIdentifier().getExecutionType();
|
container.getContainerTokenIdentifier().getExecutionType();
|
||||||
if (rcs == RecoveredContainerStatus.QUEUED
|
if (rcs.getStatus() == RecoveredContainerStatus.QUEUED
|
||||||
|| rcs == RecoveredContainerStatus.PAUSED) {
|
|| rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
|
||||||
if (execType == ExecutionType.GUARANTEED) {
|
if (execType == ExecutionType.GUARANTEED) {
|
||||||
queuedGuaranteedContainers.put(container.getContainerId(), container);
|
queuedGuaranteedContainers.put(container.getContainerId(), container);
|
||||||
} else if (execType == ExecutionType.OPPORTUNISTIC) {
|
} else if (execType == ExecutionType.OPPORTUNISTIC) {
|
||||||
|
@ -244,10 +247,15 @@ public class ContainerScheduler extends AbstractService implements
|
||||||
"UnKnown execution type received " + container.getContainerId()
|
"UnKnown execution type received " + container.getContainerId()
|
||||||
+ ", execType " + execType);
|
+ ", execType " + execType);
|
||||||
}
|
}
|
||||||
} else if (rcs == RecoveredContainerStatus.LAUNCHED) {
|
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
|
||||||
runningContainers.put(container.getContainerId(), container);
|
runningContainers.put(container.getContainerId(), container);
|
||||||
utilizationTracker.addContainerResources(container);
|
utilizationTracker.addContainerResources(container);
|
||||||
}
|
}
|
||||||
|
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
|
||||||
|
&& rcs.getCapability() != null) {
|
||||||
|
metrics.launchedContainer();
|
||||||
|
metrics.allocateContainer(rcs.getCapability());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.fusesource.leveldbjni.JniDBFactory;
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
@ -299,6 +300,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
|
if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
|
||||||
rcs.startRequest = new StartContainerRequestPBImpl(
|
rcs.startRequest = new StartContainerRequestPBImpl(
|
||||||
StartContainerRequestProto.parseFrom(entry.getValue()));
|
StartContainerRequestProto.parseFrom(entry.getValue()));
|
||||||
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||||
|
.newContainerTokenIdentifier(rcs.startRequest.getContainerToken());
|
||||||
|
rcs.capability = new ResourcePBImpl(
|
||||||
|
containerTokenIdentifier.getProto().getResource());
|
||||||
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
|
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
|
||||||
rcs.version = Integer.parseInt(asString(entry.getValue()));
|
rcs.version = Integer.parseInt(asString(entry.getValue()));
|
||||||
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
|
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
|
||||||
|
@ -382,24 +387,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
LOG.debug("storeContainer: containerId= " + idStr
|
LOG.debug("storeContainer: containerId= " + idStr
|
||||||
+ ", startRequest= " + startRequest);
|
+ ", startRequest= " + startRequest);
|
||||||
}
|
}
|
||||||
String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
|
final String keyVersion = getContainerVersionKey(idStr);
|
||||||
String keyVersion = getContainerVersionKey(idStr);
|
final String keyRequest =
|
||||||
String keyStartTime =
|
getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
|
||||||
|
final StartContainerRequestProto startContainerRequest =
|
||||||
|
((StartContainerRequestPBImpl) startRequest).getProto();
|
||||||
|
|
||||||
|
final String keyStartTime =
|
||||||
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
|
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
|
||||||
|
final String startTimeValue = Long.toString(startTime);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
WriteBatch batch = db.createWriteBatch();
|
try (WriteBatch batch = db.createWriteBatch()) {
|
||||||
try {
|
batch.put(bytes(keyRequest), startContainerRequest.toByteArray());
|
||||||
batch.put(bytes(keyRequest),
|
batch.put(bytes(keyStartTime), bytes(startTimeValue));
|
||||||
((StartContainerRequestPBImpl) startRequest).getProto().
|
|
||||||
toByteArray());
|
|
||||||
batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
|
|
||||||
if (containerVersion != 0) {
|
if (containerVersion != 0) {
|
||||||
batch.put(bytes(keyVersion),
|
batch.put(bytes(keyVersion),
|
||||||
bytes(Integer.toString(containerVersion)));
|
bytes(Integer.toString(containerVersion)));
|
||||||
}
|
}
|
||||||
db.write(batch);
|
db.write(batch);
|
||||||
} finally {
|
|
||||||
batch.close();
|
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
markStoreUnHealthy(e);
|
markStoreUnHealthy(e);
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeContainer(ContainerId containerId, int version,
|
public void storeContainer(ContainerId containerId, int version,
|
||||||
long startTime, StartContainerRequest startRequest) throws IOException {
|
long startTime, StartContainerRequest startRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -416,7 +416,8 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public abstract void storeContainer(ContainerId containerId,
|
public abstract void storeContainer(ContainerId containerId,
|
||||||
int containerVersion, long startTime, StartContainerRequest startRequest)
|
int containerVersion, long startTime,
|
||||||
|
StartContainerRequest startRequest)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -107,7 +107,7 @@ public abstract class BaseContainerManagerTest {
|
||||||
protected static File remoteLogDir;
|
protected static File remoteLogDir;
|
||||||
protected static File tmpDir;
|
protected static File tmpDir;
|
||||||
|
|
||||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
protected NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
|
|
||||||
public BaseContainerManagerTest() throws UnsupportedFileSystemException {
|
public BaseContainerManagerTest() throws UnsupportedFileSystemException {
|
||||||
localFS = FileContext.getLocalFSFileContext();
|
localFS = FileContext.getLocalFSFileContext();
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.ServerSocketUtil;
|
import org.apache.hadoop.net.ServerSocketUtil;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -106,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
@ -400,6 +402,61 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
cm.stop();
|
cm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeManagerMetricsRecovery() throws Exception {
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
||||||
|
|
||||||
|
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||||
|
stateStore.init(conf);
|
||||||
|
stateStore.start();
|
||||||
|
Context context = createContext(conf, stateStore);
|
||||||
|
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
metrics.addResource(Resource.newInstance(10240, 8));
|
||||||
|
|
||||||
|
// add an application by starting a container
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||||
|
Map<String, String> containerEnv = Collections.emptyMap();
|
||||||
|
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
||||||
|
Map<String, LocalResource> localResources = Collections.emptyMap();
|
||||||
|
List<String> commands = Arrays.asList("sleep 60s".split(" "));
|
||||||
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, containerEnv, commands, serviceData,
|
||||||
|
null, null);
|
||||||
|
StartContainersResponse startResponse = startContainer(context, cm, cid,
|
||||||
|
clc, null, ContainerType.TASK);
|
||||||
|
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
Application app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
|
||||||
|
// make sure the container reaches RUNNING state
|
||||||
|
waitForNMContainerState(cm, cid,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager
|
||||||
|
.containermanager.container.ContainerState.RUNNING);
|
||||||
|
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
||||||
|
|
||||||
|
// restart and verify metrics could be recovered
|
||||||
|
cm.stop();
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
|
metrics = NodeManagerMetrics.create();
|
||||||
|
metrics.addResource(Resource.newInstance(10240, 8));
|
||||||
|
TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8);
|
||||||
|
context = createContext(conf, stateStore);
|
||||||
|
cm = createContainerManager(context, delSrvc);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
||||||
|
cm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerResizeRecovery() throws Exception {
|
public void testContainerResizeRecovery() throws Exception {
|
||||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
@ -31,6 +32,8 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
|
||||||
|
.RecoveredContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -71,6 +74,13 @@ public class TestContainerSchedulerRecovery {
|
||||||
|
|
||||||
private ContainerScheduler spy;
|
private ContainerScheduler spy;
|
||||||
|
|
||||||
|
private RecoveredContainerState createRecoveredContainerState(
|
||||||
|
RecoveredContainerStatus status) {
|
||||||
|
RecoveredContainerState mockState = mock(RecoveredContainerState.class);
|
||||||
|
when(mockState.getStatus()).thenReturn(status);
|
||||||
|
return mockState;
|
||||||
|
}
|
||||||
|
|
||||||
@Before public void setUp() throws Exception {
|
@Before public void setUp() throws Exception {
|
||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
spy = spy(tempContainerScheduler);
|
spy = spy(tempContainerScheduler);
|
||||||
|
@ -94,7 +104,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -113,7 +124,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -132,7 +144,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -151,7 +164,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -170,7 +184,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -189,7 +204,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -208,7 +224,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -227,7 +244,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -246,7 +264,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -265,7 +284,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
|
||||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
|
@ -284,7 +304,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
|
@ -302,7 +323,8 @@ public class TestContainerSchedulerRecovery {
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||||
assertEquals(0, spy.getNumRunningContainers());
|
assertEquals(0, spy.getNumRunningContainers());
|
||||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
RecoveredContainerState rcs =
|
||||||
|
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||||
spy.recoverActiveContainer(container, rcs);
|
spy.recoverActiveContainer(container, rcs);
|
||||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||||
|
|
|
@ -113,8 +113,8 @@ public class TestNodeManagerMetrics {
|
||||||
assertGauge("AvailableVCores", 19, rb);
|
assertGauge("AvailableVCores", 19, rb);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkMetrics(int launched, int completed, int failed, int killed,
|
public static void checkMetrics(int launched, int completed, int failed,
|
||||||
int initing, int running, int allocatedGB,
|
int killed, int initing, int running, int allocatedGB,
|
||||||
int allocatedContainers, int availableGB, int allocatedVCores,
|
int allocatedContainers, int availableGB, int allocatedVCores,
|
||||||
int availableVCores) {
|
int availableVCores) {
|
||||||
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
|
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
|
||||||
private Map<ContainerId, RecoveredContainerState> containerStates;
|
private Map<ContainerId, RecoveredContainerState> containerStates;
|
||||||
|
@ -132,11 +136,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeContainer(ContainerId containerId,
|
public synchronized void storeContainer(ContainerId containerId,
|
||||||
int version, long startTime, StartContainerRequest startRequest)
|
int version, long startTime, StartContainerRequest startRequest) {
|
||||||
throws IOException {
|
|
||||||
RecoveredContainerState rcs = new RecoveredContainerState();
|
RecoveredContainerState rcs = new RecoveredContainerState();
|
||||||
rcs.startRequest = startRequest;
|
rcs.startRequest = startRequest;
|
||||||
rcs.version = version;
|
rcs.version = version;
|
||||||
|
try {
|
||||||
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||||
|
.newContainerTokenIdentifier(startRequest.getContainerToken());
|
||||||
|
rcs.capability =
|
||||||
|
new ResourcePBImpl(containerTokenIdentifier.getProto().getResource());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
rcs.setStartTime(startTime);
|
rcs.setStartTime(startTime);
|
||||||
containerStates.put(containerId, rcs);
|
containerStates.put(containerId, rcs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,7 +238,9 @@ public class TestNMLeveldbStateStoreService {
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 4);
|
ApplicationAttemptId.newInstance(appId, 4);
|
||||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||||
StartContainerRequest containerReq = createContainerRequest(containerId);
|
Resource containerResource = Resource.newInstance(1024, 2);
|
||||||
|
StartContainerRequest containerReq =
|
||||||
|
createContainerRequest(containerId, containerResource);
|
||||||
|
|
||||||
// store a container and verify recovered
|
// store a container and verify recovered
|
||||||
long containerStartTime = System.currentTimeMillis();
|
long containerStartTime = System.currentTimeMillis();
|
||||||
|
@ -260,6 +262,7 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(false, rcs.getKilled());
|
assertEquals(false, rcs.getKilled());
|
||||||
assertEquals(containerReq, rcs.getStartRequest());
|
assertEquals(containerReq, rcs.getStartRequest());
|
||||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||||
|
assertEquals(containerResource, rcs.getCapability());
|
||||||
|
|
||||||
// store a new container record without StartContainerRequest
|
// store a new container record without StartContainerRequest
|
||||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
|
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
|
||||||
|
@ -279,6 +282,7 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(false, rcs.getKilled());
|
assertEquals(false, rcs.getKilled());
|
||||||
assertEquals(containerReq, rcs.getStartRequest());
|
assertEquals(containerReq, rcs.getStartRequest());
|
||||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||||
|
assertEquals(containerResource, rcs.getCapability());
|
||||||
|
|
||||||
// launch the container, add some diagnostics, and verify recovered
|
// launch the container, add some diagnostics, and verify recovered
|
||||||
StringBuilder diags = new StringBuilder();
|
StringBuilder diags = new StringBuilder();
|
||||||
|
@ -294,6 +298,7 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(false, rcs.getKilled());
|
assertEquals(false, rcs.getKilled());
|
||||||
assertEquals(containerReq, rcs.getStartRequest());
|
assertEquals(containerReq, rcs.getStartRequest());
|
||||||
assertEquals(diags.toString(), rcs.getDiagnostics());
|
assertEquals(diags.toString(), rcs.getDiagnostics());
|
||||||
|
assertEquals(containerResource, rcs.getCapability());
|
||||||
|
|
||||||
// pause the container, and verify recovered
|
// pause the container, and verify recovered
|
||||||
stateStore.storeContainerPaused(containerId);
|
stateStore.storeContainerPaused(containerId);
|
||||||
|
@ -394,8 +399,18 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2));
|
assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StartContainerRequest createContainerRequest(
|
||||||
|
ContainerId containerId, Resource res) {
|
||||||
|
return createContainerRequestInternal(containerId, res);
|
||||||
|
}
|
||||||
|
|
||||||
private StartContainerRequest createContainerRequest(
|
private StartContainerRequest createContainerRequest(
|
||||||
ContainerId containerId) {
|
ContainerId containerId) {
|
||||||
|
return createContainerRequestInternal(containerId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private StartContainerRequest createContainerRequestInternal(ContainerId
|
||||||
|
containerId, Resource res) {
|
||||||
LocalResource lrsrc = LocalResource.newInstance(
|
LocalResource lrsrc = LocalResource.newInstance(
|
||||||
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
||||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
||||||
|
@ -421,6 +436,10 @@ public class TestNMLeveldbStateStoreService {
|
||||||
localResources, env, containerCmds, serviceData, containerTokens,
|
localResources, env, containerCmds, serviceData, containerTokens,
|
||||||
acls);
|
acls);
|
||||||
Resource containerRsrc = Resource.newInstance(1357, 3);
|
Resource containerRsrc = Resource.newInstance(1357, 3);
|
||||||
|
|
||||||
|
if (res != null) {
|
||||||
|
containerRsrc = res;
|
||||||
|
}
|
||||||
ContainerTokenIdentifier containerTokenId =
|
ContainerTokenIdentifier containerTokenId =
|
||||||
new ContainerTokenIdentifier(containerId, "host", "user",
|
new ContainerTokenIdentifier(containerId, "host", "user",
|
||||||
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
|
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
|
||||||
|
|
Loading…
Reference in New Issue