YARN-841. Move Auxiliary service to yarn-api, annotate and document it. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1494031 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-18 06:20:37 +00:00
parent acc0d3eb52
commit f4d80e91ae
18 changed files with 366 additions and 95 deletions

View File

@ -158,7 +158,7 @@ public class ContainerLauncherImpl extends AbstractService implements
StartContainerResponse response = proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = ByteBuffer portInfo =
response.getAllServiceResponse().get( response.getAllServicesMetaData().get(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1; int port = -1;
if(portInfo != null) { if(portInfo != null) {

View File

@ -164,7 +164,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setAllServiceResponse(serviceResponse); startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event"); LOG.info("inserting launch event");
@ -230,7 +230,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setAllServiceResponse(serviceResponse); startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting cleanup event"); LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent = ContainerLauncherEvent mockCleanupEvent =
@ -296,7 +296,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setAllServiceResponse(serviceResponse); startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event"); LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent = ContainerRemoteLaunchEvent mockLaunchEvent =
@ -355,7 +355,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setAllServiceResponse(serviceResponse); startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event"); LOG.info("inserting launch event");

View File

@ -73,10 +73,11 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
@ -112,8 +113,7 @@ import org.jboss.netty.util.CharsetUtil;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ShuffleHandler extends AbstractService public class ShuffleHandler extends AuxiliaryService {
implements AuxServices.AuxiliaryService {
private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
@ -244,7 +244,11 @@ public class ShuffleHandler extends AbstractService
} }
@Override @Override
public void initApp(String user, ApplicationId appId, ByteBuffer secret) { public void initializeApplication(ApplicationInitializationContext context) {
String user = context.getUser();
ApplicationId appId = context.getApplicationId();
ByteBuffer secret = context.getApplicationDataForService();
// TODO these bytes should be versioned // TODO these bytes should be versioned
try { try {
Token<JobTokenIdentifier> jt = deserializeServiceData(secret); Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
@ -260,7 +264,8 @@ public class ShuffleHandler extends AbstractService
} }
@Override @Override
public void stopApp(ApplicationId appId) { public void stopApplication(ApplicationTerminationContext context) {
ApplicationId appId = context.getApplicationId();
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
secretManager.removeTokenForJob(jobId.toString()); secretManager.removeTokenForJob(jobId.toString());
userRsrc.remove(jobId.toString()); userRsrc.remove(jobId.toString());
@ -328,7 +333,7 @@ public class ShuffleHandler extends AbstractService
} }
@Override @Override
public synchronized ByteBuffer getMeta() { public synchronized ByteBuffer getMetaData() {
try { try {
return serializeMetaData(port); return serializeMetaData(port);
} catch (IOException e) { } catch (IOException e) {

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
@ -317,8 +318,10 @@ public class TestShuffleHandler {
new Token<JobTokenIdentifier>("identifier".getBytes(), new Token<JobTokenIdentifier>("identifier".getBytes(),
"password".getBytes(), new Text(user), new Text("shuffleService")); "password".getBytes(), new Text(user), new Text("shuffleService"));
jt.write(outputBuffer); jt.write(outputBuffer);
shuffleHandler.initApp(user, appId, shuffleHandler
ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())); .initializeApplication(new ApplicationInitializationContext(user,
appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength())));
URL url = URL url =
new URL( new URL(
"http://127.0.0.1:" "http://127.0.0.1:"

View File

@ -193,6 +193,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-840. Moved ProtoUtils to yarn.api.records.pb.impl. (Jian He via YARN-840. Moved ProtoUtils to yarn.api.records.pb.impl. (Jian He via
acmurthy) acmurthy)
YARN-841. Move Auxiliary service to yarn-api, annotate and document it.
(vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -42,32 +42,45 @@ public abstract class StartContainerResponse {
@Private @Private
@Unstable @Unstable
public static StartContainerResponse newInstance( public static StartContainerResponse newInstance(
Map<String, ByteBuffer> serviceResponses) { Map<String, ByteBuffer> servicesMetaData) {
StartContainerResponse response = StartContainerResponse response =
Records.newRecord(StartContainerResponse.class); Records.newRecord(StartContainerResponse.class);
response.setAllServiceResponse(serviceResponses); response.setAllServicesMetaData(servicesMetaData);
return response; return response;
} }
/** /**
* <p>Get the responses from all auxiliary services running on the * <p>
* <code>NodeManager</code>.</p> * Get the meta-data from all auxiliary services running on the
* <p>The responses are returned as a Map between the auxiliary service names * <code>NodeManager</code>.
* and their corresponding opaque blob <code>ByteBuffer</code>s</p> * </p>
* @return a Map between the auxiliary service names and their outputs * <p>
* The meta-data is returned as a Map between the auxiliary service names and
* their corresponding per service meta-data as an opaque blob
* <code>ByteBuffer</code>
* </p>
*
* <p>
* To be able to interpret the per-service meta-data, you should consult the
* documentation for the Auxiliary-service configured on the NodeManager
* </p>
*
* @return a Map between the names of auxiliary services and their
* corresponding meta-data
*/ */
@Public @Public
@Stable @Stable
public abstract Map<String, ByteBuffer> getAllServiceResponse(); public abstract Map<String, ByteBuffer> getAllServicesMetaData();
/** /**
* Set to the list of auxiliary services which have been started on the * Set to the list of auxiliary services which have been started on the
* <code>NodeManager</code>. This is done only once when the * <code>NodeManager</code>. This is done only once when the
* <code>NodeManager</code> starts up * <code>NodeManager</code> starts up
* @param serviceResponses A map from auxiliary service names to the opaque * @param allServicesMetaData A map from auxiliary service names to the opaque
* blob <code>ByteBuffer</code>s for that auxiliary service * blob <code>ByteBuffer</code> for that auxiliary service
*/ */
@Private @Private
@Unstable @Unstable
public abstract void setAllServiceResponse(Map<String, ByteBuffer> serviceResponses); public abstract void setAllServicesMetaData(
Map<String, ByteBuffer> allServicesMetaData);
} }

View File

@ -42,7 +42,7 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
StartContainerResponseProto.Builder builder = null; StartContainerResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Map<String, ByteBuffer> serviceResponse = null; private Map<String, ByteBuffer> servicesMetaData = null;
public StartContainerResponsePBImpl() { public StartContainerResponsePBImpl() {
builder = StartContainerResponseProto.newBuilder(); builder = StartContainerResponseProto.newBuilder();
@ -81,8 +81,8 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
} }
private synchronized void mergeLocalToBuilder() { private synchronized void mergeLocalToBuilder() {
if (this.serviceResponse != null) { if (this.servicesMetaData != null) {
addServiceResponseToProto(); addServicesMetaDataToProto();
} }
} }
@ -112,38 +112,38 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
@Override @Override
public synchronized Map<String, ByteBuffer> getAllServiceResponse() { public synchronized Map<String, ByteBuffer> getAllServicesMetaData() {
initServiceResponse(); initServicesMetaData();
return this.serviceResponse; return this.servicesMetaData;
} }
@Override @Override
public synchronized void setAllServiceResponse( public synchronized void setAllServicesMetaData(
Map<String, ByteBuffer> serviceResponses) { Map<String, ByteBuffer> servicesMetaData) {
if(serviceResponses == null) { if(servicesMetaData == null) {
return; return;
} }
initServiceResponse(); initServicesMetaData();
this.serviceResponse.clear(); this.servicesMetaData.clear();
this.serviceResponse.putAll(serviceResponses); this.servicesMetaData.putAll(servicesMetaData);
} }
private synchronized void initServiceResponse() { private synchronized void initServicesMetaData() {
if (this.serviceResponse != null) { if (this.servicesMetaData != null) {
return; return;
} }
StartContainerResponseProtoOrBuilder p = viaProto ? proto : builder; StartContainerResponseProtoOrBuilder p = viaProto ? proto : builder;
List<StringBytesMapProto> list = p.getServiceResponseList(); List<StringBytesMapProto> list = p.getServicesMetaDataList();
this.serviceResponse = new HashMap<String, ByteBuffer>(); this.servicesMetaData = new HashMap<String, ByteBuffer>();
for (StringBytesMapProto c : list) { for (StringBytesMapProto c : list) {
this.serviceResponse.put(c.getKey(), convertFromProtoFormat(c.getValue())); this.servicesMetaData.put(c.getKey(), convertFromProtoFormat(c.getValue()));
} }
} }
private synchronized void addServiceResponseToProto() { private synchronized void addServicesMetaDataToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearServiceResponse(); builder.clearServicesMetaData();
if (serviceResponse == null) if (servicesMetaData == null)
return; return;
Iterable<StringBytesMapProto> iterable = new Iterable<StringBytesMapProto>() { Iterable<StringBytesMapProto> iterable = new Iterable<StringBytesMapProto>() {
@ -151,7 +151,7 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
public synchronized Iterator<StringBytesMapProto> iterator() { public synchronized Iterator<StringBytesMapProto> iterator() {
return new Iterator<StringBytesMapProto>() { return new Iterator<StringBytesMapProto>() {
Iterator<String> keyIter = serviceResponse.keySet().iterator(); Iterator<String> keyIter = servicesMetaData.keySet().iterator();
@Override @Override
public synchronized void remove() { public synchronized void remove() {
@ -161,7 +161,7 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
@Override @Override
public synchronized StringBytesMapProto next() { public synchronized StringBytesMapProto next() {
String key = keyIter.next(); String key = keyIter.next();
return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(serviceResponse.get(key))).build(); return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(servicesMetaData.get(key))).build();
} }
@Override @Override
@ -171,6 +171,6 @@ public class StartContainerResponsePBImpl extends StartContainerResponse {
}; };
} }
}; };
builder.addAllServiceResponse(iterable); builder.addAllServicesMetaData(iterable);
} }
} }

View File

@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -109,7 +111,19 @@ public abstract class ContainerLaunchContext {
public abstract void setLocalResources(Map<String, LocalResource> localResources); public abstract void setLocalResources(Map<String, LocalResource> localResources);
/** /**
* Get application-specific binary <em>service data</em>. * <p>
* Get application-specific binary <em>service data</em>. This is a map keyed
* by the name of each {@link AuxiliaryService} that is configured on a
* NodeManager and value correspond to the application specific data targeted
* for the keyed {@link AuxiliaryService}.
* </p>
*
* <p>
* This will be used to initialize this application on the specific
* {@link AuxiliaryService} running on the NodeManager by calling
* {@link AuxiliaryService#initializeApplication(ApplicationInitializationContext)}
* </p>
*
* @return application-specific binary <em>service data</em> * @return application-specific binary <em>service data</em>
*/ */
@Public @Public
@ -117,9 +131,16 @@ public abstract class ContainerLaunchContext {
public abstract Map<String, ByteBuffer> getServiceData(); public abstract Map<String, ByteBuffer> getServiceData();
/** /**
* Set application-specific binary <em>service data</em>. All pre-existing Map * <p>
* entries are preserved. * Get application-specific binary <em>service data</em>. This is a map keyed
* @param serviceData application-specific binary <em>service data</em> * by the name of each {@link AuxiliaryService} that is configured on a
* NodeManager and value correspond to the application specific data targeted
* for the keyed {@link AuxiliaryService}. All pre-existing Map entries are
* preserved.
* </p>
*
* @param serviceData
* application-specific binary <em>service data</em>
*/ */
@Public @Public
@Stable @Stable

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
/**
* Initialization context for {@link AuxiliaryService} when starting an
* application.
*/
@Public
@Evolving
public class ApplicationInitializationContext {
private final String user;
private final ApplicationId applicationId;
private ByteBuffer appDataForService;
@Private
@Unstable
public ApplicationInitializationContext(String user, ApplicationId applicationId,
ByteBuffer appDataForService) {
this.user = user;
this.applicationId = applicationId;
this.appDataForService = appDataForService;
}
/**
* Get the user-name of the application-submitter
*
* @return user-name
*/
public String getUser() {
return this.user;
}
/**
* Get {@link ApplicationId} of the application
*
* @return applications ID
*/
public ApplicationId getApplicationId() {
return this.applicationId;
}
/**
* Get the data sent to the NodeManager via
* {@link ContainerManagementProtocol#startContainer(StartContainerRequest)}
* as part of {@link ContainerLaunchContext#getServiceData()}
*
* @return the servicesData for this application.
*/
public ByteBuffer getApplicationDataForService() {
return this.appDataForService;
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* Initialization context for {@link AuxiliaryService} when stopping an
* application.
*
*/
@Public
@Evolving
public class ApplicationTerminationContext {
private final ApplicationId applicationId;
@Private
@Unstable
public ApplicationTerminationContext(ApplicationId applicationId) {
this.applicationId = applicationId;
}
/**
* Get {@link ApplicationId} of the application being stopped.
*
* @return applications ID
*/
public ApplicationId getApplicationId() {
return this.applicationId;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* A generic service that will be started by the NodeManager. This is a service
* that administrators have to configure on each node by setting
* {@link YarnConfiguration#NM_AUX_SERVICES}.
*
*/
@Public
@Evolving
public abstract class AuxiliaryService extends AbstractService {
protected AuxiliaryService(String name) {
super(name);
}
/**
* A new application is started on this NodeManager. This is a signal to
* this {@link AuxiliaryService} about the application initialization.
*
* @param initAppContext context for the application's initialization
*/
public abstract void initializeApplication(
ApplicationInitializationContext initAppContext);
/**
* An application is finishing on this NodeManager. This is a signal to this
* {@link AuxiliaryService} about the same.
*
* @param stopAppContext context for the application termination
*/
public abstract void stopApplication(
ApplicationTerminationContext stopAppContext);
/**
* Retrieve meta-data for this {@link AuxiliaryService}. Applications using
* this {@link AuxiliaryService} SHOULD know the format of the meta-data -
* ideally each service should provide a method to parse out the information
* to the applications. One example of meta-data is contact information so
* that applications can access the service remotely. This will only be called
* after the service's {@link #start()} method has finished. the result may be
* cached.
*
* <p>
* The information is passed along to applications via
* {@link StartContainerResponse#getAllServicesMetaData()} that is returned by
* {@link ContainerManagementProtocol#startContainer(StartContainerRequest)}
* </p>
*
* @return meta-data for this service that should be made available to
* applications.
*/
public abstract ByteBuffer getMetaData();
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -160,7 +160,7 @@ message StartContainerRequestProto {
} }
message StartContainerResponseProto { message StartContainerResponseProto {
repeated StringBytesMapProto service_response = 1; repeated StringBytesMapProto services_meta_data = 1;
} }
message StopContainerRequestProto { message StopContainerRequestProto {

View File

@ -242,7 +242,7 @@ public class NMClientImpl extends NMClient {
LOG.warn("Container " + containerId + " failed to start", e); LOG.warn("Container " + containerId + " failed to start", e);
throw e; throw e;
} }
return startResponse.getAllServiceResponse(); return startResponse.getAllServicesMetaData();
} }
public synchronized void stopContainer() throws YarnException, public synchronized void stopContainer() throws YarnException,

View File

@ -32,9 +32,11 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
public class AuxServices extends AbstractService public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> { implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
@ -42,13 +44,13 @@ public class AuxServices extends AbstractService
private static final Log LOG = LogFactory.getLog(AuxServices.class); private static final Log LOG = LogFactory.getLog(AuxServices.class);
protected final Map<String,AuxiliaryService> serviceMap; protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMeta; protected final Map<String,ByteBuffer> serviceMetaData;
public AuxServices() { public AuxServices() {
super(AuxServices.class.getName()); super(AuxServices.class.getName());
serviceMap = serviceMap =
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>()); Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
serviceMeta = serviceMetaData =
Collections.synchronizedMap(new HashMap<String,ByteBuffer>()); Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
// Obtain services from configuration in init() // Obtain services from configuration in init()
} }
@ -69,11 +71,11 @@ public class AuxServices extends AbstractService
* If a service has not been started no metadata will be available. The key * If a service has not been started no metadata will be available. The key
* is the name of the service as defined in the configuration. * is the name of the service as defined in the configuration.
*/ */
public Map<String, ByteBuffer> getMeta() { public Map<String, ByteBuffer> getMetaData() {
Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>( Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>(
serviceMeta.size()); serviceMetaData.size());
synchronized (serviceMeta) { synchronized (serviceMetaData) {
for (Entry<String, ByteBuffer> entry : serviceMeta.entrySet()) { for (Entry<String, ByteBuffer> entry : serviceMetaData.entrySet()) {
metaClone.put(entry.getKey(), entry.getValue().duplicate()); metaClone.put(entry.getKey(), entry.getValue().duplicate());
} }
} }
@ -121,9 +123,9 @@ public class AuxServices extends AbstractService
String name = entry.getKey(); String name = entry.getKey();
service.start(); service.start();
service.registerServiceListener(this); service.registerServiceListener(this);
ByteBuffer meta = service.getMeta(); ByteBuffer meta = service.getMetaData();
if(meta != null) { if(meta != null) {
serviceMeta.put(name, meta); serviceMetaData.put(name, meta);
} }
} }
super.serviceStart(); super.serviceStart();
@ -140,7 +142,7 @@ public class AuxServices extends AbstractService
} }
} }
serviceMap.clear(); serviceMap.clear();
serviceMeta.clear(); serviceMetaData.clear();
} }
} finally { } finally {
super.serviceStop(); super.serviceStop();
@ -167,12 +169,13 @@ public class AuxServices extends AbstractService
// TODO kill all containers waiting on Application // TODO kill all containers waiting on Application
return; return;
} }
service.initApp(event.getUser(), event.getApplicationID(), service.initializeApplication(new ApplicationInitializationContext(event
event.getServiceData()); .getUser(), event.getApplicationID(), event.getServiceData()));
break; break;
case APPLICATION_STOP: case APPLICATION_STOP:
for (AuxiliaryService serv : serviceMap.values()) { for (AuxiliaryService serv : serviceMap.values()) {
serv.stopApp(event.getApplicationID()); serv.stopApplication(new ApplicationTerminationContext(event
.getApplicationID()));
} }
break; break;
default: default:
@ -180,18 +183,4 @@ public class AuxServices extends AbstractService
} }
} }
public interface AuxiliaryService extends Service {
void initApp(String user, ApplicationId appId, ByteBuffer data);
void stopApp(ApplicationId appId);
/**
* Retreive metadata for this service. This is likely going to be contact
* information so that applications can access the service remotely. Ideally
* each service should provide a method to parse out the information to a usable
* class. This will only be called after the services start method has finished.
* the result may be cached.
* @return metadata for this service that should be made avaiable to applications.
*/
ByteBuffer getMeta();
}
} }

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.Event;
public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> { public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {

View File

@ -491,8 +491,7 @@ public class ContainerManagerImpl extends CompositeService implements
applicationID, containerID); applicationID, containerID);
StartContainerResponse response = StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class); StartContainerResponse.newInstance(auxiliaryServices.getMetaData());
response.setAllServiceResponse(auxiliaryServices.getMeta());
// TODO launchedContainer misplaced -> doesn't necessarily mean a container // TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers. // launch. A finished Application will not launch containers.
metrics.launchedContainer(); metrics.launchedContainer();

View File

@ -33,17 +33,19 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.junit.Test; import org.junit.Test;
public class TestAuxServices { public class TestAuxServices {
private static final Log LOG = LogFactory.getLog(TestAuxServices.class); private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
static class LightService extends AbstractService static class LightService extends AuxiliaryService implements Service
implements AuxServices.AuxiliaryService { {
private final char idef; private final char idef;
private final int expected_appId; private final int expected_appId;
private int remaining_init; private int remaining_init;
@ -79,17 +81,18 @@ public class TestAuxServices {
super.serviceStop(); super.serviceStop();
} }
@Override @Override
public void initApp(String user, ApplicationId appId, ByteBuffer data) { public void initializeApplication(ApplicationInitializationContext context) {
ByteBuffer data = context.getApplicationDataForService();
assertEquals(idef, data.getChar()); assertEquals(idef, data.getChar());
assertEquals(expected_appId, data.getInt()); assertEquals(expected_appId, data.getInt());
assertEquals(expected_appId, appId.getId()); assertEquals(expected_appId, context.getApplicationId().getId());
} }
@Override @Override
public void stopApp(ApplicationId appId) { public void stopApplication(ApplicationTerminationContext context) {
stoppedApps.add(appId.getId()); stoppedApps.add(context.getApplicationId().getId());
} }
@Override @Override
public ByteBuffer getMeta() { public ByteBuffer getMetaData() {
return meta; return meta;
} }
} }
@ -133,8 +136,8 @@ public class TestAuxServices {
AuxServicesEventType.APPLICATION_STOP, "user0", appId2, "Bsrv", null); AuxServicesEventType.APPLICATION_STOP, "user0", appId2, "Bsrv", null);
// verify all services got the stop event // verify all services got the stop event
aux.handle(event); aux.handle(event);
Collection<AuxServices.AuxiliaryService> servs = aux.getServices(); Collection<AuxiliaryService> servs = aux.getServices();
for (AuxServices.AuxiliaryService serv: servs) { for (AuxiliaryService serv: servs) {
ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped(); ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
assertEquals("app not properly stopped", 1, appIds.size()); assertEquals("app not properly stopped", 1, appIds.size());
assertTrue("wrong app stopped", appIds.contains((Integer)66)); assertTrue("wrong app stopped", appIds.contains((Integer)66));
@ -196,7 +199,7 @@ public class TestAuxServices {
assertEquals(STARTED, s.getServiceState()); assertEquals(STARTED, s.getServiceState());
} }
Map<String, ByteBuffer> meta = aux.getMeta(); Map<String, ByteBuffer> meta = aux.getMetaData();
assertEquals(2, meta.size()); assertEquals(2, meta.size());
assertEquals("A", new String(meta.get("Asrv").array())); assertEquals("A", new String(meta.get("Asrv").array()));
assertEquals("B", new String(meta.get("Bsrv").array())); assertEquals("B", new String(meta.get("Bsrv").array()));