From d70f5231a794804d9baf0bdb7e6833533fe60db5 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 27 Aug 2019 08:38:12 +0530 Subject: [PATCH] YARN-9640. Slow event processing could cause too many attempt unregister events. Contributed by Bibin A Chundatt. --- .../ApplicationMasterService.java | 14 +++-- .../ApplicationMasterServiceTestBase.java | 53 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a99da2cc407..31456d625e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -94,6 +94,8 @@ public class ApplicationMasterService extends AbstractService implements RecordFactoryProvider.getRecordFactory(null); private final ConcurrentMap responseMap = new ConcurrentHashMap(); + private final ConcurrentHashMap + finishedAttemptCache = new ConcurrentHashMap<>(); protected final RMContext rmContext; private final AMSProcessingChain amsProcessingChain; private boolean timelineServiceV2Enabled; @@ -339,11 +341,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( throw new ApplicationMasterNotRegisteredException(message); } - this.amLivelinessMonitor.receivedPing(applicationAttemptId); FinishApplicationMasterResponse response = FinishApplicationMasterResponse.newInstance(false); - this.amsProcessingChain.finishApplicationMaster( - applicationAttemptId, request, response); + if (finishedAttemptCache.putIfAbsent(applicationAttemptId, true) + == null) { + this.amsProcessingChain + .finishApplicationMaster(applicationAttemptId, request, response); + } + this.amLivelinessMonitor.receivedPing(applicationAttemptId); return response; } } @@ -492,6 +497,7 @@ protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId, public void unregisterAttempt(ApplicationAttemptId attemptId) { LOG.info("Unregistering app attempt : " + attemptId); responseMap.remove(attemptId); + finishedAttemptCache.remove(attemptId); rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId); } @@ -506,6 +512,8 @@ protected void serviceStop() throws Exception { if (this.server != null) { this.server.stop(); } + responseMap.clear(); + finishedAttemptCache.clear(); super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java index 868b4e05735..0b713e77391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java @@ -17,6 +17,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -354,6 +358,55 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { } } + @Test(timeout = 1200000) + public void testRepeatedFinishApplicationMaster() throws Exception { + + CountingDispatcher dispatcher = new CountingDispatcher(); + MockRM rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + try { + rm.start(); + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + // Submit an application + RMApp app1 = rm.submitApp(2048); + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + FinishApplicationMasterRequest req = FinishApplicationMasterRequest + .newInstance(FinalApplicationStatus.FAILED, "", ""); + for (int i = 0; i < 10; i++) { + am1.unregisterAppAttempt(req, false); + } + Assert.assertEquals("Expecting only one event", 1, + dispatcher.getEventCount()); + } finally { + rm.stop(); + } + } + + static class CountingDispatcher extends DrainDispatcher { + private int eventreceived = 0; + + @SuppressWarnings("rawtypes") + @Override + protected void dispatch(Event event) { + if (event.getType() == RMAppAttemptEventType.UNREGISTERED) { + eventreceived++; + } else { + super.dispatch(event); + } + } + + public int getEventCount() { + return eventreceived; + } + } + @Test(timeout = 3000000) public void testResourceTypes() throws Exception { HashMap