YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.
This commit is contained in:
parent
80440231d4
commit
7f515f57ed
|
@ -451,6 +451,7 @@ public class NodeManager extends CompositeService
|
|||
// so that we make sure everything is up before registering with RM.
|
||||
addService(nodeStatusUpdater);
|
||||
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
nmStore.setNodeStatusUpdater(nodeStatusUpdater);
|
||||
|
||||
super.serviceInit(conf);
|
||||
// TODO add local dirs to del
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
|
|||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
|
@ -155,6 +156,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
|
||||
private DB db;
|
||||
private boolean isNewlyCreated;
|
||||
private boolean isHealthy;
|
||||
private Timer compactionTimer;
|
||||
|
||||
/**
|
||||
|
@ -169,6 +171,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
|
||||
@Override
|
||||
protected void startStorage() throws IOException {
|
||||
// Assume that we're healthy when we start
|
||||
isHealthy = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -187,6 +191,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
return isNewlyCreated;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the state store throws an error after recovery has been performed
|
||||
* then we can not trust it any more to reflect the NM state. We need to
|
||||
* mark the store and node unhealthy.
|
||||
* Errors during the recovery will cause a service failure and thus a NM
|
||||
* start failure. Do not need to mark the store unhealthy for those.
|
||||
* @param dbErr Exception
|
||||
*/
|
||||
private void markStoreUnHealthy(DBException dbErr) {
|
||||
// Always log the error here, we might not see the error in the caller
|
||||
LOG.error("Statestore exception: ", dbErr);
|
||||
// We have already been marked unhealthy so no need to do it again.
|
||||
if (!isHealthy) {
|
||||
return;
|
||||
}
|
||||
// Mark unhealthy, an out of band heartbeat will be sent and the state
|
||||
// will remain unhealthy (not recoverable).
|
||||
// No need to close the store: does not make any difference at this point.
|
||||
isHealthy = false;
|
||||
// We could get here before the nodeStatusUpdater is set
|
||||
NodeStatusUpdater nsu = getNodeStatusUpdater();
|
||||
if (nsu != null) {
|
||||
nsu.reportException(dbErr);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isHealthy() {
|
||||
return isHealthy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RecoveredContainerState> loadContainersState()
|
||||
|
@ -354,6 +388,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -378,6 +413,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), EMPTY_VALUE);
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -393,6 +429,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -408,6 +445,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), EMPTY_VALUE);
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -424,6 +462,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -441,6 +480,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(diagnostics.toString()));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -459,6 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), EMPTY_VALUE);
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -488,6 +529,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -504,6 +546,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), EMPTY_VALUE);
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -520,6 +563,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(Integer.toString(exitCode)));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -532,6 +576,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -544,6 +589,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(workDir));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -556,6 +602,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(logDir));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -589,6 +636,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -638,6 +686,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), p.toByteArray());
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -659,6 +708,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -815,6 +865,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), proto.toByteArray());
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -838,6 +889,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -861,6 +913,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -926,6 +979,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), taskProto.toByteArray());
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -936,6 +990,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1009,6 +1064,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1023,6 +1079,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1096,6 +1153,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), bytes(expTime.toString()));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1107,6 +1165,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1157,6 +1216,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), proto.toByteArray());
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1167,6 +1227,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1198,6 +1259,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
|
@ -1361,6 +1423,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(dbkey));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
return;
|
||||
|
@ -1375,6 +1438,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(fullkey), data);
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1386,6 +1450,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(fullkey));
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1409,6 +1474,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
candidates.add(key);
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
|
@ -1422,6 +1488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
db.delete(bytes(key));
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
@ -1555,6 +1622,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
return db;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setDB(DB testDb) {
|
||||
this.db = testDb;
|
||||
}
|
||||
|
||||
/**
|
||||
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
||||
* 2) Any incompatible change of state-store is a major upgrade, and any
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
|
|||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||
|
||||
|
@ -51,10 +52,20 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Reso
|
|||
@Unstable
|
||||
public abstract class NMStateStoreService extends AbstractService {
|
||||
|
||||
private NodeStatusUpdater nodeStatusUpdater = null;
|
||||
|
||||
public NMStateStoreService(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
protected NodeStatusUpdater getNodeStatusUpdater() {
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
|
||||
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
|
||||
this.nodeStatusUpdater = nodeStatusUpdater;
|
||||
}
|
||||
|
||||
public static class RecoveredApplicationsState {
|
||||
List<ContainerManagerApplicationProto> applications;
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.isNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
|
@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
|||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.iq80.leveldb.DB;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestNMLeveldbStateStoreService {
|
||||
private static final File TMP_DIR = new File(
|
||||
|
@ -1165,6 +1168,38 @@ public class TestNMLeveldbStateStoreService {
|
|||
resourceMappings.getAssignedResources("numa").equals(numaRes));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStateStoreNodeHealth() throws IOException {
|
||||
// keep the working DB clean, break a temp DB
|
||||
DB keepDB = stateStore.getDB();
|
||||
DB myMocked = mock(DB.class);
|
||||
stateStore.setDB(myMocked);
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(1234, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
DBException toThrow = new DBException();
|
||||
Mockito.doThrow(toThrow).when(myMocked).
|
||||
put(any(byte[].class), any(byte[].class));
|
||||
// write some data
|
||||
try {
|
||||
// chosen a simple method could be any of the "void" methods
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
stateStore.storeContainerKilled(containerId);
|
||||
} catch (IOException ioErr) {
|
||||
// Cause should be wrapped DBException
|
||||
assertTrue(ioErr.getCause() instanceof DBException);
|
||||
// check the store is marked unhealthy
|
||||
assertFalse("Statestore should have been unhealthy",
|
||||
stateStore.isHealthy());
|
||||
return;
|
||||
} finally {
|
||||
// restore the working DB
|
||||
stateStore.setDB(keepDB);
|
||||
}
|
||||
Assert.fail("Expected exception not thrown");
|
||||
}
|
||||
|
||||
private StartContainerRequest storeMockContainer(ContainerId containerId)
|
||||
throws IOException {
|
||||
// create a container request
|
||||
|
|
Loading…
Reference in New Issue