YARN-7029. Add more UTs in yarn-native-services. Contributed by Jian He

This commit is contained in:
Billie Rinaldi 2017-08-18 09:01:59 -07:00 committed by Jian He
parent 1888318c89
commit 79bf5c084d
18 changed files with 607 additions and 124 deletions

View File

@ -87,7 +87,7 @@ public class ContainerLaunchService extends AbstractService{
AbstractLauncher launcher = new AbstractLauncher(fs, null);
try {
provider.buildContainerLaunchContext(launcher, application,
instance, fs);
instance, fs, getConfig());
instance.getComponent().getScheduler().getNmClient()
.startContainerAsync(container,
launcher.completeContainerLaunch());

View File

@ -32,14 +32,17 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.client.params.SliderAMArgs;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
public class ServiceMaster extends CompositeService {
@ -48,6 +51,7 @@ public class ServiceMaster extends CompositeService {
LoggerFactory.getLogger(ServiceMaster.class);
private static SliderAMArgs amArgs;
protected ServiceContext context;
public ServiceMaster(String name) {
super(name);
@ -62,17 +66,15 @@ public class ServiceMaster extends CompositeService {
}
LOG.info("Login user is {}", UserGroupInformation.getLoginUser());
ServiceContext context = new ServiceContext();
Path appDir = new Path(amArgs.getAppDefPath()).getParent();
context = new ServiceContext();
Path appDir = getAppDir();
SliderFileSystem fs = new SliderFileSystem(conf);
context.fs = fs;
fs.setAppDir(appDir);
context.application = ServiceApiUtil
.loadApplicationFrom(fs, new Path(amArgs.getAppDefPath()));
LOG.info(context.application.toString());
ContainerId amContainerId = ContainerId.fromString(SliderUtils
.mandatoryEnvVariable(
ApplicationConstants.Environment.CONTAINER_ID.name()));
loadApplicationJson(context, fs);
ContainerId amContainerId = getAMContainerId();
ApplicationAttemptId attemptId = amContainerId.getApplicationAttemptId();
LOG.info("Application attemptId: " + attemptId);
context.attemptId = attemptId;
@ -88,7 +90,7 @@ public class ServiceMaster extends CompositeService {
context.clientAMService = clientAMService;
addService(clientAMService);
ServiceScheduler scheduler = new ServiceScheduler(context);
ServiceScheduler scheduler = createServiceScheduler(context);
addService(scheduler);
context.scheduler = scheduler;
@ -98,6 +100,26 @@ public class ServiceMaster extends CompositeService {
super.serviceInit(conf);
}
protected ContainerId getAMContainerId() throws BadClusterStateException {
return ContainerId.fromString(SliderUtils.mandatoryEnvVariable(
ApplicationConstants.Environment.CONTAINER_ID.name()));
}
protected Path getAppDir() {
return new Path(amArgs.getAppDefPath()).getParent();
}
protected ServiceScheduler createServiceScheduler(ServiceContext context)
throws IOException, YarnException {
return new ServiceScheduler(context);
}
protected void loadApplicationJson(ServiceContext context,
SliderFileSystem fs) throws IOException {
context.application = ServiceApiUtil
.loadApplicationFrom(fs, new Path(amArgs.getAppDefPath()));
LOG.info(context.application.toString());
}
@Override
protected void serviceStop() throws Exception {

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.slider.api.InternalKeys;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent;
@ -38,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceState.RUNNING_BUT_UNREADY;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType.BECOME_NOT_READY;
import static org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType.BECOME_READY;
@ -51,6 +53,7 @@ public class ServiceMonitor extends AbstractService {
public ScheduledExecutorService executorService;
private Map<ContainerId, ComponentInstance> liveInstances = null;
private ServiceContext context;
private Configuration conf;
public ServiceMonitor(String name, ServiceContext context) {
super(name);
@ -61,14 +64,17 @@ public class ServiceMonitor extends AbstractService {
@Override
public void serviceInit(Configuration conf) throws Exception {
executorService = Executors.newScheduledThreadPool(1);
this.conf = conf;
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
long readinessCheckInterval = context.application.getConfiguration()
.getPropertyLong(InternalKeys.MONITOR_INTERVAL,
InternalKeys.DEFAULT_MONITOR_INTERVAL);
long readinessCheckInterval =
YarnServiceConf.getLong(InternalKeys.MONITOR_INTERVAL,
InternalKeys.DEFAULT_MONITOR_INTERVAL,
context.application.getConfiguration(), conf);
executorService
.scheduleAtFixedRate(new ReadinessChecker(), readinessCheckInterval,
readinessCheckInterval, TimeUnit.SECONDS);
@ -104,7 +110,7 @@ public class ServiceMonitor extends AbstractService {
ProbeStatus status = instance.ping();
if (status.isSuccess()) {
if (instance.getState() != READY) {
if (instance.getState() == RUNNING_BUT_UNREADY) {
// synchronously update the state.
instance.handle(
new ComponentInstanceEvent(entry.getKey(), BECOME_READY));

View File

@ -54,27 +54,26 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.conf.SliderKeys;
import org.apache.hadoop.yarn.service.metrics.ServiceMetrics;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.api.ServiceApiConstants;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.ConfigFile;
import org.apache.hadoop.yarn.service.conf.SliderKeys;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.core.zk.ZKIntegration;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.metrics.ServiceMetrics;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -82,6 +81,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -93,8 +93,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
import static org.apache.slider.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.slider.api.ServiceApiConstants.*;
/**
*
@ -110,7 +110,7 @@ public class ServiceScheduler extends CompositeService {
new ConcurrentHashMap<>();
// id - > component
private final Map<Long, Component> componentsById =
protected final Map<Long, Component> componentsById =
new ConcurrentHashMap<>();
private final Map<ContainerId, ComponentInstance> liveInstances =
@ -151,6 +151,8 @@ public class ServiceScheduler extends CompositeService {
RegistryOperations registryClient = RegistryOperationsFactory
.createInstance("ServiceScheduler", configuration);
addIfService(registryClient);
yarnRegistryOperations =
createYarnRegistryOperations(context, registryClient);
// register metrics
serviceMetrics = ServiceMetrics
@ -158,11 +160,10 @@ public class ServiceScheduler extends CompositeService {
serviceMetrics.tag("type", "Metrics type [component or service]", "service");
serviceMetrics.tag("appId", "Application id for service", app.getId());
amRMClient =
AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallback());
amRMClient = createAMRMClient();
addIfService(amRMClient);
nmClient = NMClientAsync.createNMClientAsync(new NMClientCallback());
nmClient = createNMClient();
addIfService(nmClient);
dispatcher = new AsyncDispatcher("Component dispatcher");
@ -191,10 +192,6 @@ public class ServiceScheduler extends CompositeService {
LOG.info("Timeline v2 is enabled.");
}
yarnRegistryOperations =
new YarnRegistryViewForProviders(registryClient,
RegistryUtils.currentUser(), SliderKeys.APP_TYPE, app.getName(),
context.attemptId);
initGlobalTokensForSubstitute(context);
//substitute quicklinks
ProviderUtils.substituteMapWithTokens(app.getQuicklinks(), globalTokens);
@ -203,6 +200,22 @@ public class ServiceScheduler extends CompositeService {
createAllComponents();
}
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return new YarnRegistryViewForProviders(registryClient,
RegistryUtils.currentUser(), SliderKeys.APP_TYPE, app.getName(),
context.attemptId);
}
protected NMClientAsync createNMClient() {
return NMClientAsync.createNMClientAsync(new NMClientCallback());
}
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
return AMRMClientAsync
.createAMRMClientAsync(1000, new AMRMClientCallback());
}
@Override
public void serviceInit(Configuration conf) throws Exception {
try {
@ -323,7 +336,7 @@ public class ServiceScheduler extends CompositeService {
context.configCache = configFileCache;
}
private void registerServiceInstance(ApplicationAttemptId attemptId,
protected void registerServiceInstance(ApplicationAttemptId attemptId,
Application application) throws IOException {
LOG.info("Registering " + attemptId + ", " + application.getName()
+ " into registry");
@ -413,8 +426,9 @@ public class ServiceScheduler extends CompositeService {
try {
component.handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for component " + event.getName(), t);
LOG.error(MessageFormat
.format("[COMPONENT {0}]: Error in handling event type {1}",
component.getName(), event.getType()), t);
}
}
}
@ -432,13 +446,13 @@ public class ServiceScheduler extends CompositeService {
try {
instance.handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for component instance " + instance.getCompInstanceId(), t);
LOG.error(instance.getCompInstanceId() +
": Error in handling event type " + event.getType(), t);
}
}
}
private class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
@Override
public void onContainersAllocated(List<Container> containers) {

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.ServerProxy;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.conf.YarnServiceConfKeys;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import java.net.InetSocketAddress;
@ -35,8 +35,8 @@ public class ClientAMProxy extends ServerProxy{
final YarnRPC rpc, final InetSocketAddress serverAddress) {
RetryPolicy retryPolicy =
createRetryPolicy(conf, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_WAIT_MS,
15 * 60 * 1000, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_INTERVAL_MS,
createRetryPolicy(conf, YarnServiceConf.CLIENT_AM_RETRY_MAX_WAIT_MS,
15 * 60 * 1000, YarnServiceConf.CLIENT_AM_RETRY_MAX_INTERVAL_MS,
2 * 1000);
Configuration confClone = new Configuration(conf);
confClone.setInt(

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.service.client.params.CommonArgs;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
import org.apache.hadoop.yarn.service.conf.SliderKeys;
import org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.util.Records;
@ -487,10 +488,14 @@ public class ServiceClient extends CompositeService
// create AM CLI
String cmdStr =
buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j);
submissionContext.setResource(Resource.newInstance(
conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1));
submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, app.getQueue()));
submissionContext.setResource(Resource.newInstance(YarnServiceConf
.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM,
app.getConfiguration(), conf), 1));
String queue = app.getQueue();
if (StringUtils.isEmpty(queue)) {
queue = conf.get(KEY_YARN_QUEUE, "default");
}
submissionContext.setQueue(queue);
submissionContext.setApplicationName(appName);
submissionContext.setApplicationType(SliderKeys.APP_TYPE);
Set<String> appTags =

View File

@ -263,8 +263,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Invalid event " + event.getType() +
" at " + oldState + " for component instance " + compInstanceId, e);
LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() +
" at " + oldState, e);
}
if (oldState != getState()) {
LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to "

View File

@ -45,6 +45,7 @@ import org.apache.slider.server.servicemonitor.Probe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@ -436,6 +437,10 @@ public class Component implements EventHandler<ComponentEvent> {
return priority;
}
public long getAllocateId() {
return allocateId;
}
public String getName () {
return componentSpec.getName();
}
@ -461,8 +466,8 @@ public class Component implements EventHandler<ComponentEvent> {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Invalid event " + event.getType() +
" at " + oldState + " for component " + componentSpec.getName(), e);
LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}",
componentSpec.getName(), event.getType(), oldState), e);
}
if (oldState != getState()) {
LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.",

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.conf;
import org.apache.slider.api.resource.Configuration;
public class YarnServiceConf {
// Retry settings for the ServiceClient to talk to Service AppMaster
public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
// Retry settings for container failures
public static final String CONTAINER_RETRY_MAX = "yarn.service.container-failure.retry.max";
public static final String CONTAINER_RETRY_INTERVAL = "yarn.service.container-failure.retry-interval";
/**
* Get long value for the property
* @param name name of the property
* @param defaultValue default value of the property, if it is not defined in
* userConf and systemConf.
* @param userConf Configuration provided by client in the JSON definition
* @param systemConf The YarnConfiguration in the system.
* @return long value for the property
*/
public static long getLong(String name, long defaultValue,
Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
return userConf.getPropertyLong(name, systemConf.getLong(name, defaultValue));
}
public static int getInt(String name, int defaultValue,
Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
return userConf.getPropertyInt(name, systemConf.getInt(name, defaultValue));
}
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.conf;
public interface YarnServiceConfKeys {
// Retry settings for the ServiceClient to talk to Service AppMaster
String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
}

View File

@ -17,7 +17,9 @@
*/
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.hadoop.yarn.service.conf.SliderKeys;
@ -35,6 +37,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_RETRY_INTERVAL;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_RETRY_MAX;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.$;
public abstract class AbstractProviderService implements ProviderService,
@ -50,7 +54,8 @@ public abstract class AbstractProviderService implements ProviderService,
public void buildContainerLaunchContext(AbstractLauncher launcher,
Application application, ComponentInstance instance,
SliderFileSystem fileSystem) throws IOException, SliderException {
SliderFileSystem fileSystem, Configuration yarnConf)
throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec();;
processArtifact(launcher, instance, fileSystem, application);
@ -93,5 +98,12 @@ public abstract class AbstractProviderService implements ProviderService,
operation.add(launchCommand);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build());
// By default retry forever every 30 seconds
launcher.setRetryContext(YarnServiceConf
.getInt(CONTAINER_RETRY_MAX, -1, application.getConfiguration(),
yarnConf), YarnServiceConf
.getInt(CONTAINER_RETRY_INTERVAL, 30000, application.getConfiguration(),
yarnConf));
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.conf.Configuration;
import org.apache.slider.api.resource.Application;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.exceptions.SliderException;
@ -33,5 +34,6 @@ public interface ProviderService {
*/
void buildContainerLaunchContext(AbstractLauncher containerLauncher,
Application application, ComponentInstance instance,
SliderFileSystem sliderFileSystem) throws IOException, SliderException;
SliderFileSystem sliderFileSystem, Configuration yarnConf)
throws IOException, SliderException;
}

View File

@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.service.conf.SliderKeys;
@ -186,6 +188,13 @@ public class AbstractLauncher {
return containerLaunchContext;
}
public void setRetryContext(int maxRetries, int retryInterval) {
ContainerRetryContext retryContext = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, maxRetries,
retryInterval);
containerLaunchContext.setContainerRetryContext(retryContext);
}
/**
* Dump local resources at debug level
*/

View File

@ -138,7 +138,7 @@ public class RoleLaunchService
ProviderService provider = ProviderFactory.getProviderService(
compSpec.getArtifact());
provider.buildContainerLaunchContext(containerLauncher, application,
instance, fs);
instance, fs, getConfig());
long delay = compSpec.getConfiguration()
.getPropertyLong(KEY_CONTAINER_LAUNCH_DELAY, 0);

View File

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.*;
import org.apache.hadoop.yarn.proto.ClientAMProtocol;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.slider.api.resource.Application;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.mock;
public class MockServiceAM extends ServiceMaster {
Application application;
// The list of containers fed by tests to be returned on
// AMRMClientCallBackHandler#onContainersAllocated
final List<Container> feedContainers =
Collections.synchronizedList(new LinkedList<>());
public MockServiceAM(Application application) {
super(application.getName());
this.application = application;
}
@Override
protected ContainerId getAMContainerId()
throws BadClusterStateException {
return ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.fromString(application.getId()), 1), 1);
}
@Override
protected Path getAppDir() {
Path path = new Path(new Path("target", "apps"), application.getName());
System.out.println("Application path: " + path);
return path;
}
@Override
protected ServiceScheduler createServiceScheduler(ServiceContext context)
throws IOException, YarnException {
return new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
@Override
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
AMRMClientImpl client1 = new AMRMClientImpl() {
@Override public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
AllocateResponse.AllocateResponseBuilder builder =
AllocateResponse.newBuilder();
synchronized (feedContainers) {
if (feedContainers.isEmpty()) {
System.out.println("Allocating........ no containers");
return builder.build();
} else {
// The AMRMClient will return containers for compoenent that are
// at FLEXING state
List<Container> allocatedContainers = new LinkedList<>();
Iterator<Container> itor = feedContainers.iterator();
while (itor.hasNext()) {
Container c = itor.next();
org.apache.hadoop.yarn.service.component.Component component =
componentsById.get(c.getAllocationRequestId());
if (component.getState() == ComponentState.FLEXING) {
System.out.println("Allocated container " + c.getId());
allocatedContainers.add(c);
itor.remove();
}
}
return builder.allocatedContainers(allocatedContainers).build();
}
}
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) {
return mock(RegisterApplicationMasterResponse.class);
}
@Override public void unregisterApplicationMaster(
FinalApplicationStatus appStatus, String appMessage,
String appTrackingUrl) {
// DO nothing
}
};
return AMRMClientAsync
.createAMRMClientAsync(client1, 1000,
this.new AMRMClientCallback());
}
@Override
public NMClientAsync createNMClient() {
NMClientAsync nmClientAsync = super.createNMClient();
nmClientAsync.setClient(mock(NMClient.class));
return nmClientAsync;
}
};
}
@Override protected void loadApplicationJson(ServiceContext context,
SliderFileSystem fs) throws IOException {
context.application = application;
}
/**
*
* @param application The application for the component
* @param id The id for the container
* @param compName The component to which the container is fed
* @return
*/
public Container feedContainerToComp(Application application, int id,
String compName) {
ApplicationId applicationId = ApplicationId.fromString(application.getId());
ContainerId containerId = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
NodeId nodeId = NodeId.newInstance("localhost", 1234);
Container container = Container
.newInstance(containerId, nodeId, "localhost",
Resource.newInstance(100, 1), Priority.newInstance(0), null);
long allocateId =
context.scheduler.getAllComponents().get(compName).getAllocateId();
container.setAllocationRequestId(allocateId);
synchronized (feedContainers) {
feedContainers.add(container);
}
return container;
}
public void flexComponent(String compName, long numberOfContainers)
throws IOException {
ClientAMProtocol.ComponentCountProto componentCountProto =
ClientAMProtocol.ComponentCountProto.newBuilder().setName(compName)
.setNumberOfContainers(numberOfContainers).build();
ClientAMProtocol.FlexComponentsRequestProto requestProto =
ClientAMProtocol.FlexComponentsRequestProto.newBuilder()
.addComponents(componentCountProto).build();
context.clientAMService.flexComponents(requestProto);
}
public Component getComponent(String compName) {
return context.scheduler.getAllComponents().get(compName);
}
public void waitForDependenciesSatisfied(String compName)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return context.scheduler.getAllComponents().get(compName)
.areDependenciesReady();
}
}, 1000, 20000);
}
public void waitForNumDesiredContainers(String compName,
int numDesiredContainers) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return context.scheduler.getAllComponents().get(compName)
.getNumDesiredInstances() == numDesiredContainers;
}
}, 1000, 20000);
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.Resource;
public class ServiceTestUtils {
// Example service definition
// 2 components, each of which has 2 containers.
protected Application createExampleApplication() {
Application exampleApp = new Application();
exampleApp.setName("example-app");
exampleApp.addComponent(createComponent("compa"));
exampleApp.addComponent(createComponent("compb"));
return exampleApp;
}
protected Component createComponent(String name) {
return createComponent(name, 2L, "sleep 1000");
}
protected Component createComponent(String name, long numContainers,
String command) {
Component comp1 = new Component();
comp1.setNumberOfContainers(numContainers);
comp1.setLaunchCommand(command);
comp1.setName(name);
Resource resource = new Resource();
comp1.setResource(resource);
resource.setMemory("128");
resource.setCpus(1);
return comp1;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,14 +36,15 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.Container;
import org.apache.slider.api.resource.ContainerState;
import org.apache.slider.api.resource.Resource;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.exceptions.SliderException;
import org.junit.After;
@ -59,9 +61,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -70,8 +70,7 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_AM_RESOURCE_MEM;
import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_SLIDER_BASE_PATH;
@ -79,7 +78,7 @@ import static org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys.KEY_SLIDER_B
* End to end tests to test deploying services with MiniYarnCluster and a in-JVM
* ZK testing cluster.
*/
public class TestYarnNativeServices {
public class TestYarnNativeServices extends ServiceTestUtils{
private static final Log LOG =
LogFactory.getLog(TestYarnNativeServices.class);
@ -118,14 +117,16 @@ public class TestYarnNativeServices {
LinuxResourceCalculatorPlugin.class.getName());
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(
YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
conf.setBoolean(TIMELINE_SERVICE_ENABLED, false);
conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
conf.setLong(DEBUG_NM_DELETE_DELAY_SEC, 60000);
conf.setLong(KEY_AM_RESOURCE_MEM, 128);
conf.setLong(KEY_AM_RESOURCE_MEM, 526);
conf.setLong(InternalKeys.MONITOR_INTERVAL, 5);
// Disable vmem check to disallow NM killing the container
conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
// setup zk cluster
TestingCluster zkCluster;
zkCluster = new TestingCluster(1);
@ -233,11 +234,16 @@ public class TestYarnNativeServices {
// 4. Flex up each component to 2 containers and check the component instance names
// 5. Stop the service
// 6. Destroy the service
@Test (timeout = 500000)
@Test (timeout = 200000)
public void testCreateFlexStopDestroyService() throws Exception {
ServiceClient client = createClient();
Application exampleApp = createExampleApplication();
client.actionCreate(exampleApp);
SliderFileSystem fileSystem = new SliderFileSystem(conf);
Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName());
// check app.json is persisted.
Assert.assertTrue(
fs.exists(new Path(appDir, exampleApp.getName() + ".json")));
waitForAllCompToBeReady(client, exampleApp);
// Flex two components, each from 2 container to 3 containers.
@ -272,8 +278,6 @@ public class TestYarnNativeServices {
LOG.info("Destroy the service");
//destroy the service and check the app dir is deleted from fs.
client.actionDestroy(exampleApp.getName());
SliderFileSystem fileSystem = new SliderFileSystem(conf);
Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName());
// check the application dir on hdfs (in this case, local fs) are deleted.
Assert.assertFalse(fs.exists(appDir));
}
@ -281,7 +285,7 @@ public class TestYarnNativeServices {
// Create compa with 2 containers
// Create compb with 2 containers which depends on compa
// Check containers for compa started before containers for compb
@Test (timeout = 500000)
@Test (timeout = 200000)
public void testComponentStartOrder() throws Exception {
ServiceClient client = createClient();
Application exampleApp = new Application();
@ -400,7 +404,7 @@ public class TestYarnNativeServices {
e.printStackTrace();
return false;
}
}, 5000, 200000);
}, 2000, 200000);
}
// wait until all the containers for all components become ready state
@ -442,7 +446,7 @@ public class TestYarnNativeServices {
e.printStackTrace();
return false;
}
}, 5000, 900000);
}, 2000, 200000);
}
private ServiceClient createClient() throws Exception {
@ -467,30 +471,4 @@ public class TestYarnNativeServices {
}
return totalContainers;
}
// Example service definition
// 2 components, each of which has 2 containers.
private Application createExampleApplication() {
Application exampleApp = new Application();
exampleApp.setName("example-app");
exampleApp.addComponent(createComponent("compa"));
exampleApp.addComponent(createComponent("compb"));
return exampleApp;
}
private Component createComponent(String name) {
return createComponent(name, 2L, "sleep 1000");
}
private Component createComponent(String name, long numContainers,
String command) {
Component comp1 = new Component();
comp1.setNumberOfContainers(numContainers);
comp1.setLaunchCommand(command);
comp1.setName(name);
Resource resource = new Resource();
comp1.setResource(resource);
resource.setMemory("128");
resource.setCpus(1);
return comp1;
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.servicemonitor;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.MockServiceAM;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
public class TestServiceMonitor extends ServiceTestUtils {
private File basedir;
YarnConfiguration conf = new YarnConfiguration();
@Before
public void setup() throws Exception {
basedir = new File("target", "apps");
if (basedir.exists()) {
FileUtils.deleteDirectory(basedir);
} else {
basedir.mkdirs();
}
conf.setLong(InternalKeys.MONITOR_INTERVAL, 2);
}
@After
public void tearDown() throws IOException {
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}
}
// Create compa with 1 container
// Create compb with 1 container
// Verify compb dependency satisfied
// Increase compa to 2 containers
// Verify compb dependency becomes unsatisfied.
@Test
public void testComponentDependency() throws Exception{
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
Application exampleApp = new Application();
exampleApp.setId(applicationId.toString());
exampleApp.setName("testComponentDependency");
exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));
Component compb = createComponent("compb", 1, "sleep 1000");
// Let compb depends on compa;
compb.setDependencies(Collections.singletonList("compa"));
exampleApp.addComponent(compb);
MockServiceAM am = new MockServiceAM(exampleApp);
am.init(conf);
am.start();
// compa ready
Assert.assertTrue(am.getComponent("compa").areDependenciesReady());
//compb not ready
Assert.assertFalse(am.getComponent("compb").areDependenciesReady());
// feed 1 container to compa,
am.feedContainerToComp(exampleApp, 1, "compa");
// waiting for compb's dependencies are satisfied
am.waitForDependenciesSatisfied("compb");
// feed 1 container to compb
am.feedContainerToComp(exampleApp, 2, "compb");
am.flexComponent("compa", 2);
am.waitForNumDesiredContainers("compa", 2);
// compb dependencies not satisfied again.
Assert.assertFalse(am.getComponent("compb").areDependenciesReady());
am.stop();
}
}