YARN-8048. Support auto-spawning of admin configured services during bootstrap of RM (Rohith Sharma K S via wangda)

Change-Id: I2d8d61ccad55e1118009294d7e17822df3cd0fd5
This commit is contained in:
Wangda Tan 2018-04-06 21:24:58 -07:00 committed by Arpit Agarwal
parent d98c3ca5fd
commit 7031a853f4
14 changed files with 889 additions and 1 deletions

View File

@ -341,6 +341,10 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_API_SERVICES_ENABLE = "yarn." public static final String YARN_API_SERVICES_ENABLE = "yarn."
+ "webapp.api-service.enable"; + "webapp.api-service.enable";
@Private
public static final String DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS =
"org.apache.hadoop.yarn.service.client.SystemServiceManagerImpl";
public static final String RM_RESOURCE_TRACKER_ADDRESS = public static final String RM_RESOURCE_TRACKER_ADDRESS =
RM_PREFIX + "resource-tracker.address"; RM_PREFIX + "resource-tracker.address";
public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031; public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031;

View File

@ -71,6 +71,7 @@
<configuration> <configuration>
<excludes> <excludes>
<exclude>**/*.json</exclude> <exclude>**/*.json</exclude>
<exclude>**/*.yarnfile</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>
@ -94,6 +95,10 @@
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId> <artifactId>hadoop-yarn-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>

View File

@ -0,0 +1,381 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.service.SystemServiceManager;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
/**
* SystemServiceManager implementation.
* Scan for configure system service path.
*
* The service path structure is as follows:
* SYSTEM_SERVICE_DIR_PATH
* |---- sync
* | |--- user1
* | | |---- service1.yarnfile
* | | |---- service2.yarnfile
* | |--- user2
* | | |---- service1.yarnfile
* | | ....
* | |
* |---- async
* | |--- user3
* | | |---- service1.yarnfile
* | | |---- service2.yarnfile
* | |--- user4
* | | |---- service1.yarnfile
* | | ....
* | |
*
* sync: These services are launched at the time of service start synchronously.
* It is a blocking service start.
* async: These services are launched in separate thread without any delay after
* service start. Non-blocking service start.
*/
public class SystemServiceManagerImpl extends AbstractService
implements SystemServiceManager {
private static final Logger LOG =
LoggerFactory.getLogger(SystemServiceManagerImpl.class);
private static final String YARN_FILE_SUFFIX = ".yarnfile";
private static final String SYNC = "sync";
private static final String ASYNC = "async";
private FileSystem fs;
private Path systemServiceDir;
private AtomicBoolean stopExecutors = new AtomicBoolean(false);
private Map<String, Set<Service>> syncUserServices = new HashMap<>();
private Map<String, Set<Service>> asyncUserServices = new HashMap<>();
private UserGroupInformation loginUGI;
private Thread serviceLaucher;
@VisibleForTesting
private int skipCounter;
@VisibleForTesting
private Map<String, Integer> ignoredUserServices =
new HashMap<>();
public SystemServiceManagerImpl() {
super(SystemServiceManagerImpl.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
String dirPath =
conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
if (dirPath != null) {
systemServiceDir = new Path(dirPath);
LOG.info("System Service Directory is configured to {}",
systemServiceDir);
fs = systemServiceDir.getFileSystem(conf);
this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
UserGroupInformation.getLoginUser() :
UserGroupInformation.getCurrentUser();
LOG.info("UserGroupInformation initialized to {}", loginUGI);
}
}
@Override
protected void serviceStart() throws Exception {
scanForUserServices();
launchUserService(syncUserServices);
// Create a thread and submit services in background otherwise it
// block RM switch time.
serviceLaucher = new Thread(createRunnable());
serviceLaucher.setName("System service launcher");
serviceLaucher.start();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping {}", getName());
stopExecutors.set(true);
if (serviceLaucher != null) {
serviceLaucher.interrupt();
try {
serviceLaucher.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
}
private Runnable createRunnable() {
return new Runnable() {
@Override
public void run() {
launchUserService(asyncUserServices);
}
};
}
void launchUserService(Map<String, Set<Service>> userServices) {
for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
String user = entry.getKey();
Set<Service> services = entry.getValue();
if (services.isEmpty()) {
continue;
}
ServiceClient serviceClient = null;
try {
UserGroupInformation userUgi = getProxyUser(user);
serviceClient = createServiceClient(userUgi);
for (Service service : services) {
LOG.info("POST: createService = {} user = {}", service, userUgi);
try {
launchServices(userUgi, serviceClient, service);
} catch (IOException | UndeclaredThrowableException e) {
if (e.getCause() != null) {
LOG.warn(e.getCause().getMessage());
} else {
String message =
"Failed to create service " + service.getName() + " : ";
LOG.error(message, e);
}
}
}
} catch (InterruptedException e) {
LOG.warn("System service launcher thread interrupted", e);
break;
} catch (Exception e) {
LOG.error("Error while submitting services for user " + user, e);
} finally {
if (serviceClient != null) {
try {
serviceClient.close();
} catch (IOException e) {
LOG.warn("Error while closing serviceClient for user {}", user);
}
}
}
}
}
private ServiceClient createServiceClient(UserGroupInformation userUgi)
throws IOException, InterruptedException {
ServiceClient serviceClient =
userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
@Override public ServiceClient run()
throws IOException, YarnException {
ServiceClient sc = getServiceClient();
sc.init(getConfig());
sc.start();
return sc;
}
});
return serviceClient;
}
private void launchServices(UserGroupInformation userUgi,
ServiceClient serviceClient, Service service)
throws IOException, InterruptedException {
if (service.getState() == ServiceState.STOPPED) {
userUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override public Void run() throws IOException, YarnException {
serviceClient.actionBuild(service);
return null;
}
});
LOG.info("Service {} version {} saved.", service.getName(),
service.getVersion());
} else {
ApplicationId applicationId =
userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
@Override public ApplicationId run()
throws IOException, YarnException {
ApplicationId applicationId = serviceClient.actionCreate(service);
return applicationId;
}
});
LOG.info("Service {} submitted with Application ID: {}",
service.getName(), applicationId);
}
}
ServiceClient getServiceClient() {
return new ServiceClient();
}
private UserGroupInformation getProxyUser(String user) {
UserGroupInformation ugi;
if (UserGroupInformation.isSecurityEnabled()) {
ugi = UserGroupInformation.createProxyUser(user, loginUGI);
} else {
ugi = UserGroupInformation.createRemoteUser(user);
}
return ugi;
}
// scan for both launch service types i.e sync and async
void scanForUserServices() throws IOException {
if (systemServiceDir == null) {
return;
}
try {
LOG.info("Scan for launch type on {}", systemServiceDir);
RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
while (iterLaunchType.hasNext()) {
FileStatus launchType = iterLaunchType.next();
if (!launchType.isDirectory()) {
LOG.debug("Scanner skips for unknown file {}", launchType.getPath());
continue;
}
if (launchType.getPath().getName().equals(SYNC)) {
scanForUserServiceDefinition(launchType.getPath(), syncUserServices);
} else if (launchType.getPath().getName().equals(ASYNC)) {
scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
} else {
LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
}
}
} catch (FileNotFoundException e) {
LOG.warn("System service directory {} doesn't not exist.",
systemServiceDir);
}
}
// Files are under systemServiceDir/<users>. Scan for 2 levels
// 1st level for users
// 2nd level for service definitions under user
private void scanForUserServiceDefinition(Path userDirPath,
Map<String, Set<Service>> userServices) throws IOException {
LOG.info("Scan for users on {}", userDirPath);
RemoteIterator<FileStatus> iterUsers = list(userDirPath);
while (iterUsers.hasNext()) {
FileStatus userDir = iterUsers.next();
// if 1st level is not user directory then skip it.
if (!userDir.isDirectory()) {
LOG.info(
"Service definition {} doesn't belong to any user. Ignoring.. ",
userDir.getPath().getName());
continue;
}
String userName = userDir.getPath().getName();
LOG.info("Scanning service definitions for user {}.", userName);
//2nd level scan
RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
while (iterServices.hasNext()) {
FileStatus serviceCache = iterServices.next();
String filename = serviceCache.getPath().getName();
if (!serviceCache.isFile()) {
LOG.info("Scanner skips for unknown dir {}", filename);
continue;
}
if (!filename.endsWith(YARN_FILE_SUFFIX)) {
LOG.info("Scanner skips for unknown file extension, filename = {}",
filename);
skipCounter++;
continue;
}
Service service = getServiceDefinition(serviceCache.getPath());
if (service != null) {
Set<Service> services = userServices.get(userName);
if (services == null) {
services = new HashSet<>();
userServices.put(userName, services);
}
if (!services.add(service)) {
int count = ignoredUserServices.containsKey(userName) ?
ignoredUserServices.get(userName) : 0;
ignoredUserServices.put(userName, count + 1);
LOG.warn(
"Ignoring service {} for the user {} as it is already present,"
+ " filename = {}", service.getName(), userName, filename);
}
LOG.info("Added service {} for the user {}, filename = {}",
service.getName(), userName, filename);
}
}
}
}
private Service getServiceDefinition(Path filePath) {
Service service = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading service definition from FS: " + filePath);
}
service = jsonSerDeser.load(fs, filePath);
} catch (IOException e) {
LOG.info("Error while loading service definition from FS: {}", e);
}
return service;
}
private RemoteIterator<FileStatus> list(Path path) throws IOException {
return new StoppableRemoteIterator(fs.listStatusIterator(path));
}
@VisibleForTesting Map<String, Integer> getIgnoredUserServices() {
return ignoredUserServices;
}
private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
private final RemoteIterator<FileStatus> remote;
StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
this.remote = remote;
}
@Override public boolean hasNext() throws IOException {
return !stopExecutors.get() && remote.hasNext();
}
@Override public FileStatus next() throws IOException {
return remote.next();
}
}
@VisibleForTesting
Map<String, Set<Service>> getSyncUserServices() {
return syncUserServices;
}
@VisibleForTesting int getSkipCounter() {
return skipCounter;
}
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* Test class for system service manager.
*/
public class TestSystemServiceImpl {
private static final Logger LOG =
LoggerFactory.getLogger(TestSystemServiceImpl.class);
private SystemServiceManagerImpl systemService;
private Configuration conf;
private String resourcePath = "users";
private String[] users = new String[] {"user1", "user2"};
private static Map<String, Set<String>> loadedServices = new HashMap<>();
private static Map<String, Set<String>> submittedServices = new HashMap<>();
@Before
public void setup() {
File file = new File(
getClass().getClassLoader().getResource(resourcePath).getFile());
conf = new Configuration();
conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY,
file.getAbsolutePath());
systemService = new SystemServiceManagerImpl() {
@Override ServiceClient getServiceClient() {
return new TestServiceClient();
}
};
systemService.init(conf); // do not call explicit start
constructUserService(users[0], "example-app1");
constructUserService(users[1], "example-app1", "example-app2");
}
@After
public void teadDown() {
systemService.stop();
}
@Test
public void testSystemServiceSubmission() throws Exception {
systemService.start();
/* verify for ignored sevices count */
Map<String, Integer> ignoredUserServices =
systemService.getIgnoredUserServices();
Assert.assertEquals(1, ignoredUserServices.size());
Assert.assertTrue("User user1 doesn't exist.",
ignoredUserServices.containsKey(users[0]));
int count = ignoredUserServices.get(users[0]);
Assert.assertEquals(1, count);
Assert.assertEquals(1, systemService.getSkipCounter());
Map<String, Set<Service>> userServices =
systemService.getSyncUserServices();
Assert.assertEquals(loadedServices.size(), userServices.size());
verifyForScannedUserServices(userServices);
verifyForLaunchedUserServices();
// 2nd time launch service to handle if service exist scenario
systemService.launchUserService(userServices);
verifyForLaunchedUserServices();
}
private void verifyForScannedUserServices(
Map<String, Set<Service>> userServices) {
for (String user : users) {
Set<Service> services = userServices.get(user);
Set<String> serviceNames = loadedServices.get(user);
Assert.assertEquals(serviceNames.size(), services.size());
Iterator<Service> iterator = services.iterator();
while (iterator.hasNext()) {
Service next = iterator.next();
Assert.assertTrue(
"Service name doesn't exist in expected " + "userService "
+ serviceNames, serviceNames.contains(next.getName()));
}
}
}
public void constructUserService(String user, String... serviceNames) {
Set<String> service = loadedServices.get(user);
if (service == null) {
service = new HashSet<>();
for (String serviceName : serviceNames) {
service.add(serviceName);
}
loadedServices.put(user, service);
}
}
class TestServiceClient extends ServiceClient {
@Override
protected void serviceStart() throws Exception {
// do nothing
}
@Override
protected void serviceStop() throws Exception {
// do nothing
}
@Override
protected void serviceInit(Configuration configuration)
throws Exception {
// do nothing
}
@Override
public ApplicationId actionCreate(Service service)
throws YarnException, IOException {
String userName =
UserGroupInformation.getCurrentUser().getShortUserName();
Set<String> services = submittedServices.get(userName);
if (services == null) {
services = new HashSet<>();
submittedServices.put(userName, services);
}
if (services.contains(service.getName())) {
String message = "Failed to create service " + service.getName()
+ ", because it already exists.";
throw new YarnException(message);
}
services.add(service.getName());
return ApplicationId.newInstance(System.currentTimeMillis(), 1);
}
}
private void verifyForLaunchedUserServices() {
Assert.assertEquals(loadedServices.size(), submittedServices.size());
for (Map.Entry<String, Set<String>> entry : submittedServices.entrySet()) {
String user = entry.getKey();
Set<String> serviceSet = entry.getValue();
Assert.assertTrue(loadedServices.containsKey(user));
Set<String> services = loadedServices.get(user);
Assert.assertEquals(services.size(), serviceSet.size());
Assert.assertTrue(services.containsAll(serviceSet));
}
}
}

View File

@ -0,0 +1,16 @@
{
"name": "example-app1",
"version": "1.0.0",
"components" :
[
{
"name": "simple",
"number_of_containers": 1,
"launch_command": "sleep 2",
"resource": {
"cpus": 1,
"memory": "128"
}
}
]
}

View File

@ -0,0 +1,16 @@
{
"name": "example-app1",
"version": "1.0.0",
"components" :
[
{
"name": "simple",
"number_of_containers": 1,
"launch_command": "sleep 2",
"resource": {
"cpus": 1,
"memory": "128"
}
}
]
}

View File

@ -0,0 +1,16 @@
{
"name": "example-app3",
"version": "1.0.0",
"components" :
[
{
"name": "simple",
"number_of_containers": 1,
"launch_command": "sleep 2",
"resource": {
"cpus": 1,
"memory": "128"
}
}
]
}

View File

@ -0,0 +1,16 @@
{
"name": "example-app1",
"version": "1.0.0",
"components" :
[
{
"name": "simple",
"number_of_containers": 1,
"launch_command": "sleep 2",
"resource": {
"cpus": 1,
"memory": "128"
}
}
]
}

View File

@ -0,0 +1,16 @@
{
"name": "example-app2",
"version": "1.0.0",
"components" :
[
{
"name": "simple",
"number_of_containers": 1,
"launch_command": "sleep 2",
"resource": {
"cpus": 1,
"memory": "128"
}
}
]
}

View File

@ -50,6 +50,8 @@ public class YarnServiceConf {
public static final String ROLLING_LOG_INCLUSION_PATTERN = "yarn.service.rolling-log.include-pattern"; public static final String ROLLING_LOG_INCLUSION_PATTERN = "yarn.service.rolling-log.include-pattern";
public static final String ROLLING_LOG_EXCLUSION_PATTERN = "yarn.service.rolling-log.exclude-pattern"; public static final String ROLLING_LOG_EXCLUSION_PATTERN = "yarn.service.rolling-log.exclude-pattern";
public static final String YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY =
YARN_SERVICE_PREFIX + "system-service.dir";
/** /**
* The yarn service base path: * The yarn service base path:

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link ServiceManager}.
*/
public class TestSystemServiceManager {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testUpgrade() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testUpgrade");
upgrade(serviceManager, "v2", false);
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartNothingToUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", false);
//make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
comp.setState(ComponentState.STABLE);
});
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service not re-started", ServiceState.STABLE,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartWithPendingUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", true);
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service should still be upgrading",
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
}
private void upgrade(ServiceManager service, String version,
boolean upgradeArtifact)
throws IOException, SliderException {
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(service.getName());
upgradedDef.setVersion(version);
if (upgradeArtifact) {
Artifact upgradedArtifact = createTestArtifact("2");
upgradedDef.getComponents().forEach(component -> {
component.setArtifact(upgradedArtifact);
});
}
writeUpgradedDef(upgradedDef);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
upgradeEvent.setVersion("v2");
service.handle(upgradeEvent);
}
private ServiceManager createTestServiceManager(String name)
throws IOException {
ServiceContext context = new ServiceContext();
context.service = createBaseDef(name);
context.fs = rule.getFs();
context.scheduler = new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
};
context.scheduler.init(rule.getConf());
Map<String, org.apache.hadoop.yarn.service.component.Component>
componentState = context.scheduler.getAllComponents();
context.service.getComponents().forEach(component -> {
componentState.put(component.getName(),
new org.apache.hadoop.yarn.service.component.Component(component,
1L, context));
});
return new ServiceManager(context);
}
static Service createBaseDef(String name) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service serviceDef = ServiceTestUtils.createExampleApplication();
serviceDef.setId(applicationId.toString());
serviceDef.setName(name);
serviceDef.setState(ServiceState.STARTED);
Artifact artifact = createTestArtifact("1");
serviceDef.getComponents().forEach(component ->
component.setArtifact(artifact));
return serviceDef;
}
static Artifact createTestArtifact(String artifactId) {
Artifact artifact = new Artifact();
artifact.setId(artifactId);
artifact.setType(Artifact.TypeEnum.TARBALL);
return artifact;
}
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
upgradedDef.getName(), upgradedDef.getVersion());
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
upgradedDef);
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.service;
/**
* Marker interface for starting services from RM. The implementation should
* launch configured services.
*/
public interface SystemServiceManager {
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.service contains service related
* classes.
*/
@InterfaceAudience.Private @InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.service.SystemServiceManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@ -486,6 +487,27 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
protected SystemServiceManager createServiceManager() {
String schedulerClassName =
YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS;
LOG.info("Using SystemServiceManager: " + schedulerClassName);
try {
Class<?> schedulerClazz = Class.forName(schedulerClassName);
if (SystemServiceManager.class.isAssignableFrom(schedulerClazz)) {
return (SystemServiceManager) ReflectionUtils
.newInstance(schedulerClazz, this.conf);
} else {
throw new YarnRuntimeException(
"Class: " + schedulerClassName + " not instance of "
+ SystemServiceManager.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate SystemServiceManager: " + schedulerClassName,
e);
}
}
protected ApplicationMasterLauncher createAMLauncher() { protected ApplicationMasterLauncher createAMLauncher() {
return new ApplicationMasterLauncher(this.rmContext); return new ApplicationMasterLauncher(this.rmContext);
} }
@ -795,6 +817,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
new RMNMInfo(rmContext, scheduler); new RMNMInfo(rmContext, scheduler);
if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
false)) {
SystemServiceManager systemServiceManager = createServiceManager();
addIfService(systemServiceManager);
}
super.serviceInit(conf); super.serviceInit(conf);
} }