YARN-6966. NodeManager metrics may return wrong negative values when NM restarts. (Szilard Nemeth via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-08-02 10:06:16 -07:00
parent e2b82b82e2
commit 1991a1d760
10 changed files with 129 additions and 26 deletions

View File

@ -494,7 +494,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container container = new ContainerImpl(getConfig(), dispatcher, Container container = new ContainerImpl(getConfig(), dispatcher,
launchContext, credentials, metrics, token, context, rcs); launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container); context.getContainers().put(token.getContainerID(), container);
containerScheduler.recoverActiveContainer(container, rcs.getStatus()); containerScheduler.recoverActiveContainer(container, rcs);
dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent( dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
container)); container));
} }

View File

@ -41,6 +41,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -228,11 +231,11 @@ public class ContainerScheduler extends AbstractService implements
* @param rcs Recovered Container status * @param rcs Recovered Container status
*/ */
public void recoverActiveContainer(Container container, public void recoverActiveContainer(Container container,
RecoveredContainerStatus rcs) { RecoveredContainerState rcs) {
ExecutionType execType = ExecutionType execType =
container.getContainerTokenIdentifier().getExecutionType(); container.getContainerTokenIdentifier().getExecutionType();
if (rcs == RecoveredContainerStatus.QUEUED if (rcs.getStatus() == RecoveredContainerStatus.QUEUED
|| rcs == RecoveredContainerStatus.PAUSED) { || rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
if (execType == ExecutionType.GUARANTEED) { if (execType == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container); queuedGuaranteedContainers.put(container.getContainerId(), container);
} else if (execType == ExecutionType.OPPORTUNISTIC) { } else if (execType == ExecutionType.OPPORTUNISTIC) {
@ -243,10 +246,15 @@ public class ContainerScheduler extends AbstractService implements
"UnKnown execution type received " + container.getContainerId() "UnKnown execution type received " + container.getContainerId()
+ ", execType " + execType); + ", execType " + execType);
} }
} else if (rcs == RecoveredContainerStatus.LAUNCHED) { } else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
runningContainers.put(container.getContainerId(), container); runningContainers.put(container.getContainerId(), container);
utilizationTracker.addContainerResources(container); utilizationTracker.addContainerResources(container);
} }
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
&& rcs.getCapability() != null) {
metrics.launchedContainer();
metrics.allocateContainer(rcs.getCapability());
}
} }
/** /**

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
@ -259,6 +260,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
rcs.startRequest = new StartContainerRequestPBImpl( rcs.startRequest = new StartContainerRequestPBImpl(
StartContainerRequestProto.parseFrom(entry.getValue())); StartContainerRequestProto.parseFrom(entry.getValue()));
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(rcs.startRequest.getContainerToken());
rcs.capability = new ResourcePBImpl(
containerTokenIdentifier.getProto().getResource());
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
rcs.version = Integer.parseInt(asString(entry.getValue())); rcs.version = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
@ -323,24 +328,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
LOG.debug("storeContainer: containerId= " + idStr LOG.debug("storeContainer: containerId= " + idStr
+ ", startRequest= " + startRequest); + ", startRequest= " + startRequest);
} }
String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); final String keyVersion = getContainerVersionKey(idStr);
String keyVersion = getContainerVersionKey(idStr); final String keyRequest =
String keyStartTime = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
final StartContainerRequestProto startContainerRequest =
((StartContainerRequestPBImpl) startRequest).getProto();
final String keyStartTime =
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX); getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
final String startTimeValue = Long.toString(startTime);
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try { batch.put(bytes(keyRequest), startContainerRequest.toByteArray());
batch.put(bytes(keyRequest), batch.put(bytes(keyStartTime), bytes(startTimeValue));
((StartContainerRequestPBImpl) startRequest).getProto().
toByteArray());
batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
if (containerVersion != 0) { if (containerVersion != 0) {
batch.put(bytes(keyVersion), batch.put(bytes(keyVersion),
bytes(Integer.toString(containerVersion))); bytes(Integer.toString(containerVersion)));
} }
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);

View File

@ -71,7 +71,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
@Override @Override
public void storeContainer(ContainerId containerId, int version, public void storeContainer(ContainerId containerId, int version,
long startTime, StartContainerRequest startRequest) throws IOException { long startTime, StartContainerRequest startRequest) {
} }
@Override @Override

View File

@ -382,7 +382,8 @@ public abstract class NMStateStoreService extends AbstractService {
* @throws IOException * @throws IOException
*/ */
public abstract void storeContainer(ContainerId containerId, public abstract void storeContainer(ContainerId containerId,
int containerVersion, long startTime, StartContainerRequest startRequest) int containerVersion, long startTime,
StartContainerRequest startRequest)
throws IOException; throws IOException;
/** /**

View File

@ -98,7 +98,7 @@ public abstract class BaseContainerManagerTest {
protected static File remoteLogDir; protected static File remoteLogDir;
protected static File tmpDir; protected static File tmpDir;
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected NodeManagerMetrics metrics = NodeManagerMetrics.create();
public BaseContainerManagerTest() throws UnsupportedFileSystemException { public BaseContainerManagerTest() throws UnsupportedFileSystemException {
localFS = FileContext.getLocalFSFileContext(); localFS = FileContext.getLocalFSFileContext();

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -100,6 +101,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -393,6 +395,61 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.stop(); cm.stop();
} }
/**
 * Checks that NodeManager metrics are restored when the container manager is
 * restarted with recovery enabled.
 *
 * The test starts one container, records the metrics while it is RUNNING,
 * stops the container manager, replaces the metrics object with a fresh one,
 * and then starts a new container manager over the same state store. The
 * metrics after the restart must equal the metrics observed before it.
 *
 * The arguments to TestNodeManagerMetrics.checkMetrics are, in order:
 * launched, completed, failed, killed, initing, running, allocatedGB,
 * allocatedContainers, availableGB, allocatedVCores, availableVCores.
 *
 * @throws Exception if any setup step, container start or wait fails
 */
@Test
public void testNodeManagerMetricsRecovery() throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
// The same state store instance is reused for the restarted container
// manager below, so whatever the first one stored is what gets recovered.
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
Context context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
cm.init(conf);
cm.start();
// Register the node's total resources; the zero-allocation check after the
// restart expects 10 available GB and 8 available vcores for these values
// (presumably 10240 is in MB -- confirm against Resource.newInstance).
metrics.addResource(Resource.newInstance(10240, 8));
// add an application by starting a container
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, String> containerEnv = Collections.emptyMap();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Map<String, LocalResource> localResources = Collections.emptyMap();
List<String> commands = Arrays.asList("sleep 60s".split(" "));
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, commands, serviceData,
null, null);
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, null);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
assertNotNull(app);
// make sure the container reaches RUNNING state
waitForNMContainerState(cm, cid,
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState.RUNNING);
// Baseline before the restart: 1 launched, 1 running, 1 GB and 1 vcore
// allocated to 1 container, leaving 9 GB and 7 vcores available.
// NOTE(review): the container's size comes from the startContainer helper,
// which is not visible here -- confirm it requests 1 GB / 1 vcore.
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
// restart and verify metrics could be recovered
cm.stop();
// Replace the metrics object so the counters start from zero again;
// presumably the metrics system must be shut down first so that
// NodeManagerMetrics.create() can register a fresh source -- confirm.
DefaultMetricsSystem.shutdown();
metrics = NodeManagerMetrics.create();
metrics.addResource(Resource.newInstance(10240, 8));
// Sanity check: nothing is allocated on the fresh metrics object yet.
TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8);
context = createContext(conf, stateStore);
cm = createContainerManager(context, delSrvc);
cm.init(conf);
cm.start();
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
// After recovery the metrics must match the pre-restart baseline exactly.
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
cm.stop();
}
@Test @Test
public void testContainerResizeRecovery() throws Exception { public void testContainerResizeRecovery() throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);

View File

@ -92,8 +92,8 @@ public class TestNodeManagerMetrics {
assertGauge("AvailableVCores", 19, rb); assertGauge("AvailableVCores", 19, rb);
} }
private void checkMetrics(int launched, int completed, int failed, int killed, public static void checkMetrics(int launched, int completed, int failed,
int initing, int running, int allocatedGB, int killed, int initing, int running, int allocatedGB,
int allocatedContainers, int availableGB, int allocatedVCores, int allocatedContainers, int availableGB, int allocatedVCores,
int availableVCores) { int availableVCores) {
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics"); MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
public class NMMemoryStateStoreService extends NMStateStoreService { public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps; private Map<ApplicationId, ContainerManagerApplicationProto> apps;
private Map<ContainerId, RecoveredContainerState> containerStates; private Map<ContainerId, RecoveredContainerState> containerStates;
@ -127,11 +131,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override @Override
public synchronized void storeContainer(ContainerId containerId, public synchronized void storeContainer(ContainerId containerId,
int version, long startTime, StartContainerRequest startRequest) int version, long startTime, StartContainerRequest startRequest) {
throws IOException {
RecoveredContainerState rcs = new RecoveredContainerState(); RecoveredContainerState rcs = new RecoveredContainerState();
rcs.startRequest = startRequest; rcs.startRequest = startRequest;
rcs.version = version; rcs.version = version;
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(startRequest.getContainerToken());
rcs.capability =
new ResourcePBImpl(containerTokenIdentifier.getProto().getResource());
} catch (IOException e) {
throw new RuntimeException(e);
}
rcs.setStartTime(startTime); rcs.setStartTime(startTime);
containerStates.put(containerId, rcs); containerStates.put(containerId, rcs);
} }

View File

@ -231,7 +231,9 @@ public class TestNMLeveldbStateStoreService {
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 4); ApplicationAttemptId.newInstance(appId, 4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
StartContainerRequest containerReq = createContainerRequest(containerId); Resource containerResource = Resource.newInstance(1024, 2);
StartContainerRequest containerReq =
createContainerRequest(containerId, containerResource);
// store a container and verify recovered // store a container and verify recovered
long containerStartTime = System.currentTimeMillis(); long containerStartTime = System.currentTimeMillis();
@ -253,6 +255,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(false, rcs.getKilled()); assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest()); assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty()); assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
// store a new container record without StartContainerRequest // store a new container record without StartContainerRequest
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
@ -272,6 +275,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(false, rcs.getKilled()); assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest()); assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty()); assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
// launch the container, add some diagnostics, and verify recovered // launch the container, add some diagnostics, and verify recovered
StringBuilder diags = new StringBuilder(); StringBuilder diags = new StringBuilder();
@ -287,6 +291,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(false, rcs.getKilled()); assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest()); assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics()); assertEquals(diags.toString(), rcs.getDiagnostics());
assertEquals(containerResource, rcs.getCapability());
// pause the container, and verify recovered // pause the container, and verify recovered
stateStore.storeContainerPaused(containerId); stateStore.storeContainerPaused(containerId);
@ -371,8 +376,18 @@ public class TestNMLeveldbStateStoreService {
assertTrue(recoveredContainers.isEmpty()); assertTrue(recoveredContainers.isEmpty());
} }
/**
 * Creates a start request for the given container with an explicit
 * resource, by delegating to createContainerRequestInternal.
 *
 * @param containerId id of the container the request is built for
 * @param res resource passed through to createContainerRequestInternal,
 *            which uses it in place of its built-in default when non-null
 * @return the request produced by createContainerRequestInternal
 */
private StartContainerRequest createContainerRequest(
ContainerId containerId, Resource res) {
return createContainerRequestInternal(containerId, res);
}
private StartContainerRequest createContainerRequest( private StartContainerRequest createContainerRequest(
ContainerId containerId) { ContainerId containerId) {
return createContainerRequestInternal(containerId, null);
}
private StartContainerRequest createContainerRequestInternal(ContainerId
containerId, Resource res) {
LocalResource lrsrc = LocalResource.newInstance( LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
@ -398,6 +413,10 @@ public class TestNMLeveldbStateStoreService {
localResources, env, containerCmds, serviceData, containerTokens, localResources, env, containerCmds, serviceData, containerTokens,
acls); acls);
Resource containerRsrc = Resource.newInstance(1357, 3); Resource containerRsrc = Resource.newInstance(1357, 3);
if (res != null) {
containerRsrc = res;
}
ContainerTokenIdentifier containerTokenId = ContainerTokenIdentifier containerTokenId =
new ContainerTokenIdentifier(containerId, "host", "user", new ContainerTokenIdentifier(containerId, "host", "user",
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),