YARN-6683. Invalid event: COLLECTOR_UPDATE at KILLED. Contributed by Rohith Sharma K S
This commit is contained in:
parent
ce634881ce
commit
2ad147ef29
@ -69,7 +69,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
@ -651,10 +650,7 @@ private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
|
|||||||
String previousCollectorAddr = rmApp.getCollectorAddr();
|
String previousCollectorAddr = rmApp.getCollectorAddr();
|
||||||
if (previousCollectorAddr == null
|
if (previousCollectorAddr == null
|
||||||
|| !previousCollectorAddr.equals(collectorAddr)) {
|
|| !previousCollectorAddr.equals(collectorAddr)) {
|
||||||
// sending collector update event.
|
rmApp.setCollectorAddr(collectorAddr);
|
||||||
RMAppCollectorUpdateEvent event =
|
|
||||||
new RMAppCollectorUpdateEvent(appId, collectorAddr);
|
|
||||||
rmContext.getDispatcher().getEventHandler().handle(event);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,40 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Event used for updating collector address in RMApp on node heartbeat.
|
|
||||||
*/
|
|
||||||
public class RMAppCollectorUpdateEvent extends RMAppEvent {
|
|
||||||
|
|
||||||
private final String appCollectorAddr;
|
|
||||||
|
|
||||||
public RMAppCollectorUpdateEvent(ApplicationId appId,
|
|
||||||
String appCollectorAddr) {
|
|
||||||
super(appId, RMAppEventType.COLLECTOR_UPDATE);
|
|
||||||
this.appCollectorAddr = appCollectorAddr;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getAppCollectorAddr(){
|
|
||||||
return this.appCollectorAddr;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -165,7 +165,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
private long storedFinishTime = 0;
|
private long storedFinishTime = 0;
|
||||||
private int firstAttemptIdInStateStore = 1;
|
private int firstAttemptIdInStateStore = 1;
|
||||||
private int nextAttemptId = 1;
|
private int nextAttemptId = 1;
|
||||||
private String collectorAddr;
|
private volatile String collectorAddr;
|
||||||
// This field isn't protected by readlock now.
|
// This field isn't protected by readlock now.
|
||||||
private volatile RMAppAttempt currentAttempt;
|
private volatile RMAppAttempt currentAttempt;
|
||||||
private String queue;
|
private String queue;
|
||||||
@ -217,8 +217,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
// Transitions from NEW state
|
// Transitions from NEW state
|
||||||
.addTransition(RMAppState.NEW, RMAppState.NEW,
|
.addTransition(RMAppState.NEW, RMAppState.NEW,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.NEW, RMAppState.NEW,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
|
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
|
||||||
RMAppEventType.START, new RMAppNewlySavingTransition())
|
RMAppEventType.START, new RMAppNewlySavingTransition())
|
||||||
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
|
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
|
||||||
@ -235,8 +233,6 @@ RMAppEventType.RECOVER, new RMAppRecoveredTransition())
|
|||||||
// Transitions from NEW_SAVING state
|
// Transitions from NEW_SAVING state
|
||||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
|
||||||
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
|
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
|
||||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
|
||||||
@ -251,8 +247,6 @@ RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
|
|||||||
// Transitions from SUBMITTED state
|
// Transitions from SUBMITTED state
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.APP_REJECTED,
|
RMAppEventType.APP_REJECTED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
@ -267,8 +261,6 @@ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
|
|||||||
// Transitions from ACCEPTED state
|
// Transitions from ACCEPTED state
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
||||||
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
|
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
|
||||||
YarnApplicationState.RUNNING))
|
YarnApplicationState.RUNNING))
|
||||||
@ -294,8 +286,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
|
|||||||
// Transitions from RUNNING state
|
// Transitions from RUNNING state
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
@ -325,8 +315,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
|
|||||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.APP_RUNNING_ON_NODE,
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
new AppRunningOnNodeTransition())
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||||
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
||||||
@ -338,8 +326,6 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
RMAppEventType.APP_RUNNING_ON_NODE,
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
new AppRunningOnNodeTransition())
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
||||||
@ -351,8 +337,6 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|||||||
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
||||||
RMAppEventType.APP_RUNNING_ON_NODE,
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
new AppRunningOnNodeTransition())
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
|
||||||
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
||||||
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.ATTEMPT_KILLED,
|
RMAppEventType.ATTEMPT_KILLED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
@ -1020,24 +1004,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class RMAppCollectorUpdateTransition
|
|
||||||
extends RMAppTransition {
|
|
||||||
|
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
|
|
||||||
LOG.info("Updating collector info for app: " + app.getApplicationId());
|
|
||||||
|
|
||||||
RMAppCollectorUpdateEvent appCollectorUpdateEvent =
|
|
||||||
(RMAppCollectorUpdateEvent) event;
|
|
||||||
// Update collector address
|
|
||||||
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
|
|
||||||
|
|
||||||
// TODO persistent to RMStateStore for recover
|
|
||||||
// Save to RMStateStore
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
|
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
|
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user