diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index a7b7e22d42c..6bc567328fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -132,7 +132,6 @@ public class ServiceScheduler extends CompositeService {
   private AMRMClientAsync amRMClient;
   private NMClientAsync nmClient;
   private AsyncDispatcher dispatcher;
-  AsyncDispatcher compInstanceDispatcher;
   private YarnRegistryViewForProviders yarnRegistryOperations;
   private ServiceContext context;
   private ContainerLaunchService containerLaunchService;
@@ -152,7 +151,7 @@ public void buildInstance(ServiceContext context, Configuration configuration)
     yarnRegistryOperations =
         createYarnRegistryOperations(context, registryClient);
 
-    // register metrics
+    // register metrics,
     serviceMetrics = ServiceMetrics
         .register(app.getName(), "Metrics for service");
     serviceMetrics.tag("type", "Metrics type [component or service]", "service");
@@ -167,14 +166,11 @@ public void buildInstance(ServiceContext context, Configuration configuration)
     dispatcher = new AsyncDispatcher("Component  dispatcher");
     dispatcher.register(ComponentEventType.class,
        new ComponentEventHandler());
+    dispatcher.register(ComponentInstanceEventType.class,
+        new ComponentInstanceEventHandler());
     dispatcher.setDrainEventsOnStop();
     addIfService(dispatcher);
 
-    compInstanceDispatcher =
-        new AsyncDispatcher("CompInstance dispatcher");
-    compInstanceDispatcher.register(ComponentInstanceEventType.class,
-        new ComponentInstanceEventHandler());
-    addIfService(compInstanceDispatcher);
     containerLaunchService = new ContainerLaunchService(context.fs);
     addService(containerLaunchService);
@@ -277,10 +273,10 @@ public void serviceStart() throws Exception {
   }
 
   private void recoverComponents(RegisterApplicationMasterResponse response) {
-    List<Container> recoveredContainers = response
+    List<Container> containersFromPrevAttempt = response
         .getContainersFromPreviousAttempts();
     LOG.info("Received {} containers from previous attempt.",
-        recoveredContainers.size());
+        containersFromPrevAttempt.size());
     Map<String, ServiceRecord> existingRecords = new HashMap<>();
     List<String> existingComps = null;
     try {
@@ -302,9 +298,8 @@ private void recoverComponents(RegisterApplicationMasterResponse response) {
         }
       }
     }
-    for (Container container : recoveredContainers) {
-      LOG.info("Handling container {} from previous attempt",
-          container.getId());
+    for (Container container : containersFromPrevAttempt) {
+      LOG.info("Handling {} from previous attempt", container.getId());
       ServiceRecord record = existingRecords.get(RegistryPathUtils
           .encodeYarnID(container.getId().toString()));
       if (record != null) {
@@ -487,16 +482,21 @@ public void onContainersAllocated(List containers) {
           new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
              .setContainer(container);
       dispatcher.getEventHandler().handle(event);
-      Collection<AMRMClient.ContainerRequest> requests = amRMClient
-          .getMatchingRequests(container.getAllocationRequestId());
-      LOG.info("[COMPONENT {}]: {} outstanding container requests.",
-          comp.getName(), requests.size());
-      // remove the corresponding request
-      if (requests.iterator().hasNext()) {
-        LOG.info("[COMPONENT {}]: removing one container request.", comp
-            .getName());
-        AMRMClient.ContainerRequest request = requests.iterator().next();
-        amRMClient.removeContainerRequest(request);
+      try {
+        Collection<AMRMClient.ContainerRequest> requests = amRMClient
+            .getMatchingRequests(container.getAllocationRequestId());
+        LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
+                "for allocateId " + container.getAllocationRequestId(),
+            comp.getName(), requests.size());
+        // remove the corresponding request
+        if (requests.iterator().hasNext()) {
+          AMRMClient.ContainerRequest request = requests.iterator().next();
+          amRMClient.removeContainerRequest(request);
+        }
+      } catch(Exception e) {
+        //TODO Due to YARN-7490, exception may be thrown, catch and ignore for
+        //now.
+        LOG.error("Exception when removing the matching requests. ", e);
       }
     }
   }
@@ -569,7 +569,7 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
       }
       ComponentEvent event =
           new ComponentEvent(instance.getCompName(), CONTAINER_STARTED)
-              .setInstance(instance);
+              .setInstance(instance).setContainerId(containerId);
       dispatcher.getEventHandler().handle(event);
     }
 
@@ -649,10 +649,6 @@ public void removeLiveCompInstance(ContainerId containerId) {
     liveInstances.remove(containerId);
   }
 
-  public AsyncDispatcher getCompInstanceDispatcher() {
-    return compInstanceDispatcher;
-  }
-
   public YarnRegistryViewForProviders getYarnRegistryOperations() {
     return yarnRegistryOperations;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 7208f391a3a..88f47635e27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -82,7 +82,8 @@ public class Component implements EventHandler {
   private Map<String, ComponentInstance> compInstances = new ConcurrentHashMap<>();
 
   // component instances to be assigned with a container
-  private List<ComponentInstance> pendingInstances = new LinkedList<>();
+  private List<ComponentInstance> pendingInstances =
+      Collections.synchronizedList(new LinkedList<>());
   private ContainerFailureTracker failureTracker;
   private Probe probe;
   private final ReentrantReadWriteLock.ReadLock readLock;
@@ -94,7 +95,7 @@ public class Component implements EventHandler {
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
 
-  private AsyncDispatcher compInstanceDispatcher;
+  private AsyncDispatcher dispatcher;
   private static final StateMachineFactory<Component, ComponentState,
       ComponentEventType, ComponentEvent> stateMachineFactory =
       new StateMachineFactory(
@@ -149,7 +150,7 @@ public Component(
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
     this.stateMachine = stateMachineFactory.make(this);
-    compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
+    dispatcher = scheduler.getDispatcher();
     failureTracker = new ContainerFailureTracker(context, this);
     probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
@@ -256,30 +257,18 @@ public void transition(Component component, ComponentEvent event) {
         component.releaseContainer(container);
         return;
       }
-      if (instance.hasContainer()) {
-        LOG.info(
-            "[COMPONENT {}]: Instance {} already has container, release " +
-                "surplus container {}",
-            instance.getCompName(), instance.getCompInstanceId(), container
-                .getId());
-        component.releaseContainer(container);
-        return;
-      }
+
       component.pendingInstances.remove(instance);
-      LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
-              "host {}, num pending component instances reduced to {} ",
-          component.getName(), container.getId(), instance
-              .getCompInstanceName(), container.getNodeId(), component
-              .pendingInstances.size());
       instance.setContainer(container);
       ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
       component.getScheduler().addLiveCompInstance(container.getId(), instance);
-      LOG.info("[COMPONENT {}]: Marking {} as started for component " +
-          "instance {}", component.getName(), event.getContainer().getId(),
-          instance.getCompInstanceId());
-      component.compInstanceDispatcher.getEventHandler().handle(
-          new ComponentInstanceEvent(instance.getContainerId(),
-              START));
+      LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
+              "host {}, num pending component instances reduced to {} ",
+          component.getName(), container.getId(),
+          instance.getCompInstanceName(), container.getNodeId(),
+          component.pendingInstances.size());
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(container.getId(), START));
     }
   }
@@ -288,9 +277,8 @@ private static class ContainerStartedTransition implements
     @Override
     public ComponentState transition(Component component,
         ComponentEvent event) {
-      component.compInstanceDispatcher.getEventHandler().handle(
-          new ComponentInstanceEvent(event.getInstance().getContainerId(),
-              START));
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getContainerId(), START));
       return checkIfStable(component);
     }
   }
@@ -313,14 +301,7 @@ private static class ContainerCompletedTransition extends BaseTransition {
     @Override
     public void transition(Component component, ComponentEvent event) {
       component.updateMetrics(event.getStatus());
-
-      // add back to pending list
-      component.pendingInstances.add(event.getInstance());
-      LOG.info(
-          "[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
-          component.getName(), event.getStatus().getContainerId(),
-          component.pendingInstances.size());
-      component.compInstanceDispatcher.getEventHandler().handle(
+      component.dispatcher.getEventHandler().handle(
           new ComponentInstanceEvent(event.getStatus().getContainerId(),
               STOP).setStatus(event.getStatus()));
       component.componentSpec.setState(
@@ -328,8 +309,8 @@ public void transition(Component component, ComponentEvent event) {
     }
   }
 
-  public ServiceMetrics getCompMetrics () {
-    return componentMetrics;
+  public void reInsertPendingInstance(ComponentInstance instance) {
+    pendingInstances.add(instance);
   }
 
   private void releaseContainer(Container container) {
@@ -581,4 +562,9 @@ private static class BaseTransition implements
   public ServiceContext getContext() {
     return context;
   }
+
+  // Only for testing
+  public List<ComponentInstance> getPendingInstances() {
+    return pendingInstances;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index d93dcf153a1..447b436fc9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.component;
 
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
@@ -30,6 +31,16 @@ public class ComponentEvent extends AbstractEvent {
   private Container container;
   private ComponentInstance instance;
   private ContainerStatus status;
+  private ContainerId containerId;
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public ComponentEvent setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+    return this;
+  }
 
   public ComponentEvent(String name, ComponentEventType type) {
     super(type);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 9e5f98cf342..509f6675006 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -146,7 +146,7 @@ private static class ContainerStartedTransition extends BaseTransition {
       compInstance.containerStatusFuture =
           compInstance.scheduler.executorService.scheduleAtFixedRate(
               new ContainerStatusRetriever(compInstance.scheduler,
-                  compInstance.getContainerId(), compInstance), 0, 1,
+                  event.getContainerId(), compInstance), 0, 1,
               TimeUnit.SECONDS);
       compInstance.component.incRunningContainers();
       long containerStartTime = System.currentTimeMillis();
@@ -160,10 +160,10 @@ private static class ContainerStartedTransition extends BaseTransition {
       }
       org.apache.hadoop.yarn.service.api.records.Container container =
           new org.apache.hadoop.yarn.service.api.records.Container();
-      container.setId(compInstance.getContainerId().toString());
+      container.setId(event.getContainerId().toString());
       container.setLaunchTime(new Date(containerStartTime));
       container.setState(ContainerState.RUNNING_BUT_UNREADY);
-      container.setBareHost(compInstance.container.getNodeId().getHost());
+      container.setBareHost(compInstance.getNodeId().getHost());
       container.setComponentInstanceName(compInstance.getCompInstanceName());
       if (compInstance.containerSpec != null) {
         // remove the previous container.
@@ -219,15 +219,11 @@ public void transition(ComponentInstance compInstance,
       // re-ask the failed container.
       Component comp = compInstance.component;
       comp.requestContainers(1);
-      LOG.info(compInstance.getCompInstanceId()
-              + ": Container completed. Requested a new container." + System
-              .lineSeparator() + " exitStatus={}, diagnostics={}.",
-          event.getStatus().getExitStatus(),
-          event.getStatus().getDiagnostics());
       String containerDiag =
           compInstance.getCompInstanceId() + ": " + event.getStatus()
              .getDiagnostics();
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
+      compInstance.cancelContainerStatusRetriever();
 
       if (compInstance.getState().equals(READY)) {
         compInstance.component.decContainersReady();
@@ -255,11 +251,13 @@ public void transition(ComponentInstance compInstance,
       // hdfs dir content will be overwritten when a new container gets started,
       // so no need remove.
       compInstance.scheduler.executorService
-          .submit(compInstance::cleanupRegistry);
+          .submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
+
       if (compInstance.timelineServiceEnabled) {
         // record in ATS
-        compInstance.serviceTimelinePublisher.componentInstanceFinished
-            (compInstance, event.getStatus().getExitStatus(), containerDiag);
+        compInstance.serviceTimelinePublisher
+            .componentInstanceFinished(event.getContainerId(),
+                event.getStatus().getExitStatus(), containerDiag);
       }
       compInstance.containerSpec.setState(ContainerState.STOPPED);
     }
@@ -267,6 +265,14 @@ public void transition(ComponentInstance compInstance,
       // remove the failed ContainerId -> CompInstance mapping
       comp.getScheduler().removeLiveCompInstance(event.getContainerId());
+      comp.reInsertPendingInstance(compInstance);
+
+      LOG.info(compInstance.getCompInstanceId()
+              + ": {} completed. Reinsert back to pending list and requested "
+              + "a new container." + System.lineSeparator()
+              + " exitStatus={}, diagnostics={}.",
+          event.getContainerId(), event.getStatus().getExitStatus(),
+          event.getStatus().getDiagnostics());
 
       if (shouldExit) {
         // Sleep for 5 seconds in hope that the state can be recorded in ATS.
         // in case there's a client polling the comp state, it can be notified.
@@ -277,8 +283,6 @@ public void transition(ComponentInstance compInstance,
         }
         ExitUtil.terminate(-1);
       }
-
-      compInstance.removeContainer();
     }
   }
 
@@ -312,15 +316,6 @@ public void handle(ComponentInstanceEvent event) {
     }
   }
 
-  public boolean hasContainer() {
-    return this.container != null;
-  }
-
-  public void removeContainer() {
-    this.container = null;
-    this.compInstanceId.setContainerId(null);
-  }
-
   public void setContainer(Container container) {
     this.container = container;
     this.compInstanceId.setContainerId(container.getId());
   }
@@ -337,7 +332,7 @@ public ContainerStatus getContainerStatus() {
   public void updateContainerStatus(ContainerStatus status) {
     this.status = status;
     org.apache.hadoop.yarn.service.api.records.Container container =
-        getCompSpec().getContainer(getContainerId().toString());
+        getCompSpec().getContainer(status.getContainerId().toString());
     if (container != null) {
       container.setIp(StringUtils.join(",", status.getIPs()));
       container.setHostname(status.getHost());
@@ -348,10 +343,6 @@ public void updateContainerStatus(ContainerStatus status) {
     updateServiceRecord(yarnRegistryOperations, status);
   }
 
-  public ContainerId getContainerId() {
-    return container.getId();
-  }
-
   public String getCompName() {
     return compInstanceId.getCompName();
   }
@@ -423,12 +414,7 @@ private void updateServiceRecord(
   public void destroy() {
     LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
     diagnostics.append(getCompInstanceId() + ": Flexed down by user");
-    if (container != null) {
-      scheduler.removeLiveCompInstance(container.getId());
-      component.getScheduler().getAmRMClient()
-          .releaseAssignedContainer(container.getId());
-      getCompSpec().removeContainer(containerSpec);
-    }
+
     // update metrics
     if (getState() == STARTED) {
       component.decRunningContainers();
@@ -437,16 +423,29 @@ public void destroy() {
       component.decContainersReady();
       component.decRunningContainers();
     }
+    getCompSpec().removeContainer(containerSpec);
+
+    if (container == null) {
+      LOG.info(getCompInstanceId() + " no container is assigned when " +
+          "destroying");
+      return;
+    }
+
+    ContainerId containerId = container.getId();
+    scheduler.removeLiveCompInstance(containerId);
+    component.getScheduler().getAmRMClient()
+        .releaseAssignedContainer(containerId);
 
     if (timelineServiceEnabled) {
-      serviceTimelinePublisher.componentInstanceFinished(this,
+      serviceTimelinePublisher.componentInstanceFinished(containerId,
           KILLED_BY_APPMASTER, diagnostics.toString());
     }
-    scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir);
+    cancelContainerStatusRetriever();
+    scheduler.executorService.submit(() ->
+        cleanupRegistryAndCompHdfsDir(containerId));
   }
 
-  private void cleanupRegistry() {
-    ContainerId containerId = getContainerId();
+  private void cleanupRegistry(ContainerId containerId) {
     String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
     try {
       yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
@@ -456,8 +455,8 @@ private void cleanupRegistry() {
   }
 
   //TODO Maybe have a dedicated cleanup service.
-  public void cleanupRegistryAndCompHdfsDir() {
-    cleanupRegistry();
+  public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) {
+    cleanupRegistry(containerId);
     try {
       if (compInstanceDir != null && fs.exists(compInstanceDir)) {
         boolean deleted = fs.delete(compInstanceDir, true);
@@ -515,6 +514,12 @@ private static class ContainerStatusRetriever implements Runnable {
     }
   }
 
+  private void cancelContainerStatusRetriever() {
+    if (containerStatusFuture != null && !containerStatusFuture.isDone()) {
+      containerStatusFuture.cancel(true);
+    }
+  }
+
   @Override
   public int compareTo(ComponentInstance to) {
     long delta = containerStartedTime - to.containerStartedTime;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index 0e51a6228c0..b9f3a245c69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -87,7 +87,7 @@ public ContainerLauncher(
       AbstractLauncher launcher = new AbstractLauncher(fs, null);
       try {
         provider.buildContainerLaunchContext(launcher, service,
-            instance, fs, getConfig());
+            instance, fs, getConfig(), container);
         instance.getComponent().getScheduler().getNmClient()
             .startContainerAsync(container,
                 launcher.completeContainerLaunch());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
index 6d7406199ad..70155915ea6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
@@ -20,6 +20,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.api.records.Component;
@@ -55,7 +56,7 @@ public abstract void processArtifact(AbstractLauncher launcher,
   public void buildContainerLaunchContext(AbstractLauncher launcher,
       Service service, ComponentInstance instance,
-      SliderFileSystem fileSystem, Configuration yarnConf)
+      SliderFileSystem fileSystem, Configuration yarnConf, Container container)
       throws IOException, SliderException {
     Component component = instance.getComponent().getComponentSpec();;
     processArtifact(launcher, instance, fileSystem, service);
@@ -67,7 +68,7 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
     Map<String, String> globalTokens =
         instance.getComponent().getScheduler().globalTokens;
     Map<String, String> tokensForSubstitution = ProviderUtils
-        .initCompTokensForSubstitute(instance);
+        .initCompTokensForSubstitute(instance, container);
     tokensForSubstitution.putAll(globalTokens);
     // Set the environment variables in launcher
     launcher.putEnv(ServiceUtils
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
index eb721b4a4f3..11015ea1750 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.provider;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
@@ -34,6 +35,6 @@ public interface ProviderService {
    */
   void buildContainerLaunchContext(AbstractLauncher containerLauncher,
       Service service, ComponentInstance instance,
-      SliderFileSystem sliderFileSystem, Configuration yarnConf)
-      throws IOException, SliderException;
+      SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
+          container) throws IOException, SliderException;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index e074dd756ae..c0c44c3db23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -393,13 +394,13 @@ private static void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
    * @return tokens to replace
    */
   public static Map<String, String> initCompTokensForSubstitute(
-      ComponentInstance instance) {
+      ComponentInstance instance, Container container) {
     Map<String, String> tokens = new HashMap<>();
     tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
     tokens
         .put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase());
     tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
-    tokens.put(CONTAINER_ID, instance.getContainer().getId().toString());
+    tokens.put(CONTAINER_ID, container.getId().toString());
     tokens.put(COMPONENT_ID,
         String.valueOf(instance.getCompInstanceId().getId()));
     tokens.putAll(instance.getComponent().getDependencyHostIpTokens());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
index c5229868005..949ce19c8dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -178,10 +179,10 @@ public void componentInstanceStarted(Container container,
     putEntity(entity);
   }
 
-  public void componentInstanceFinished(ComponentInstance instance,
+  public void componentInstanceFinished(ContainerId containerId,
       int exitCode, String diagnostics) {
     TimelineEntity entity = createComponentInstanceEntity(
-        instance.getContainer().getId().toString());
+        containerId.toString());
 
     // create info keys
     Map<String, Object> entityInfos = new HashMap<String, Object>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
index d343a03dfcd..429816137c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
@@ -24,14 +24,8 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
+
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -42,15 +36,15 @@
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
 import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+import org.apache.hadoop.yarn.util.Records;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
@@ -63,6 +57,8 @@ public class MockServiceAM extends ServiceMaster {
   final List<Container> feedContainers =
       Collections.synchronizedList(new LinkedList<>());
 
+  final List<ContainerStatus> failedContainers =
+      Collections.synchronizedList(new LinkedList<>());
+
   public MockServiceAM(Service service) {
     super(service.getName());
     this.service = service;
@@ -102,10 +98,10 @@ protected AMRMClientAsync createAMRMClient() {
         AllocateResponse.AllocateResponseBuilder builder =
             AllocateResponse.newBuilder();
+        // add new containers if any
         synchronized (feedContainers) {
           if (feedContainers.isEmpty()) {
             System.out.println("Allocating........ no containers");
-            return builder.build();
           } else {
             // The AMRMClient will return containers for compoenent that are
             // at FLEXING state
@@ -121,9 +117,20 @@ protected AMRMClientAsync createAMRMClient() {
               itor.remove();
             }
           }
-          return builder.allocatedContainers(allocatedContainers).build();
+          builder.allocatedContainers(allocatedContainers);
         }
       }
+
+      // add failed containers if any
+      synchronized (failedContainers) {
+        if (!failedContainers.isEmpty()) {
+          List<ContainerStatus> failed =
+              new LinkedList<>(failedContainers);
+          failedContainers.clear();
+          builder.completedContainersStatuses(failed);
+        }
+      }
+      return builder.build();
     }
 
     @Override
@@ -184,6 +191,19 @@ public Container feedContainerToComp(Service service, int id,
     return container;
   }
 
+  public void feedFailedContainerToComp(Service service, int id, String
+      compName) {
+    ApplicationId applicationId = ApplicationId.fromString(service.getId());
+    ContainerId containerId = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
+    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+    containerStatus.setContainerId(containerId);
+    synchronized (failedContainers) {
+      failedContainers.add(containerStatus);
+    }
+  }
+
+
   public void flexComponent(String compName, long numberOfContainers)
       throws IOException {
     ClientAMProtocol.ComponentCountProto componentCountProto =
@@ -218,4 +238,22 @@ public void waitForNumDesiredContainers(String compName,
       }
     }, 1000, 20000);
   }
+
+
+  public ComponentInstance getCompInstance(String compName, String
+      instanceName) {
+    return context.scheduler.getAllComponents().get(compName)
+        .getComponentInstance(instanceName);
+  }
+
+  public void waitForCompInstanceState(ComponentInstance instance,
+      ComponentInstanceState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return instance.getState().equals(state);
+      }
+    }, 1000, 20000);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index cf328809203..a70a0c280d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -65,6 +65,7 @@ public class ServiceTestUtils {
 
   private MiniYARNCluster yarnCluster = null;
   private MiniDFSCluster hdfsCluster = null;
+  TestingCluster zkCluster;
   private FileSystem fs = null;
   private Configuration conf = null;
   public static final int NUM_NMS = 1;
@@ -165,7 +166,6 @@ protected void setupInternal(int numNodeManager)
     conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
     conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
     // setup zk cluster
-    TestingCluster zkCluster;
     zkCluster = new TestingCluster(1);
     zkCluster.start();
     conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
@@ -239,6 +239,9 @@ public void shutdown() throws IOException {
         hdfsCluster = null;
       }
     }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
new file mode 100644
index 00000000000..fb4de0d5714
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.registry.client.api.RegistryConstants
+    .KEY_REGISTRY_ZK_QUORUM;
+
+public class TestServiceAM extends ServiceTestUtils {
+
+  private File basedir;
+  YarnConfiguration conf = new YarnConfiguration();
+  TestingCluster zkCluster;
+
+  @Before
+  public void setup() throws Exception {
+    basedir = new File("target", "apps");
+    if (basedir.exists()) {
+      FileUtils.deleteDirectory(basedir);
+    } else {
+      basedir.mkdirs();
+    }
+    zkCluster = new TestingCluster(1);
+    zkCluster.start();
+    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
+    System.out.println("ZK cluster: " + zkCluster.getConnectString());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (basedir != null) {
+      FileUtils.deleteDirectory(basedir);
+    }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
+  }
+
+  // Race condition YARN-7486
+  // 1. Allocate 1 container to compa and wait it to be started
+  // 2. Fail this container, and in the meanwhile allocate the 2nd container.
+  // 3. The 2nd container should not be assigned to compa-0 instance, because
+  //    the compa-0 instance is not stopped yet.
+  // 4. check compa still has the instance in the pending list.
+  @Test
+  public void testContainerCompleted() throws TimeoutException,
+      InterruptedException {
+    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
+    Service exampleApp = new Service();
+    exampleApp.setId(applicationId.toString());
+    exampleApp.setName("testContainerCompleted");
+    exampleApp.addComponent(createComponent("compa", 1, "pwd"));
+
+    MockServiceAM am = new MockServiceAM(exampleApp);
+    am.init(conf);
+    am.start();
+
+    ComponentInstance compa0 = am.getCompInstance("compa", "compa-0");
+    // allocate a container
+    am.feedContainerToComp(exampleApp, 1, "compa");
+    am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);
+
+    System.out.println("Fail the container 1");
+    // fail the container
+    am.feedFailedContainerToComp(exampleApp, 1, "compa");
+
+    // allocate the second container immediately, this container will not be
+    // assigned to comp instance
+    // because the instance is not yet added to the pending list.
+    am.feedContainerToComp(exampleApp, 2, "compa");
+
+    am.waitForCompInstanceState(compa0, ComponentInstanceState.INIT);
+    // still 1 pending instance
+    Assert.assertEquals(1,
+        am.getComponent("compa").getPendingInstances().size());
+    am.stop();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
index 0e03a2c1b61..e25d38dd491 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.yarn.service.monitor;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.MockServiceAM;
@@ -37,10 +38,14 @@
 import java.io.IOException;
 import java.util.Collections;
 
+import static org.apache.hadoop.registry.client.api.RegistryConstants
+    .KEY_REGISTRY_ZK_QUORUM;
+
 public class TestServiceMonitor extends ServiceTestUtils {
 
   private File basedir;
   YarnConfiguration conf = new YarnConfiguration();
+  TestingCluster zkCluster;
 
   @Before
   public void setup() throws Exception {
@@ -51,6 +56,10 @@ public void setup() throws Exception {
       basedir.mkdirs();
     }
     conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 2);
+    zkCluster = new TestingCluster(1);
+    zkCluster.start();
+    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
+    System.out.println("ZK cluster: " + zkCluster.getConnectString());
   }
 
   @After
@@ -58,6 +67,9 @@ public void tearDown() throws IOException {
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
   }
 
   // Create compa with 1 container