From 73b6474bac47dd5b0e325174f12bfde141505ac2 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 11 Oct 2011 14:29:50 +0000 Subject: [PATCH] MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish to all the services. Contributed by Thomas Graves. svn merge -c r1181803 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1181807 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../apache/hadoop/mapred/ShuffleHandler.java | 1 + .../containermanager/AuxServices.java | 24 ++++++++------- .../application/ApplicationImpl.java | 6 ++++ .../containermanager/TestAuxServices.java | 30 +++++++++++++++++-- .../application/TestApplication.java | 4 +++ 6 files changed, 55 insertions(+), 13 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 541f5504082..ff5c9a681bb 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1539,6 +1539,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3020. Fixed TaskAttemptImpl to log the correct node-address for a finished Reduce task. (Chackaravarthy via vinodkv) + MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish + to all the services. (Thomas Graves via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 0ef8d95aa0e..a3bbf2b344f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -223,6 +223,7 @@ public void initApp(String user, ApplicationId appId, ByteBuffer secret) { public void stopApp(ApplicationId appId) { JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); secretManager.removeTokenForJob(jobId.toString()); + userRsrc.remove(jobId.toString()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index ddfc1c58159..4b6e8a62493 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -42,8 +42,8 @@ public class AuxServices extends AbstractService private static final Log LOG = LogFactory.getLog(AuxServices.class); - public final Map serviceMap; - public final Map serviceMeta; + protected final Map serviceMap; + protected final Map serviceMeta; public AuxServices() { super(AuxServices.class.getName()); @@ -157,20 +157,24 @@ public void stateChanged(Service service) { @Override public void handle(AuxServicesEvent event) { - LOG.info("Got event " + event.getType() + " for service " - + event.getServiceID()); - AuxiliaryService service = serviceMap.get(event.getServiceID()); - if (null == service) { - // TODO kill all containers waiting on Application - return; - } + LOG.info("Got event " + event.getType() + " for appId " + + event.getApplicationID()); switch (event.getType()) { case APPLICATION_INIT: + LOG.info("Got APPLICATION_INIT for service " + event.getServiceID()); + AuxiliaryService service = serviceMap.get(event.getServiceID()); + if (null == service) { + LOG.info("service is null"); + // TODO kill all containers waiting on Application + return; + } service.initApp(event.getUser(), event.getApplicationID(), event.getServiceData()); break; case APPLICATION_STOP: - service.stopApp(event.getApplicationID()); + for (AuxiliaryService serv : serviceMap.values()) { + serv.stopApp(event.getApplicationID()); + } break; default: throw new RuntimeException("Unknown type: " + event.getType()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 6ea9c041ad4..ab96388080e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; @@ -247,6 +249,10 @@ void handleAppFinishWithContainersCleanedup() { new ApplicationLocalizationEvent( LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this)); + // tell any auxiliary services that the app is done + this.dispatcher.getEventHandler().handle( + new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId)); + // TODO: Trigger the LogsManager } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index d52647c8cb1..46c9faa24b8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -22,8 +22,12 @@ import static org.junit.Assert.*; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -39,6 +43,7 @@ import static org.apache.hadoop.yarn.service.Service.STATE.*; public class TestAuxServices { + private static final Log LOG = LogFactory.getLog(TestAuxServices.class); static class LightService extends AbstractService implements AuxServices.AuxiliaryService { @@ -47,6 +52,7 @@ static class LightService extends AbstractService private int remaining_init; private int remaining_stop; private ByteBuffer meta = null; + private ArrayList stoppedApps; LightService(String name, char idef, int expected_appId) { this(name, idef, expected_appId, null); @@ -56,7 +62,13 @@ static class LightService extends AbstractService this.idef = idef; this.expected_appId = expected_appId; this.meta = meta; + this.stoppedApps = new ArrayList(); } + + public ArrayList getAppIdsStopped() { + return (ArrayList)this.stoppedApps.clone(); + } + @Override public void init(Configuration conf) { remaining_init = conf.getInt(idef + ".expected.init", 0); @@ -77,7 +89,7 @@ public void initApp(String user, ApplicationId appId, ByteBuffer data) { } @Override public void stopApp(ApplicationId appId) { - assertEquals(expected_appId, appId.getId()); + stoppedApps.add(appId.getId()); } @Override public ByteBuffer getMeta() { @@ -86,11 +98,15 @@ public ByteBuffer getMeta() { } static class ServiceA extends LightService { - public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); } + public ServiceA() { + super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); + } } static class ServiceB extends LightService { - public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); } + public ServiceB() { + super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); + } } @Test @@ -119,6 +135,14 @@ public void testAuxEventDispatch() { appId.setId(66); event = new AuxServicesEvent( AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null); + // verify all services got the stop event + aux.handle(event); + Collection servs = aux.getServices(); + for (AuxServices.AuxiliaryService serv: servs) { + ArrayList appIds = ((LightService)serv).getAppIdsStopped(); + assertEquals("app not properly stopped", 1, appIds.size()); + assertTrue("wrong app stopped", appIds.contains((Integer)66)); + } } @Test diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 33bf85c64cf..084a454fc6e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -166,6 +166,10 @@ public void testAppFinishedOnRunningContainers() { refEq(new ApplicationLocalizationEvent( LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); + verify(wa.auxBus).handle( + refEq(new AuxServicesEvent( + AuxServicesEventType.APPLICATION_STOP, wa.appId))); + wa.appResourcesCleanedup(); assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());