YARN-4924. NM recovery race can lead to container not cleaned up. Contributed by sandflee

(cherry picked from commit 9b5c5bd42f)
This commit is contained in:
Jason Lowe 2016-04-14 19:44:31 +00:00
parent b4d24d7b0e
commit a996889313
8 changed files with 57 additions and 73 deletions

View File

@ -127,6 +127,9 @@ Release 2.7.3 - UNRELEASED
YARN-4938. MiniYarnCluster should not request transitionToActive to RM
on non-HA environment. (Eric Badger via aajisaka)
YARN-4924. NM recovery race can lead to container not cleaned up.
(sandflee via jlowe)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES

View File

@ -262,12 +262,6 @@ public class ContainerManagerImpl extends CompositeService implements
for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
recoverContainer(rcs);
}
String diagnostic = "Application marked finished during recovery";
for (ApplicationId appId : appsState.getFinishedApplications()) {
dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appId, diagnostic));
}
}
}
@ -1109,11 +1103,6 @@ public class ContainerManagerImpl extends CompositeService implements
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
try {
this.context.getNMStateStore().storeFinishedApplication(appID);
} catch (IOException e) {
LOG.error("Unable to update application state in store", e);
}
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));

View File

@ -81,6 +81,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String APPLICATIONS_KEY_PREFIX =
"ContainerManager/applications/";
@Deprecated
private static final String FINISHED_APPS_KEY_PREFIX =
"ContainerManager/finishedApps/";
@ -339,20 +340,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
state.applications.add(
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
}
state.finishedApplications = new ArrayList<ApplicationId>();
keyPrefix = FINISHED_APPS_KEY_PREFIX;
iter.seek(bytes(keyPrefix));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
ApplicationId appId =
ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
state.finishedApplications.add(appId);
}
} catch (DBException e) {
throw new IOException(e);
} finally {
@ -361,6 +348,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
cleanupDeprecatedFinishedApps();
return state;
}
@ -375,17 +364,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
@Override
public void storeFinishedApplication(ApplicationId appId)
throws IOException {
String key = FINISHED_APPS_KEY_PREFIX + appId;
try {
db.put(bytes(key), new byte[0]);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeApplication(ApplicationId appId)
throws IOException {
@ -394,8 +372,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
String key = APPLICATIONS_KEY_PREFIX + appId;
batch.delete(bytes(key));
key = FINISHED_APPS_KEY_PREFIX + appId;
batch.delete(bytes(key));
db.write(batch);
} finally {
batch.close();
@ -913,6 +889,52 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
@SuppressWarnings("deprecation")
private void cleanupDeprecatedFinishedApps() {
try {
cleanupKeysWithPrefix(FINISHED_APPS_KEY_PREFIX);
} catch (Exception e) {
LOG.warn("cleanup keys with prefix " + FINISHED_APPS_KEY_PREFIX +
" from leveldb failed", e);
}
}
private void cleanupKeysWithPrefix(String prefix) throws IOException {
WriteBatch batch = null;
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
try {
batch = db.createWriteBatch();
iter.seek(bytes(prefix));
while (iter.hasNext()) {
byte[] key = iter.next().getKey();
String keyStr = asString(key);
if (!keyStr.startsWith(prefix)) {
break;
}
batch.delete(key);
if (LOG.isDebugEnabled()) {
LOG.debug("cleanup " + keyStr + " from leveldb");
}
}
db.write(batch);
} catch (DBException e) {
throw new IOException(e);
} finally {
if (batch != null) {
batch.close();
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
}
private String getLogDeleterKey(ApplicationId appId) {
return LOG_DELETER_KEY_PREFIX + appId;
}

View File

@ -57,10 +57,6 @@ public class NMNullStateStoreService extends NMStateStoreService {
ContainerManagerApplicationProto p) throws IOException {
}
@Override
public void storeFinishedApplication(ApplicationId appId) {
}
@Override
public void removeApplication(ApplicationId appId) throws IOException {
}

View File

@ -51,15 +51,11 @@ public abstract class NMStateStoreService extends AbstractService {
public static class RecoveredApplicationsState {
List<ContainerManagerApplicationProto> applications;
List<ApplicationId> finishedApplications;
public List<ContainerManagerApplicationProto> getApplications() {
return applications;
}
public List<ApplicationId> getFinishedApplications() {
return finishedApplications;
}
}
public enum RecoveredContainerStatus {
@ -241,14 +237,6 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void storeApplication(ApplicationId appId,
ContainerManagerApplicationProto p) throws IOException;
/**
* Record that an application has finished
* @param appId the application ID
* @throws IOException
*/
public abstract void storeFinishedApplication(ApplicationId appId)
throws IOException;
/**
* Remove records corresponding to an application
* @param appId the application ID

View File

@ -210,6 +210,10 @@ public class TestContainerManagerRecovery {
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
// no longer saving FINISH_APP event in NM stateStore,
// simulate by resending FINISH_APP event
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
assertTrue(context.getApplicationACLsManager().checkAccess(
UserGroupInformation.createRemoteUser(modUser),

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
private Set<ApplicationId> finishedApps;
private Map<ContainerId, RecoveredContainerState> containerStates;
private Map<TrackerKey, TrackerState> trackerStates;
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
@ -58,7 +57,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
protected void initStorage(Configuration conf) {
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
finishedApps = new HashSet<ApplicationId>();
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
nmTokenState = new RecoveredNMTokensState();
nmTokenState.applicationMasterKeys =
@ -85,7 +83,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
RecoveredApplicationsState state = new RecoveredApplicationsState();
state.applications = new ArrayList<ContainerManagerApplicationProto>(
apps.values());
state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
return state;
}
@ -97,16 +94,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
apps.put(appId, protoCopy);
}
@Override
public synchronized void storeFinishedApplication(ApplicationId appId) {
finishedApps.add(appId);
}
@Override
public synchronized void removeApplication(ApplicationId appId)
throws IOException {
apps.remove(appId);
finishedApps.remove(appId);
}
@Override
@ -384,7 +375,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
logDeleterState.remove(appId);
}
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();

View File

@ -174,7 +174,6 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredApplicationsState state = stateStore.loadApplicationsState();
assertTrue(state.getApplications().isEmpty());
assertTrue(state.getFinishedApplications().isEmpty());
// store an application and verify recovered
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@ -188,10 +187,8 @@ public class TestNMLeveldbStateStoreService {
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
assertTrue(state.getFinishedApplications().isEmpty());
// finish an application and add a new one
stateStore.storeFinishedApplication(appId1);
// add a new app
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
builder = ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId2).getProto());
@ -203,18 +200,13 @@ public class TestNMLeveldbStateStoreService {
assertEquals(2, state.getApplications().size());
assertTrue(state.getApplications().contains(appProto1));
assertTrue(state.getApplications().contains(appProto2));
assertEquals(1, state.getFinishedApplications().size());
assertEquals(appId1, state.getFinishedApplications().get(0));
// test removing an application
stateStore.storeFinishedApplication(appId2);
stateStore.removeApplication(appId2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
assertEquals(1, state.getFinishedApplications().size());
assertEquals(appId1, state.getFinishedApplications().get(0));
}
@Test