diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9898cba1d0c..58c288b6343 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -341,6 +341,10 @@ public class YarnConfiguration extends Configuration { public static final String YARN_API_SERVICES_ENABLE = "yarn." + "webapp.api-service.enable"; + @Private + public static final String DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS = + "org.apache.hadoop.yarn.service.client.SystemServiceManagerImpl"; + public static final String RM_RESOURCE_TRACKER_ADDRESS = RM_PREFIX + "resource-tracker.address"; public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml index 0dfa92dfca3..d45da093102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml @@ -71,6 +71,7 @@ **/*.json + **/*.yarnfile @@ -94,6 +95,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-server-common + org.apache.hadoop hadoop-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java new file mode 100644 index 00000000000..225f8bdfedb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.service.SystemServiceManager; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; + +/** + * SystemServiceManager implementation. + * Scan for configure system service path. + * + * The service path structure is as follows: + * SYSTEM_SERVICE_DIR_PATH + * |---- sync + * | |--- user1 + * | | |---- service1.yarnfile + * | | |---- service2.yarnfile + * | |--- user2 + * | | |---- service1.yarnfile + * | | .... + * | | + * |---- async + * | |--- user3 + * | | |---- service1.yarnfile + * | | |---- service2.yarnfile + * | |--- user4 + * | | |---- service1.yarnfile + * | | .... + * | | + * + * sync: These services are launched at the time of service start synchronously. + * It is a blocking service start. + * async: These services are launched in separate thread without any delay after + * service start. Non-blocking service start. + */ +public class SystemServiceManagerImpl extends AbstractService + implements SystemServiceManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SystemServiceManagerImpl.class); + + private static final String YARN_FILE_SUFFIX = ".yarnfile"; + private static final String SYNC = "sync"; + private static final String ASYNC = "async"; + + private FileSystem fs; + private Path systemServiceDir; + private AtomicBoolean stopExecutors = new AtomicBoolean(false); + private Map> syncUserServices = new HashMap<>(); + private Map> asyncUserServices = new HashMap<>(); + private UserGroupInformation loginUGI; + private Thread serviceLaucher; + + @VisibleForTesting + private int skipCounter; + @VisibleForTesting + private Map ignoredUserServices = + new HashMap<>(); + + public SystemServiceManagerImpl() { + super(SystemServiceManagerImpl.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + String dirPath = + conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY); + if (dirPath != null) { + systemServiceDir = new Path(dirPath); + LOG.info("System Service Directory is configured to {}", + systemServiceDir); + fs = systemServiceDir.getFileSystem(conf); + this.loginUGI = UserGroupInformation.isSecurityEnabled() ? + UserGroupInformation.getLoginUser() : + UserGroupInformation.getCurrentUser(); + LOG.info("UserGroupInformation initialized to {}", loginUGI); + } + } + + @Override + protected void serviceStart() throws Exception { + scanForUserServices(); + launchUserService(syncUserServices); + // Create a thread and submit services in background otherwise it + // block RM switch time. + serviceLaucher = new Thread(createRunnable()); + serviceLaucher.setName("System service launcher"); + serviceLaucher.start(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping {}", getName()); + stopExecutors.set(true); + + if (serviceLaucher != null) { + serviceLaucher.interrupt(); + try { + serviceLaucher.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted Exception while stopping", ie); + } + } + } + + private Runnable createRunnable() { + return new Runnable() { + @Override + public void run() { + launchUserService(asyncUserServices); + } + }; + } + + void launchUserService(Map> userServices) { + for (Map.Entry> entry : userServices.entrySet()) { + String user = entry.getKey(); + Set services = entry.getValue(); + if (services.isEmpty()) { + continue; + } + ServiceClient serviceClient = null; + try { + UserGroupInformation userUgi = getProxyUser(user); + serviceClient = createServiceClient(userUgi); + for (Service service : services) { + LOG.info("POST: createService = {} user = {}", service, userUgi); + try { + launchServices(userUgi, serviceClient, service); + } catch (IOException | UndeclaredThrowableException e) { + if (e.getCause() != null) { + LOG.warn(e.getCause().getMessage()); + } else { + String message = + "Failed to create service " + service.getName() + " : "; + LOG.error(message, e); + } + } + } + } catch (InterruptedException e) { + LOG.warn("System service launcher thread interrupted", e); + break; + } catch (Exception e) { + LOG.error("Error while submitting services for user " + user, e); + } finally { + if (serviceClient != null) { + try { + serviceClient.close(); + } catch (IOException e) { + LOG.warn("Error while closing serviceClient for user {}", user); + } + } + } + } + } + + private ServiceClient createServiceClient(UserGroupInformation userUgi) + throws IOException, InterruptedException { + ServiceClient serviceClient = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public ServiceClient run() + throws IOException, YarnException { + ServiceClient sc = getServiceClient(); + sc.init(getConfig()); + sc.start(); + return sc; + } + }); + return serviceClient; + } + + private void launchServices(UserGroupInformation userUgi, + ServiceClient serviceClient, Service service) + throws IOException, InterruptedException { + if (service.getState() == ServiceState.STOPPED) { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Void run() throws IOException, YarnException { + serviceClient.actionBuild(service); + return null; + } + }); + LOG.info("Service {} version {} saved.", service.getName(), + service.getVersion()); + } else { + ApplicationId applicationId = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public ApplicationId run() + throws IOException, YarnException { + ApplicationId applicationId = serviceClient.actionCreate(service); + return applicationId; + } + }); + LOG.info("Service {} submitted with Application ID: {}", + service.getName(), applicationId); + } + } + + ServiceClient getServiceClient() { + return new ServiceClient(); + } + + private UserGroupInformation getProxyUser(String user) { + UserGroupInformation ugi; + if (UserGroupInformation.isSecurityEnabled()) { + ugi = UserGroupInformation.createProxyUser(user, loginUGI); + } else { + ugi = UserGroupInformation.createRemoteUser(user); + } + return ugi; + } + + // scan for both launch service types i.e sync and async + void scanForUserServices() throws IOException { + if (systemServiceDir == null) { + return; + } + try { + LOG.info("Scan for launch type on {}", systemServiceDir); + RemoteIterator iterLaunchType = list(systemServiceDir); + while (iterLaunchType.hasNext()) { + FileStatus launchType = iterLaunchType.next(); + if (!launchType.isDirectory()) { + LOG.debug("Scanner skips for unknown file {}", launchType.getPath()); + continue; + } + if (launchType.getPath().getName().equals(SYNC)) { + scanForUserServiceDefinition(launchType.getPath(), syncUserServices); + } else if (launchType.getPath().getName().equals(ASYNC)) { + scanForUserServiceDefinition(launchType.getPath(), asyncUserServices); + } else { + LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath()); + } + } + } catch (FileNotFoundException e) { + LOG.warn("System service directory {} doesn't not exist.", + systemServiceDir); + } + } + + // Files are under systemServiceDir/. Scan for 2 levels + // 1st level for users + // 2nd level for service definitions under user + private void scanForUserServiceDefinition(Path userDirPath, + Map> userServices) throws IOException { + LOG.info("Scan for users on {}", userDirPath); + RemoteIterator iterUsers = list(userDirPath); + while (iterUsers.hasNext()) { + FileStatus userDir = iterUsers.next(); + // if 1st level is not user directory then skip it. + if (!userDir.isDirectory()) { + LOG.info( + "Service definition {} doesn't belong to any user. Ignoring.. ", + userDir.getPath().getName()); + continue; + } + String userName = userDir.getPath().getName(); + LOG.info("Scanning service definitions for user {}.", userName); + + //2nd level scan + RemoteIterator iterServices = list(userDir.getPath()); + while (iterServices.hasNext()) { + FileStatus serviceCache = iterServices.next(); + String filename = serviceCache.getPath().getName(); + if (!serviceCache.isFile()) { + LOG.info("Scanner skips for unknown dir {}", filename); + continue; + } + if (!filename.endsWith(YARN_FILE_SUFFIX)) { + LOG.info("Scanner skips for unknown file extension, filename = {}", + filename); + skipCounter++; + continue; + } + Service service = getServiceDefinition(serviceCache.getPath()); + if (service != null) { + Set services = userServices.get(userName); + if (services == null) { + services = new HashSet<>(); + userServices.put(userName, services); + } + if (!services.add(service)) { + int count = ignoredUserServices.containsKey(userName) ? + ignoredUserServices.get(userName) : 0; + ignoredUserServices.put(userName, count + 1); + LOG.warn( + "Ignoring service {} for the user {} as it is already present," + + " filename = {}", service.getName(), userName, filename); + } + LOG.info("Added service {} for the user {}, filename = {}", + service.getName(), userName, filename); + } + } + } + } + + private Service getServiceDefinition(Path filePath) { + Service service = null; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Loading service definition from FS: " + filePath); + } + service = jsonSerDeser.load(fs, filePath); + } catch (IOException e) { + LOG.info("Error while loading service definition from FS: {}", e); + } + return service; + } + + private RemoteIterator list(Path path) throws IOException { + return new StoppableRemoteIterator(fs.listStatusIterator(path)); + } + + @VisibleForTesting Map getIgnoredUserServices() { + return ignoredUserServices; + } + + private class StoppableRemoteIterator implements RemoteIterator { + private final RemoteIterator remote; + + StoppableRemoteIterator(RemoteIterator remote) { + this.remote = remote; + } + + @Override public boolean hasNext() throws IOException { + return !stopExecutors.get() && remote.hasNext(); + } + + @Override public FileStatus next() throws IOException { + return remote.next(); + } + } + + @VisibleForTesting + Map> getSyncUserServices() { + return syncUserServices; + } + + @VisibleForTesting int getSkipCounter() { + return skipCounter; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java new file mode 100644 index 00000000000..27632f9956a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * Test class for system service manager. + */ +public class TestSystemServiceImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSystemServiceImpl.class); + private SystemServiceManagerImpl systemService; + private Configuration conf; + private String resourcePath = "users"; + + private String[] users = new String[] {"user1", "user2"}; + private static Map> loadedServices = new HashMap<>(); + private static Map> submittedServices = new HashMap<>(); + + @Before + public void setup() { + File file = new File( + getClass().getClassLoader().getResource(resourcePath).getFile()); + conf = new Configuration(); + conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY, + file.getAbsolutePath()); + systemService = new SystemServiceManagerImpl() { + @Override ServiceClient getServiceClient() { + return new TestServiceClient(); + } + }; + systemService.init(conf); // do not call explicit start + + constructUserService(users[0], "example-app1"); + constructUserService(users[1], "example-app1", "example-app2"); + } + + @After + public void teadDown() { + systemService.stop(); + } + + @Test + public void testSystemServiceSubmission() throws Exception { + systemService.start(); + + /* verify for ignored sevices count */ + Map ignoredUserServices = + systemService.getIgnoredUserServices(); + Assert.assertEquals(1, ignoredUserServices.size()); + Assert.assertTrue("User user1 doesn't exist.", + ignoredUserServices.containsKey(users[0])); + int count = ignoredUserServices.get(users[0]); + Assert.assertEquals(1, count); + Assert.assertEquals(1, systemService.getSkipCounter()); + + Map> userServices = + systemService.getSyncUserServices(); + Assert.assertEquals(loadedServices.size(), userServices.size()); + verifyForScannedUserServices(userServices); + + verifyForLaunchedUserServices(); + + // 2nd time launch service to handle if service exist scenario + systemService.launchUserService(userServices); + verifyForLaunchedUserServices(); + } + + private void verifyForScannedUserServices( + Map> userServices) { + for (String user : users) { + Set services = userServices.get(user); + Set serviceNames = loadedServices.get(user); + Assert.assertEquals(serviceNames.size(), services.size()); + Iterator iterator = services.iterator(); + while (iterator.hasNext()) { + Service next = iterator.next(); + Assert.assertTrue( + "Service name doesn't exist in expected " + "userService " + + serviceNames, serviceNames.contains(next.getName())); + } + } + } + + public void constructUserService(String user, String... serviceNames) { + Set service = loadedServices.get(user); + if (service == null) { + service = new HashSet<>(); + for (String serviceName : serviceNames) { + service.add(serviceName); + } + loadedServices.put(user, service); + } + } + + class TestServiceClient extends ServiceClient { + @Override + protected void serviceStart() throws Exception { + // do nothing + } + + @Override + protected void serviceStop() throws Exception { + // do nothing + } + + @Override + protected void serviceInit(Configuration configuration) + throws Exception { + // do nothing + } + + @Override + public ApplicationId actionCreate(Service service) + throws YarnException, IOException { + String userName = + UserGroupInformation.getCurrentUser().getShortUserName(); + Set services = submittedServices.get(userName); + if (services == null) { + services = new HashSet<>(); + submittedServices.put(userName, services); + } + if (services.contains(service.getName())) { + String message = "Failed to create service " + service.getName() + + ", because it already exists."; + throw new YarnException(message); + } + services.add(service.getName()); + return ApplicationId.newInstance(System.currentTimeMillis(), 1); + } + } + + private void verifyForLaunchedUserServices() { + Assert.assertEquals(loadedServices.size(), submittedServices.size()); + for (Map.Entry> entry : submittedServices.entrySet()) { + String user = entry.getKey(); + Set serviceSet = entry.getValue(); + Assert.assertTrue(loadedServices.containsKey(user)); + Set services = loadedServices.get(user); + Assert.assertEquals(services.size(), serviceSet.size()); + Assert.assertTrue(services.containsAll(serviceSet)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json new file mode 100644 index 00000000000..8a3a5612392 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json @@ -0,0 +1,16 @@ +{ + "name": "example-app3", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile new file mode 100644 index 00000000000..d8fd1d12fca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile @@ -0,0 +1,16 @@ +{ + "name": "example-app2", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index 1506532239d..6002346fcaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -50,6 +50,8 @@ public class YarnServiceConf { public static final String ROLLING_LOG_INCLUSION_PATTERN = "yarn.service.rolling-log.include-pattern"; public static final String ROLLING_LOG_EXCLUSION_PATTERN = "yarn.service.rolling-log.exclude-pattern"; + public static final String YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY = + YARN_SERVICE_PREFIX + "system-service.dir"; /** * The yarn service base path: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java new file mode 100644 index 00000000000..dbff02f5aab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link ServiceManager}. + */ +public class TestSystemServiceManager { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testUpgrade() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testUpgrade"); + upgrade(serviceManager, "v2", false); + Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartNothingToUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", false); + + //make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> { + comp.setState(ComponentState.STABLE); + }); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service not re-started", ServiceState.STABLE, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartWithPendingUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", true); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service should still be upgrading", + ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + } + + + private void upgrade(ServiceManager service, String version, + boolean upgradeArtifact) + throws IOException, SliderException { + Service upgradedDef = ServiceTestUtils.createExampleApplication(); + upgradedDef.setName(service.getName()); + upgradedDef.setVersion(version); + if (upgradeArtifact) { + Artifact upgradedArtifact = createTestArtifact("2"); + upgradedDef.getComponents().forEach(component -> { + component.setArtifact(upgradedArtifact); + }); + } + writeUpgradedDef(upgradedDef); + ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); + upgradeEvent.setVersion("v2"); + service.handle(upgradeEvent); + } + + private ServiceManager createTestServiceManager(String name) + throws IOException { + ServiceContext context = new ServiceContext(); + context.service = createBaseDef(name); + context.fs = rule.getFs(); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + }; + + context.scheduler.init(rule.getConf()); + + Map + componentState = context.scheduler.getAllComponents(); + context.service.getComponents().forEach(component -> { + componentState.put(component.getName(), + new org.apache.hadoop.yarn.service.component.Component(component, + 1L, context)); + }); + return new ServiceManager(context); + } + + static Service createBaseDef(String name) { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service serviceDef = ServiceTestUtils.createExampleApplication(); + serviceDef.setId(applicationId.toString()); + serviceDef.setName(name); + serviceDef.setState(ServiceState.STARTED); + Artifact artifact = createTestArtifact("1"); + + serviceDef.getComponents().forEach(component -> + component.setArtifact(artifact)); + return serviceDef; + } + + static Artifact createTestArtifact(String artifactId) { + Artifact artifact = new Artifact(); + artifact.setId(artifactId); + artifact.setType(Artifact.TypeEnum.TARBALL); + return artifact; + } + + private void writeUpgradedDef(Service upgradedDef) + throws IOException, SliderException { + Path upgradePath = rule.getFs().buildClusterUpgradeDirPath( + upgradedDef.getName(), upgradedDef.getVersion()); + ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath, + upgradedDef); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java new file mode 100644 index 00000000000..660f378cbaf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.service; + +/** + * Marker interface for starting services from RM. The implementation should + * launch configured services. + */ +public interface SystemServiceManager { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java new file mode 100644 index 00000000000..c448bab134d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.service contains service related + * classes. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable + +package org.apache.hadoop.yarn.server.service; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 733da5bd718..f5d84a395b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.service.SystemServiceManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -310,7 +311,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } rmContext.setYarnConfiguration(conf); - + createAndInitActiveServices(false); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, @@ -486,6 +487,27 @@ public class ResourceManager extends CompositeService implements Recoverable { } } + protected SystemServiceManager createServiceManager() { + String schedulerClassName = + YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS; + LOG.info("Using SystemServiceManager: " + schedulerClassName); + try { + Class schedulerClazz = Class.forName(schedulerClassName); + if (SystemServiceManager.class.isAssignableFrom(schedulerClazz)) { + return (SystemServiceManager) ReflectionUtils + .newInstance(schedulerClazz, this.conf); + } else { + throw new YarnRuntimeException( + "Class: " + schedulerClassName + " not instance of " + + SystemServiceManager.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate SystemServiceManager: " + schedulerClassName, + e); + } + } + protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(this.rmContext); } @@ -795,6 +817,12 @@ public class ResourceManager extends CompositeService implements Recoverable { new RMNMInfo(rmContext, scheduler); + if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, + false)) { + SystemServiceManager systemServiceManager = createServiceManager(); + addIfService(systemServiceManager); + } + super.serviceInit(conf); }