From 79bf5c084d4a063f1a933cae498af23edfd46792 Mon Sep 17 00:00:00 2001 From: Billie Rinaldi Date: Fri, 18 Aug 2017 09:01:59 -0700 Subject: [PATCH] YARN-7029. Add more UTs in yarn-native-services. Contributed by Jian He --- .../yarn/service/ContainerLaunchService.java | 2 +- .../hadoop/yarn/service/ServiceMaster.java | 42 +++- .../hadoop/yarn/service/ServiceMonitor.java | 14 +- .../hadoop/yarn/service/ServiceScheduler.java | 68 +++--- .../yarn/service/client/ClientAMProxy.java | 6 +- .../yarn/service/client/ServiceClient.java | 13 +- .../compinstance/ComponentInstance.java | 4 +- .../yarn/service/component/Component.java | 9 +- .../yarn/service/conf/YarnServiceConf.java | 51 ++++ .../service/conf/YarnServiceConfKeys.java | 27 --- .../provider/AbstractProviderService.java | 14 +- .../service/provider/ProviderService.java | 4 +- .../slider/core/launch/AbstractLauncher.java | 9 + .../server/appmaster/RoleLaunchService.java | 2 +- .../hadoop/yarn/service/MockServiceAM.java | 222 ++++++++++++++++++ .../hadoop/yarn/service/ServiceTestUtils.java | 53 +++++ .../yarn/service/TestYarnNativeServices.java | 60 ++--- .../servicemonitor/TestServiceMonitor.java | 131 +++++++++++ 18 files changed, 607 insertions(+), 124 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConfKeys.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/servicemonitor/TestServiceMonitor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ContainerLaunchService.java index ac5285f569a..2037a3b247c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ContainerLaunchService.java @@ -87,7 +87,7 @@ public class ContainerLaunchService extends AbstractService{ AbstractLauncher launcher = new AbstractLauncher(fs, null); try { provider.buildContainerLaunchContext(launcher, application, - instance, fs); + instance, fs, getConfig()); instance.getComponent().getScheduler().getNmClient() .startContainerAsync(container, launcher.completeContainerLaunch()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 1ebd5628f91..c22dec4b225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -32,14 +32,17 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.service.client.params.SliderAMArgs; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.apache.slider.core.exceptions.BadClusterStateException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Map; public class ServiceMaster extends CompositeService { @@ -48,6 +51,7 @@ public class ServiceMaster extends CompositeService { LoggerFactory.getLogger(ServiceMaster.class); private static SliderAMArgs amArgs; + protected ServiceContext context; public ServiceMaster(String name) { super(name); @@ -62,17 +66,15 @@ public class ServiceMaster extends CompositeService { } LOG.info("Login user is {}", UserGroupInformation.getLoginUser()); - ServiceContext context = new ServiceContext(); - Path appDir = new Path(amArgs.getAppDefPath()).getParent(); + context = new ServiceContext(); + Path appDir = getAppDir(); SliderFileSystem fs = new SliderFileSystem(conf); context.fs = fs; fs.setAppDir(appDir); - context.application = ServiceApiUtil - .loadApplicationFrom(fs, new Path(amArgs.getAppDefPath())); - LOG.info(context.application.toString()); - ContainerId amContainerId = ContainerId.fromString(SliderUtils - .mandatoryEnvVariable( - ApplicationConstants.Environment.CONTAINER_ID.name())); + loadApplicationJson(context, fs); + + ContainerId amContainerId = getAMContainerId(); + ApplicationAttemptId attemptId = amContainerId.getApplicationAttemptId(); LOG.info("Application attemptId: " + attemptId); context.attemptId = attemptId; @@ -88,7 +90,7 @@ public class ServiceMaster extends CompositeService { context.clientAMService = clientAMService; addService(clientAMService); - ServiceScheduler scheduler = new ServiceScheduler(context); + ServiceScheduler scheduler = createServiceScheduler(context); addService(scheduler); context.scheduler = scheduler; @@ -98,6 +100,26 @@ public class ServiceMaster extends CompositeService { super.serviceInit(conf); } + protected ContainerId getAMContainerId() throws BadClusterStateException { + return ContainerId.fromString(SliderUtils.mandatoryEnvVariable( + ApplicationConstants.Environment.CONTAINER_ID.name())); + } + + protected Path getAppDir() { + return new Path(amArgs.getAppDefPath()).getParent(); + } + + protected ServiceScheduler createServiceScheduler(ServiceContext context) + throws IOException, YarnException { + return new ServiceScheduler(context); + } + + protected void loadApplicationJson(ServiceContext context, + SliderFileSystem fs) throws IOException { + context.application = ServiceApiUtil + .loadApplicationFrom(fs, new Path(amArgs.getAppDefPath())); + LOG.info(context.application.toString()); + } @Override protected void serviceStop() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMonitor.java index 82d768e4b5c..bc376149d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMonitor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.slider.api.InternalKeys; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent; @@ -38,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceState.RUNNING_BUT_UNREADY; import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX; import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType.BECOME_NOT_READY; import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType.BECOME_READY; @@ -51,6 +53,7 @@ public class ServiceMonitor extends AbstractService { public ScheduledExecutorService executorService; private Map liveInstances = null; private ServiceContext context; + private Configuration conf; public ServiceMonitor(String name, ServiceContext context) { super(name); @@ -61,14 +64,17 @@ public class ServiceMonitor extends AbstractService { @Override public void serviceInit(Configuration conf) throws Exception { executorService = Executors.newScheduledThreadPool(1); + this.conf = conf; super.serviceInit(conf); } @Override public void serviceStart() throws Exception { - long readinessCheckInterval = context.application.getConfiguration() - .getPropertyLong(InternalKeys.MONITOR_INTERVAL, - InternalKeys.DEFAULT_MONITOR_INTERVAL); + long readinessCheckInterval = + YarnServiceConf.getLong(InternalKeys.MONITOR_INTERVAL, + InternalKeys.DEFAULT_MONITOR_INTERVAL, + context.application.getConfiguration(), conf); + executorService .scheduleAtFixedRate(new ReadinessChecker(), readinessCheckInterval, readinessCheckInterval, TimeUnit.SECONDS); @@ -104,7 +110,7 @@ public class ServiceMonitor extends AbstractService { ProbeStatus status = instance.ping(); if (status.isSuccess()) { - if (instance.getState() != READY) { + if (instance.getState() == RUNNING_BUT_UNREADY) { // synchronously update the state. instance.handle( new ComponentInstanceEvent(entry.getKey(), BECOME_READY)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index bea2924efc4..590655f77af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -54,27 +54,26 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.conf.SliderKeys; +import org.apache.hadoop.yarn.service.metrics.ServiceMetrics; +import org.apache.hadoop.yarn.service.provider.ProviderUtils; +import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink; +import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.BoundedAppender; import org.apache.slider.api.RoleKeys; import org.apache.slider.api.ServiceApiConstants; import org.apache.slider.api.resource.Application; import org.apache.slider.api.resource.ConfigFile; -import org.apache.hadoop.yarn.service.conf.SliderKeys; -import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.registry.info.CustomRegistryConstants; import org.apache.slider.core.zk.ZKIntegration; -import org.apache.hadoop.yarn.service.provider.ProviderUtils; -import org.apache.hadoop.yarn.service.metrics.ServiceMetrics; -import org.apache.hadoop.yarn.service.component.ComponentEvent; -import org.apache.hadoop.yarn.service.component.ComponentEventType; -import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent; -import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType; -import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; -import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink; import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +81,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; +import java.text.MessageFormat; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -93,8 +93,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.registry.client.api.RegistryConstants.*; -import static org.apache.slider.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; +import static org.apache.slider.api.ServiceApiConstants.*; /** * @@ -110,7 +110,7 @@ public class ServiceScheduler extends CompositeService { new ConcurrentHashMap<>(); // id - > component - private final Map componentsById = + protected final Map componentsById = new ConcurrentHashMap<>(); private final Map liveInstances = @@ -151,6 +151,8 @@ public class ServiceScheduler extends CompositeService { RegistryOperations registryClient = RegistryOperationsFactory .createInstance("ServiceScheduler", configuration); addIfService(registryClient); + yarnRegistryOperations = + createYarnRegistryOperations(context, registryClient); // register metrics serviceMetrics = ServiceMetrics @@ -158,11 +160,10 @@ public class ServiceScheduler extends CompositeService { serviceMetrics.tag("type", "Metrics type [component or service]", "service"); serviceMetrics.tag("appId", "Application id for service", app.getId()); - amRMClient = - AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallback()); + amRMClient = createAMRMClient(); addIfService(amRMClient); - nmClient = NMClientAsync.createNMClientAsync(new NMClientCallback()); + nmClient = createNMClient(); addIfService(nmClient); dispatcher = new AsyncDispatcher("Component dispatcher"); @@ -191,10 +192,6 @@ public class ServiceScheduler extends CompositeService { LOG.info("Timeline v2 is enabled."); } - yarnRegistryOperations = - new YarnRegistryViewForProviders(registryClient, - RegistryUtils.currentUser(), SliderKeys.APP_TYPE, app.getName(), - context.attemptId); initGlobalTokensForSubstitute(context); //substitute quicklinks ProviderUtils.substituteMapWithTokens(app.getQuicklinks(), globalTokens); @@ -203,6 +200,22 @@ public class ServiceScheduler extends CompositeService { createAllComponents(); } + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return new YarnRegistryViewForProviders(registryClient, + RegistryUtils.currentUser(), SliderKeys.APP_TYPE, app.getName(), + context.attemptId); + } + + protected NMClientAsync createNMClient() { + return NMClientAsync.createNMClientAsync(new NMClientCallback()); + } + + protected AMRMClientAsync createAMRMClient() { + return AMRMClientAsync + .createAMRMClientAsync(1000, new AMRMClientCallback()); + } + @Override public void serviceInit(Configuration conf) throws Exception { try { @@ -323,7 +336,7 @@ public class ServiceScheduler extends CompositeService { context.configCache = configFileCache; } - private void registerServiceInstance(ApplicationAttemptId attemptId, + protected void registerServiceInstance(ApplicationAttemptId attemptId, Application application) throws IOException { LOG.info("Registering " + attemptId + ", " + application.getName() + " into registry"); @@ -413,8 +426,9 @@ public class ServiceScheduler extends CompositeService { try { component.handle(event); } catch (Throwable t) { - LOG.error("Error in handling event type " + event.getType() - + " for component " + event.getName(), t); + LOG.error(MessageFormat + .format("[COMPONENT {0}]: Error in handling event type {1}", + component.getName(), event.getType()), t); } } } @@ -432,13 +446,13 @@ public class ServiceScheduler extends CompositeService { try { instance.handle(event); } catch (Throwable t) { - LOG.error("Error in handling event type " + event.getType() - + " for component instance " + instance.getCompInstanceId(), t); + LOG.error(instance.getCompInstanceId() + + ": Error in handling event type " + event.getType(), t); } } } - private class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler { + class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler { @Override public void onContainersAllocated(List containers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java index dbc1f51d7fd..0749077f63e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java @@ -24,7 +24,7 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.client.ServerProxy; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.service.conf.YarnServiceConfKeys; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import java.net.InetSocketAddress; @@ -35,8 +35,8 @@ public class ClientAMProxy extends ServerProxy{ final YarnRPC rpc, final InetSocketAddress serverAddress) { RetryPolicy retryPolicy = - createRetryPolicy(conf, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_WAIT_MS, - 15 * 60 * 1000, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_INTERVAL_MS, + createRetryPolicy(conf, YarnServiceConf.CLIENT_AM_RETRY_MAX_WAIT_MS, + 15 * 60 * 1000, YarnServiceConf.CLIENT_AM_RETRY_MAX_INTERVAL_MS, 2 * 1000); Configuration confClone = new Configuration(conf); confClone.setInt( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 3d026034a9b..0ed4860b6c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.service.client.params.CommonArgs; import org.apache.hadoop.yarn.service.conf.SliderExitCodes; import org.apache.hadoop.yarn.service.conf.SliderKeys; import org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.util.Records; @@ -487,10 +488,14 @@ public class ServiceClient extends CompositeService // create AM CLI String cmdStr = buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j); - - submissionContext.setResource(Resource.newInstance( - conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1)); - submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, app.getQueue())); + submissionContext.setResource(Resource.newInstance(YarnServiceConf + .getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM, + app.getConfiguration(), conf), 1)); + String queue = app.getQueue(); + if (StringUtils.isEmpty(queue)) { + queue = conf.get(KEY_YARN_QUEUE, "default"); + } + submissionContext.setQueue(queue); submissionContext.setApplicationName(appName); submissionContext.setApplicationType(SliderKeys.APP_TYPE); Set appTags = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/compinstance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/compinstance/ComponentInstance.java index aeef4fc84be..dcb455f31e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/compinstance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/compinstance/ComponentInstance.java @@ -263,8 +263,8 @@ public class ComponentInstance implements EventHandler, try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { - LOG.error("Invalid event " + event.getType() + - " at " + oldState + " for component instance " + compInstanceId, e); + LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() + + " at " + oldState, e); } if (oldState != getState()) { LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index a4a0a15d6f9..bfe40c0524c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -45,6 +45,7 @@ import org.apache.slider.server.servicemonitor.Probe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -436,6 +437,10 @@ public class Component implements EventHandler { return priority; } + public long getAllocateId() { + return allocateId; + } + public String getName () { return componentSpec.getName(); } @@ -461,8 +466,8 @@ public class Component implements EventHandler { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { - LOG.error("Invalid event " + event.getType() + - " at " + oldState + " for component " + componentSpec.getName(), e); + LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}", + componentSpec.getName(), event.getType(), oldState), e); } if (oldState != getState()) { LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java new file mode 100644 index 00000000000..9225570c89a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.conf; + +import org.apache.slider.api.resource.Configuration; + +public class YarnServiceConf { + + // Retry settings for the ServiceClient to talk to Service AppMaster + public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms"; + public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms"; + + // Retry settings for container failures + public static final String CONTAINER_RETRY_MAX = "yarn.service.container-failure.retry.max"; + public static final String CONTAINER_RETRY_INTERVAL = "yarn.service.container-failure.retry-interval"; + + /** + * Get long value for the property + * @param name name of the property + * @param defaultValue default value of the property, if it is not defined in + * userConf and systemConf. + * @param userConf Configuration provided by client in the JSON definition + * @param systemConf The YarnConfiguration in the system. + * @return long value for the property + */ + public static long getLong(String name, long defaultValue, + Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) { + return userConf.getPropertyLong(name, systemConf.getLong(name, defaultValue)); + } + + public static int getInt(String name, int defaultValue, + Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) { + return userConf.getPropertyInt(name, systemConf.getInt(name, defaultValue)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConfKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConfKeys.java deleted file mode 100644 index 4fda686edb0..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConfKeys.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.service.conf; - -public interface YarnServiceConfKeys { - - // Retry settings for the ServiceClient to talk to Service AppMaster - String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms"; - String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms"; - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java index 472ee210206..6f9f5175eeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.yarn.service.provider; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.slider.api.resource.Application; import org.apache.slider.api.resource.Component; import org.apache.hadoop.yarn.service.conf.SliderKeys; @@ -35,6 +37,8 @@ import java.io.IOException; import java.util.Map; import java.util.Map.Entry; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_RETRY_INTERVAL; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_RETRY_MAX; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.$; public abstract class AbstractProviderService implements ProviderService, @@ -50,7 +54,8 @@ public abstract class AbstractProviderService implements ProviderService, public void buildContainerLaunchContext(AbstractLauncher launcher, Application application, ComponentInstance instance, - SliderFileSystem fileSystem) throws IOException, SliderException { + SliderFileSystem fileSystem, Configuration yarnConf) + throws IOException, SliderException { Component component = instance.getComponent().getComponentSpec();; processArtifact(launcher, instance, fileSystem, application); @@ -93,5 +98,12 @@ public abstract class AbstractProviderService implements ProviderService, operation.add(launchCommand); operation.addOutAndErrFiles(OUT_FILE, ERR_FILE); launcher.addCommand(operation.build()); + + // By default retry forever every 30 seconds + launcher.setRetryContext(YarnServiceConf + .getInt(CONTAINER_RETRY_MAX, -1, application.getConfiguration(), + yarnConf), YarnServiceConf + .getInt(CONTAINER_RETRY_INTERVAL, 30000, application.getConfiguration(), + yarnConf)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java index a28c3b81e3f..306620da22b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.provider; +import org.apache.hadoop.conf.Configuration; import org.apache.slider.api.resource.Application; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.core.exceptions.SliderException; @@ -33,5 +34,6 @@ public interface ProviderService { */ void buildContainerLaunchContext(AbstractLauncher containerLauncher, Application application, ComponentInstance instance, - SliderFileSystem sliderFileSystem) throws IOException, SliderException; + SliderFileSystem sliderFileSystem, Configuration yarnConf) + throws IOException, SliderException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java index 55ffbf7d723..a3e1bf29402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.service.conf.SliderKeys; @@ -186,6 +188,13 @@ public class AbstractLauncher { return containerLaunchContext; } + public void setRetryContext(int maxRetries, int retryInterval) { + ContainerRetryContext retryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, maxRetries, + retryInterval); + containerLaunchContext.setContainerRetryContext(retryContext); + } + /** * Dump local resources at debug level */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java index a03d4abbdcf..d96d13e17e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java @@ -138,7 +138,7 @@ public class RoleLaunchService ProviderService provider = ProviderFactory.getProviderService( compSpec.getArtifact()); provider.buildContainerLaunchContext(containerLauncher, application, - instance, fs); + instance, fs, getConfig()); long delay = compSpec.getConfiguration() .getPropertyLong(KEY_CONTAINER_LAUNCH_DELAY, 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java new file mode 100644 index 00000000000..9746d339da5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import com.google.common.base.Supplier; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.*; +import org.apache.hadoop.yarn.proto.ClientAMProtocol; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentState; +import org.apache.slider.api.resource.Application; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Mockito.mock; + +public class MockServiceAM extends ServiceMaster { + + Application application; + // The list of containers fed by tests to be returned on + // AMRMClientCallBackHandler#onContainersAllocated + final List feedContainers = + Collections.synchronizedList(new LinkedList<>()); + + public MockServiceAM(Application application) { + super(application.getName()); + this.application = application; + } + + + @Override + protected ContainerId getAMContainerId() + throws BadClusterStateException { + return ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.fromString(application.getId()), 1), 1); + } + + @Override + protected Path getAppDir() { + Path path = new Path(new Path("target", "apps"), application.getName()); + System.out.println("Application path: " + path); + return path; + } + + @Override + protected ServiceScheduler createServiceScheduler(ServiceContext context) + throws IOException, YarnException { + return new ServiceScheduler(context) { + + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + @Override + protected AMRMClientAsync createAMRMClient() { + AMRMClientImpl client1 = new AMRMClientImpl() { + @Override public AllocateResponse allocate(float progressIndicator) + throws YarnException, IOException { + + AllocateResponse.AllocateResponseBuilder builder = + AllocateResponse.newBuilder(); + synchronized (feedContainers) { + if (feedContainers.isEmpty()) { + System.out.println("Allocating........ no containers"); + return builder.build(); + } else { + // The AMRMClient will return containers for compoenent that are + // at FLEXING state + List allocatedContainers = new LinkedList<>(); + Iterator itor = feedContainers.iterator(); + while (itor.hasNext()) { + Container c = itor.next(); + org.apache.hadoop.yarn.service.component.Component component = + componentsById.get(c.getAllocationRequestId()); + if (component.getState() == ComponentState.FLEXING) { + System.out.println("Allocated container " + c.getId()); + allocatedContainers.add(c); + itor.remove(); + } + } + return builder.allocatedContainers(allocatedContainers).build(); + } + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl) { + return mock(RegisterApplicationMasterResponse.class); + } + + @Override public void unregisterApplicationMaster( + FinalApplicationStatus appStatus, String appMessage, + String appTrackingUrl) { + // DO nothing + } + }; + + return AMRMClientAsync + .createAMRMClientAsync(client1, 1000, + this.new AMRMClientCallback()); + } + + @Override + public NMClientAsync createNMClient() { + NMClientAsync nmClientAsync = super.createNMClient(); + nmClientAsync.setClient(mock(NMClient.class)); + return nmClientAsync; + } + }; + } + + @Override protected void loadApplicationJson(ServiceContext context, + SliderFileSystem fs) throws IOException { + context.application = application; + } + + /** + * + * @param application The application for the component + * @param id The id for the container + * @param compName The component to which the container is fed + * @return + */ + public Container feedContainerToComp(Application application, int id, + String compName) { + ApplicationId applicationId = ApplicationId.fromString(application.getId()); + ContainerId containerId = ContainerId + .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id); + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Container container = Container + .newInstance(containerId, nodeId, "localhost", + Resource.newInstance(100, 1), Priority.newInstance(0), null); + + long allocateId = + context.scheduler.getAllComponents().get(compName).getAllocateId(); + container.setAllocationRequestId(allocateId); + synchronized (feedContainers) { + feedContainers.add(container); + } + return container; + } + + public void flexComponent(String compName, long numberOfContainers) + throws IOException { + ClientAMProtocol.ComponentCountProto componentCountProto = + ClientAMProtocol.ComponentCountProto.newBuilder().setName(compName) + .setNumberOfContainers(numberOfContainers).build(); + ClientAMProtocol.FlexComponentsRequestProto requestProto = + ClientAMProtocol.FlexComponentsRequestProto.newBuilder() + .addComponents(componentCountProto).build(); + context.clientAMService.flexComponents(requestProto); + } + + public Component getComponent(String compName) { + return context.scheduler.getAllComponents().get(compName); + } + + public void waitForDependenciesSatisfied(String compName) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return context.scheduler.getAllComponents().get(compName) + .areDependenciesReady(); + } + }, 1000, 20000); + } + + public void waitForNumDesiredContainers(String compName, + int numDesiredContainers) throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return context.scheduler.getAllComponents().get(compName) + .getNumDesiredInstances() == numDesiredContainers; + } + }, 1000, 20000); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java new file mode 100644 index 00000000000..ea75a9052af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.Component; +import org.apache.slider.api.resource.Resource; + +public class ServiceTestUtils { + + // Example service definition + // 2 components, each of which has 2 containers. + protected Application createExampleApplication() { + Application exampleApp = new Application(); + exampleApp.setName("example-app"); + exampleApp.addComponent(createComponent("compa")); + exampleApp.addComponent(createComponent("compb")); + return exampleApp; + } + + protected Component createComponent(String name) { + return createComponent(name, 2L, "sleep 1000"); + } + + protected Component createComponent(String name, long numContainers, + String command) { + Component comp1 = new Component(); + comp1.setNumberOfContainers(numContainers); + comp1.setLaunchCommand(command); + comp1.setName(name); + Resource resource = new Resource(); + comp1.setResource(resource); + resource.setMemory("128"); + resource.setCpus(1); + return comp1; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 45be54d47bd..28105b29ffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.base.Supplier; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,14 +36,15 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.slider.api.InternalKeys; import org.apache.slider.api.resource.Application; import org.apache.slider.api.resource.Component; import org.apache.slider.api.resource.Container; import org.apache.slider.api.resource.ContainerState; -import org.apache.slider.api.resource.Resource; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.core.exceptions.SliderException; import org.junit.After; @@ -59,9 +61,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,8 +70,7 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_AM_RESOURCE_MEM; import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_SLIDER_BASE_PATH; @@ -79,7 +78,7 @@ import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_SLIDER_B * End to end tests to test deploying services with MiniYarnCluster and a in-JVM * ZK testing cluster. */ -public class TestYarnNativeServices { +public class TestYarnNativeServices extends ServiceTestUtils{ private static final Log LOG = LogFactory.getLog(TestYarnNativeServices.class); @@ -118,14 +117,16 @@ public class TestYarnNativeServices { LinuxResourceCalculatorPlugin.class.getName()); conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, ProcfsBasedProcessTree.class.getName()); - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); conf.setBoolean( YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true); conf.setBoolean(TIMELINE_SERVICE_ENABLED, false); conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100); conf.setLong(DEBUG_NM_DELETE_DELAY_SEC, 60000); - conf.setLong(KEY_AM_RESOURCE_MEM, 128); + conf.setLong(KEY_AM_RESOURCE_MEM, 526); + conf.setLong(InternalKeys.MONITOR_INTERVAL, 5); + // Disable vmem check to disallow NM killing the container + conf.setBoolean(NM_VMEM_CHECK_ENABLED, false); + conf.setBoolean(NM_PMEM_CHECK_ENABLED, false); // setup zk cluster TestingCluster zkCluster; zkCluster = new TestingCluster(1); @@ -233,11 +234,16 @@ public class TestYarnNativeServices { // 4. Flex up each component to 2 containers and check the component instance names // 5. Stop the service // 6. Destroy the service - @Test (timeout = 500000) + @Test (timeout = 200000) public void testCreateFlexStopDestroyService() throws Exception { ServiceClient client = createClient(); Application exampleApp = createExampleApplication(); client.actionCreate(exampleApp); + SliderFileSystem fileSystem = new SliderFileSystem(conf); + Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName()); + // check app.json is persisted. + Assert.assertTrue( + fs.exists(new Path(appDir, exampleApp.getName() + ".json"))); waitForAllCompToBeReady(client, exampleApp); // Flex two components, each from 2 container to 3 containers. @@ -272,8 +278,6 @@ public class TestYarnNativeServices { LOG.info("Destroy the service"); //destroy the service and check the app dir is deleted from fs. client.actionDestroy(exampleApp.getName()); - SliderFileSystem fileSystem = new SliderFileSystem(conf); - Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName()); // check the application dir on hdfs (in this case, local fs) are deleted. Assert.assertFalse(fs.exists(appDir)); } @@ -281,7 +285,7 @@ public class TestYarnNativeServices { // Create compa with 2 containers // Create compb with 2 containers which depends on compa // Check containers for compa started before containers for compb - @Test (timeout = 500000) + @Test (timeout = 200000) public void testComponentStartOrder() throws Exception { ServiceClient client = createClient(); Application exampleApp = new Application(); @@ -400,7 +404,7 @@ public class TestYarnNativeServices { e.printStackTrace(); return false; } - }, 5000, 200000); + }, 2000, 200000); } // wait until all the containers for all components become ready state @@ -442,7 +446,7 @@ public class TestYarnNativeServices { e.printStackTrace(); return false; } - }, 5000, 900000); + }, 2000, 200000); } private ServiceClient createClient() throws Exception { @@ -467,30 +471,4 @@ public class TestYarnNativeServices { } return totalContainers; } - // Example service definition - // 2 components, each of which has 2 containers. - private Application createExampleApplication() { - Application exampleApp = new Application(); - exampleApp.setName("example-app"); - exampleApp.addComponent(createComponent("compa")); - exampleApp.addComponent(createComponent("compb")); - return exampleApp; - } - - private Component createComponent(String name) { - return createComponent(name, 2L, "sleep 1000"); - } - - private Component createComponent(String name, long numContainers, - String command) { - Component comp1 = new Component(); - comp1.setNumberOfContainers(numContainers); - comp1.setLaunchCommand(command); - comp1.setName(name); - Resource resource = new Resource(); - comp1.setResource(resource); - resource.setMemory("128"); - resource.setCpus(1); - return comp1; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/servicemonitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/servicemonitor/TestServiceMonitor.java new file mode 100644 index 00000000000..db83cb6d2f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/hadoop/yarn/service/servicemonitor/TestServiceMonitor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.service.servicemonitor; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.service.MockServiceAM; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.apache.slider.api.InternalKeys; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.Component; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Matchers.anyFloat; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; + +public class TestServiceMonitor extends ServiceTestUtils { + + private File basedir; + YarnConfiguration conf = new YarnConfiguration(); + + @Before + public void setup() throws Exception { + basedir = new File("target", "apps"); + if (basedir.exists()) { + FileUtils.deleteDirectory(basedir); + } else { + basedir.mkdirs(); + } + conf.setLong(InternalKeys.MONITOR_INTERVAL, 2); + } + + @After + public void tearDown() throws IOException { + if (basedir != null) { + FileUtils.deleteDirectory(basedir); + } + } + + // Create compa with 1 container + // Create compb with 1 container + // Verify compb dependency satisfied + // Increase compa to 2 containers + // Verify compb dependency becomes unsatisfied. + @Test + public void testComponentDependency() throws Exception{ + ApplicationId applicationId = ApplicationId.newInstance(123456, 1); + Application exampleApp = new Application(); + exampleApp.setId(applicationId.toString()); + exampleApp.setName("testComponentDependency"); + exampleApp.addComponent(createComponent("compa", 1, "sleep 1000")); + Component compb = createComponent("compb", 1, "sleep 1000"); + + // Let compb depends on compa; + compb.setDependencies(Collections.singletonList("compa")); + exampleApp.addComponent(compb); + + MockServiceAM am = new MockServiceAM(exampleApp); + am.init(conf); + am.start(); + + // compa ready + Assert.assertTrue(am.getComponent("compa").areDependenciesReady()); + //compb not ready + Assert.assertFalse(am.getComponent("compb").areDependenciesReady()); + + // feed 1 container to compa, + am.feedContainerToComp(exampleApp, 1, "compa"); + // waiting for compb's dependencies are satisfied + am.waitForDependenciesSatisfied("compb"); + + // feed 1 container to compb + am.feedContainerToComp(exampleApp, 2, "compb"); + am.flexComponent("compa", 2); + am.waitForNumDesiredContainers("compa", 2); + + // compb dependencies not satisfied again. + Assert.assertFalse(am.getComponent("compb").areDependenciesReady()); + am.stop(); + } +}