YARN-9248. RMContainerImpl:Invalid event: ACQUIRED at KILLED. Contributed by lujie.

(cherry picked from commit 8c30114b00)
This commit is contained in:
Weiwei Yang 2019-02-27 17:29:02 +08:00
parent 51b010b19f
commit 10d4a9a7fb
2 changed files with 79 additions and 3 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -154,7 +155,8 @@ public class RMContainerImpl implements RMContainer {
// Transitions from KILLED state
.addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED,
RMContainerEventType.KILL, RMContainerEventType.FINISHED))
RMContainerEventType.KILL, RMContainerEventType.ACQUIRED,
RMContainerEventType.FINISHED))
// create the topology tables
.installTopology();
@ -473,8 +475,7 @@ public class RMContainerImpl implements RMContainer {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
" on container " + this.getContainerId());
onInvalidStateTransition(event.getType(), oldState);
}
if (oldState != getState()) {
LOG.info(event.getContainerId() + " Container Transitioned from "
@ -899,4 +900,17 @@ public class RMContainerImpl implements RMContainer {
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
}
}
/**
* catch the InvalidStateTransition.
* @param state
* @param rmContainerEventType
*/
@VisibleForTesting
protected void onInvalidStateTransition(
RMContainerEventType rmContainerEventType,
RMContainerState state){
LOG.error("Invalid event " + rmContainerEventType +
" on container " + this.getContainerId());
}
}

View File

@ -560,4 +560,66 @@ public class TestRMContainerImpl {
AllocationTags.createSingleAppAllocationTags(appId, null),
Long::max));
}
@Test(timeout = 30000)
public void testContainerAcquiredAtKilled() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
appAttemptEventHandler);
drainDispatcher.register(RMNodeEventType.class, generic);
drainDispatcher.init(new YarnConfiguration());
drainDispatcher.start();
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
Resource resource = BuilderUtils.newResource(512, 1);
Priority priority = BuilderUtils.newPriority(5);
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
ConcurrentMap<ApplicationId, RMApp> appMap = new ConcurrentHashMap<>();
RMApp rmApp = mock(RMApp.class);
appMap.putIfAbsent(appId, rmApp);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
when(rmContext.getRMApps()).thenReturn(appMap);
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext) {
@Override
protected void onInvalidStateTransition(
RMContainerEventType rmContainerEventType, RMContainerState state) {
Assert.fail("RMContainerImpl: can't handle " + rmContainerEventType
+ " at state " + state);
}
};
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.KILL));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
}
}