MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish to all the services. Contributed by Thomas Graves.

svn merge -c r1181803 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1181807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-11 14:29:50 +00:00
parent af72e41ee1
commit 73b6474bac
6 changed files with 55 additions and 13 deletions

View File

@ -1539,6 +1539,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3020. Fixed TaskAttemptImpl to log the correct node-address for
a finished Reduce task. (Chackaravarthy via vinodkv)
MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish
to all the services. (Thomas Graves via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -223,6 +223,7 @@ public class ShuffleHandler extends AbstractService
public void stopApp(ApplicationId appId) {
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
secretManager.removeTokenForJob(jobId.toString());
userRsrc.remove(jobId.toString());
}
@Override

View File

@ -42,8 +42,8 @@ public class AuxServices extends AbstractService
private static final Log LOG = LogFactory.getLog(AuxServices.class);
public final Map<String,AuxiliaryService> serviceMap;
public final Map<String,ByteBuffer> serviceMeta;
protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMeta;
public AuxServices() {
super(AuxServices.class.getName());
@ -157,20 +157,24 @@ public class AuxServices extends AbstractService
@Override
public void handle(AuxServicesEvent event) {
LOG.info("Got event " + event.getType() + " for service "
+ event.getServiceID());
AuxiliaryService service = serviceMap.get(event.getServiceID());
if (null == service) {
// TODO kill all containers waiting on Application
return;
}
LOG.info("Got event " + event.getType() + " for appId "
+ event.getApplicationID());
switch (event.getType()) {
case APPLICATION_INIT:
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
AuxiliaryService service = serviceMap.get(event.getServiceID());
if (null == service) {
LOG.info("service is null");
// TODO kill all containers waiting on Application
return;
}
service.initApp(event.getUser(), event.getApplicationID(),
event.getServiceData());
break;
case APPLICATION_STOP:
service.stopApp(event.getApplicationID());
for (AuxiliaryService serv : serviceMap.values()) {
serv.stopApp(event.getApplicationID());
}
break;
default:
throw new RuntimeException("Unknown type: " + event.getType());

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
@ -247,6 +249,10 @@ public class ApplicationImpl implements Application {
new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
// tell any auxiliary services that the app is done
this.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
// TODO: Trigger the LogsManager
}

View File

@ -22,8 +22,12 @@ import org.junit.Test;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -39,6 +43,7 @@ import org.apache.hadoop.yarn.service.Service;
import static org.apache.hadoop.yarn.service.Service.STATE.*;
public class TestAuxServices {
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
static class LightService extends AbstractService
implements AuxServices.AuxiliaryService {
@ -47,6 +52,7 @@ public class TestAuxServices {
private int remaining_init;
private int remaining_stop;
private ByteBuffer meta = null;
private ArrayList<Integer> stoppedApps;
LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
@ -56,7 +62,13 @@ public class TestAuxServices {
this.idef = idef;
this.expected_appId = expected_appId;
this.meta = meta;
this.stoppedApps = new ArrayList<Integer>();
}
public ArrayList<Integer> getAppIdsStopped() {
return (ArrayList)this.stoppedApps.clone();
}
@Override
public void init(Configuration conf) {
remaining_init = conf.getInt(idef + ".expected.init", 0);
@ -77,7 +89,7 @@ public class TestAuxServices {
}
@Override
public void stopApp(ApplicationId appId) {
assertEquals(expected_appId, appId.getId());
stoppedApps.add(appId.getId());
}
@Override
public ByteBuffer getMeta() {
@ -86,11 +98,15 @@ public class TestAuxServices {
}
static class ServiceA extends LightService {
public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
public ServiceA() {
super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
}
}
static class ServiceB extends LightService {
public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
public ServiceB() {
super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
}
}
@Test
@ -119,6 +135,14 @@ public class TestAuxServices {
appId.setId(66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
// verify all services got the stop event
aux.handle(event);
Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
for (AuxServices.AuxiliaryService serv: servs) {
ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
assertEquals("app not properly stopped", 1, appIds.size());
assertTrue("wrong app stopped", appIds.contains((Integer)66));
}
}
@Test

View File

@ -166,6 +166,10 @@ public class TestApplication {
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
verify(wa.auxBus).handle(
refEq(new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, wa.appId)));
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());