YARN-9640. Slow event processing could cause too many attempt unregister events. Contributed by Bibin A Chundatt.

This commit is contained in:
Rohith Sharma K S 2019-08-27 08:38:12 +05:30
parent 07e3cf952e
commit d70f5231a7
2 changed files with 64 additions and 3 deletions

View File

@ -94,6 +94,8 @@ public class ApplicationMasterService extends AbstractService implements
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final ConcurrentHashMap<ApplicationAttemptId, Boolean>
finishedAttemptCache = new ConcurrentHashMap<>();
protected final RMContext rmContext; protected final RMContext rmContext;
private final AMSProcessingChain amsProcessingChain; private final AMSProcessingChain amsProcessingChain;
private boolean timelineServiceV2Enabled; private boolean timelineServiceV2Enabled;
@ -339,11 +341,14 @@ public FinishApplicationMasterResponse finishApplicationMaster(
throw new ApplicationMasterNotRegisteredException(message); throw new ApplicationMasterNotRegisteredException(message);
} }
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
FinishApplicationMasterResponse response = FinishApplicationMasterResponse response =
FinishApplicationMasterResponse.newInstance(false); FinishApplicationMasterResponse.newInstance(false);
this.amsProcessingChain.finishApplicationMaster( if (finishedAttemptCache.putIfAbsent(applicationAttemptId, true)
applicationAttemptId, request, response); == null) {
this.amsProcessingChain
.finishApplicationMaster(applicationAttemptId, request, response);
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
return response; return response;
} }
} }
@ -492,6 +497,7 @@ protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId,
public void unregisterAttempt(ApplicationAttemptId attemptId) { public void unregisterAttempt(ApplicationAttemptId attemptId) {
LOG.info("Unregistering app attempt : " + attemptId); LOG.info("Unregistering app attempt : " + attemptId);
responseMap.remove(attemptId); responseMap.remove(attemptId);
finishedAttemptCache.remove(attemptId);
rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId); rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
} }
@ -506,6 +512,8 @@ protected void serviceStop() throws Exception {
if (this.server != null) { if (this.server != null) {
this.server.stop(); this.server.stop();
} }
responseMap.clear();
finishedAttemptCache.clear();
super.serviceStop(); super.serviceStop();
} }

View File

@ -17,6 +17,10 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -354,6 +358,55 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception {
} }
} }
@Test(timeout = 1200000)
public void testRepeatedFinishApplicationMaster() throws Exception {
CountingDispatcher dispatcher = new CountingDispatcher();
MockRM rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
try {
rm.start();
// Register node1
MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(2048);
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
FinishApplicationMasterRequest req = FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.FAILED, "", "");
for (int i = 0; i < 10; i++) {
am1.unregisterAppAttempt(req, false);
}
Assert.assertEquals("Expecting only one event", 1,
dispatcher.getEventCount());
} finally {
rm.stop();
}
}
static class CountingDispatcher extends DrainDispatcher {
private int eventreceived = 0;
@SuppressWarnings("rawtypes")
@Override
protected void dispatch(Event event) {
if (event.getType() == RMAppAttemptEventType.UNREGISTERED) {
eventreceived++;
} else {
super.dispatch(event);
}
}
public int getEventCount() {
return eventreceived;
}
}
@Test(timeout = 3000000) @Test(timeout = 3000000)
public void testResourceTypes() throws Exception { public void testResourceTypes() throws Exception {
HashMap<YarnConfiguration, HashMap<YarnConfiguration,