YARN-9640. Slow event processing could cause too many attempt unregister events. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
d562050cec
commit
81c0809463
|
@ -93,6 +93,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
|
||||
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
||||
private final ConcurrentHashMap<ApplicationAttemptId, Boolean>
|
||||
finishedAttemptCache = new ConcurrentHashMap<>();
|
||||
protected final RMContext rmContext;
|
||||
private final AMSProcessingChain amsProcessingChain;
|
||||
private boolean timelineServiceV2Enabled;
|
||||
|
@ -337,11 +339,14 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
throw new ApplicationMasterNotRegisteredException(message);
|
||||
}
|
||||
|
||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||
FinishApplicationMasterResponse response =
|
||||
FinishApplicationMasterResponse.newInstance(false);
|
||||
this.amsProcessingChain.finishApplicationMaster(
|
||||
applicationAttemptId, request, response);
|
||||
if (finishedAttemptCache.putIfAbsent(applicationAttemptId, true)
|
||||
== null) {
|
||||
this.amsProcessingChain
|
||||
.finishApplicationMaster(applicationAttemptId, request, response);
|
||||
}
|
||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
@ -490,6 +495,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
||||
LOG.info("Unregistering app attempt : " + attemptId);
|
||||
responseMap.remove(attemptId);
|
||||
finishedAttemptCache.remove(attemptId);
|
||||
rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
|
||||
}
|
||||
|
||||
|
@ -504,6 +510,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
responseMap.clear();
|
||||
finishedAttemptCache.clear();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
|
|
@ -60,6 +60,9 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
|
@ -70,6 +73,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||
|
@ -990,4 +994,54 @@ public class TestApplicationMasterService {
|
|||
app1.getApplicationId()).getOriginalTrackingUrl());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testRepeatedFinishApplicationMaster() throws Exception {
|
||||
|
||||
CountingDispatcher dispatcher = new CountingDispatcher();
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
rm.start();
|
||||
// Register node1
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
// Submit an application
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
||||
am1.registerAppAttempt();
|
||||
FinishApplicationMasterRequest req = FinishApplicationMasterRequest
|
||||
.newInstance(FinalApplicationStatus.FAILED, "", "");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
am1.unregisterAppAttempt(req, false);
|
||||
}
|
||||
Assert.assertEquals("Expecting only one event", 1,
|
||||
dispatcher.getEventCount());
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
static class CountingDispatcher extends DrainDispatcher {
|
||||
private int eventreceived = 0;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
protected void dispatch(Event event) {
|
||||
if (event.getType() == RMAppAttemptEventType.UNREGISTERED) {
|
||||
eventreceived++;
|
||||
} else {
|
||||
super.dispatch(event);
|
||||
}
|
||||
}
|
||||
|
||||
public int getEventCount() {
|
||||
return eventreceived;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue