diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 49702e3455c..e4a245d3dc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -469,4 +469,22 @@ public class ApiServiceClient extends AppAdminClient { return output; } + @Override + public int actionUpgrade(String appName, + String fileName) throws IOException, YarnException { + int result; + try { + Service service = + loadAppJsonFromLocalFS(fileName, appName, null, null); + service.setState(ServiceState.UPGRADING); + String buffer = jsonSerDeser.toJson(service); + ClientResponse response = getApiClient() + .post(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade application: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index e7979b8536a..59ee05d7ab2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -375,6 +375,12 @@ public class ApiServer { && updateServiceData.getLifetime() > 0) { return updateLifetime(appName, updateServiceData, ugi); } + + // If an UPGRADE is requested + if (updateServiceData.getState() != null && + updateServiceData.getState() == ServiceState.UPGRADING) { + return upgradeService(updateServiceData, ugi); + } } catch (UndeclaredThrowableException e) { return formatResponse(Status.BAD_REQUEST, e.getCause().getMessage()); @@ -475,6 +481,24 @@ public class ApiServer { return formatResponse(Status.OK, status); } + private Response upgradeService(Service service, + final UserGroupInformation ugi) throws IOException, InterruptedException { + ServiceStatus status = new ServiceStatus(); + ugi.doAs((PrivilegedExceptionAction) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + sc.actionUpgrade(service); + sc.close(); + return null; + }); + LOG.info("Service {} version {} upgrade initialized"); + status.setDiagnostics("Service " + service.getName() + + " version " + service.getVersion() + " saved."); + status.setState(ServiceState.ACCEPTED); + return formatResponse(Status.ACCEPTED, status); + } + /** * Used by negative test case. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 516d23d3ae6..4422451c5c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -23,8 +23,14 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; + import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; + import java.io.IOException; @@ -37,4 +43,10 @@ public interface ClientAMProtocol { StopResponseProto stop(StopRequestProto requestProto) throws IOException, YarnException; + + UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request) + throws IOException, YarnException; + + RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index fb73f158703..08c36f443ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -33,8 +33,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; @@ -142,4 +146,24 @@ public class ClientAMService extends AbstractService public InetSocketAddress getBindAddress() { return bindAddress; } + + @Override + public UpgradeServiceResponseProto upgrade( + UpgradeServiceRequestProto request) throws IOException { + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE); + event.setVersion(request.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle(event); + LOG.info("Upgrading service to version {} by {}", request.getVersion(), + UserGroupInformation.getCurrentUser()); + return UpgradeServiceResponseProto.newBuilder().build(); + } + + @Override + public RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException { + ServiceEvent event = new ServiceEvent(ServiceEventType.START); + context.scheduler.getDispatcher().getEventHandler().handle(event); + LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser()); + return RestartServiceResponseProto.newBuilder().build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java new file mode 100644 index 00000000000..9e7d442b9e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * Events are handled by {@link ServiceManager} to manage the service + * state. + */ +public class ServiceEvent extends AbstractEvent { + + private final ServiceEventType type; + private String version; + + public ServiceEvent(ServiceEventType serviceEventType) { + super(serviceEventType); + this.type = serviceEventType; + } + + public ServiceEventType getType() { + return type; + } + + public String getVersion() { + return version; + } + + public ServiceEvent setVersion(String version) { + this.version = version; + return this; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java new file mode 100644 index 00000000000..2162eb5e93d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +/** + * Types of {@link ServiceEvent}. + */ +public enum ServiceEventType { + START, + UPGRADE, + STOP_UPGRADE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java new file mode 100644 index 00000000000..a3fbe899bb0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Manages the state of the service. + */ +public class ServiceManager implements EventHandler { + private static final Logger LOG = LoggerFactory.getLogger( + ServiceManager.class); + + private final Service serviceSpec; + private final ServiceContext context; + private final ServiceScheduler scheduler; + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + + private final StateMachine + stateMachine; + + private final AsyncDispatcher dispatcher; + private final SliderFileSystem fs; + private final UpgradeComponentsFinder componentsFinder; + + private String upgradeVersion; + + private static final StateMachineFactory STATE_MACHINE_FACTORY = + new StateMachineFactory(State.STABLE) + + .addTransition(State.STABLE, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.UPGRADE, + new StartUpgradeTransition()) + + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.START, + new StopUpgradeTransition()) + .installTopology(); + + public ServiceManager(ServiceContext context) { + Preconditions.checkNotNull(context); + this.context = context; + serviceSpec = context.service; + scheduler = context.scheduler; + stateMachine = STATE_MACHINE_FACTORY.make(this); + dispatcher = scheduler.getDispatcher(); + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + fs = context.fs; + componentsFinder = new UpgradeComponentsFinder + .DefaultUpgradeComponentsFinder(); + } + + @Override + public void handle(ServiceEvent event) { + try { + writeLock.lock(); + State oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitionException e) { + LOG.error(MessageFormat.format( + "[SERVICE]: Invalid event {0} at {1}.", event.getType(), + oldState), e); + } + if (oldState != getState()) { + LOG.info("[SERVICE] Transitioned from {} to {} on {} event.", + oldState, getState(), event.getType()); + } + } finally { + writeLock.unlock(); + } + } + + private State getState() { + this.readLock.lock(); + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + private static class StartUpgradeTransition implements + MultipleArcTransition { + + @Override + public State transition(ServiceManager serviceManager, + ServiceEvent event) { + try { + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + serviceManager.fs, serviceManager.getName(), event.getVersion()); + + serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + List + compsThatNeedUpgrade = serviceManager.componentsFinder. + findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec); + + if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { + compsThatNeedUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE). + setTargetSpec(component); + serviceManager.dispatcher.getEventHandler().handle( + needUpgradeEvent); + }); + } + serviceManager.upgradeVersion = event.getVersion(); + return State.UPGRADING; + } catch (Throwable e) { + LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), + e); + return State.STABLE; + } + } + } + + private static class StopUpgradeTransition implements + MultipleArcTransition { + + @Override + public State transition(ServiceManager serviceManager, + ServiceEvent event) { + //abort is not supported currently + //trigger re-check of service state + ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler, + true); + if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) { + return serviceManager.finalizeUpgrade() ? State.STABLE : + State.UPGRADING; + } else { + return State.UPGRADING; + } + } + } + + /** + * @return whether finalization of upgrade was successful. + */ + private boolean finalizeUpgrade() { + try { + Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade( + fs, getName(), upgradeVersion); + ServiceApiUtil.writeAppDefinition(fs, + ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec); + } catch (IOException e) { + LOG.error("Upgrade did not complete because unable to overwrite the" + + " service definition", e); + return false; + } + + try { + fs.deleteClusterUpgradeDir(getName(), upgradeVersion); + } catch (IOException e) { + LOG.warn("Unable to delete upgrade definition for service {} " + + "version {}", getName(), upgradeVersion); + } + serviceSpec.setVersion(upgradeVersion); + upgradeVersion = null; + return true; + } + + /** + * Returns the name of the service. + */ + public String getName() { + return serviceSpec.getName(); + } + + /** + * State of {@link ServiceManager}. + */ + public enum State { + STABLE, UPGRADING + } + + + @VisibleForTesting + Service getServiceSpec() { + return serviceSpec; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 63331977b2f..79eef49e0c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -110,6 +110,9 @@ public class ServiceScheduler extends CompositeService { LoggerFactory.getLogger(ServiceScheduler.class); private Service app; + // This encapsulates the app with methods to upgrade the app. + private ServiceManager serviceManager; + // component_name -> component private final Map componentsByName = new ConcurrentHashMap<>(); @@ -192,6 +195,7 @@ public class ServiceScheduler extends CompositeService { addIfService(nmClient); dispatcher = new AsyncDispatcher("Component dispatcher"); + dispatcher.register(ServiceEventType.class, new ServiceEventHandler()); dispatcher.register(ComponentEventType.class, new ComponentEventHandler()); dispatcher.register(ComponentInstanceEventType.class, @@ -300,6 +304,7 @@ public class ServiceScheduler extends CompositeService { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); + serviceManager = new ServiceManager(context); // recover components based on containers sent from RM recoverComponents(response); @@ -510,6 +515,20 @@ public class ServiceScheduler extends CompositeService { } } + private final class ServiceEventHandler + implements EventHandler { + @Override + public void handle(ServiceEvent event) { + try { + serviceManager.handle(event); + } catch (Throwable t) { + LOG.error(MessageFormat + .format("[SERVICE]: Error in handling event type {0}", + event.getType()), t); + } + } + } + private final class ComponentEventHandler implements EventHandler { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java new file mode 100644 index 00000000000..e18697b2f71 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Finds all the the target component specs. + */ +public interface UpgradeComponentsFinder { + + List findTargetComponentSpecs(Service currentDef, + Service targetDef); + + /** + * Default implementation of {@link UpgradeComponentsFinder} that finds all + * the target component specs. + */ + class DefaultUpgradeComponentsFinder implements UpgradeComponentsFinder { + + @Override + public List findTargetComponentSpecs(Service currentDef, + Service targetDef) { + if (currentDef.getComponents().size() != + targetDef.getComponents().size()) { + throw new UnsupportedOperationException( + "addition/deletion of components not supported by upgrade"); + } + if (!currentDef.getKerberosPrincipal().equals( + targetDef.getKerberosPrincipal())) { + throw new UnsupportedOperationException("changes to kerberos " + + "principal not supported by upgrade"); + } + if (!Objects.equals(currentDef.getQueue(), targetDef.getQueue())) { + throw new UnsupportedOperationException("changes to queue " + + "not supported by upgrade"); + } + if (!Objects.equals(currentDef.getPlacementPolicy(), + targetDef.getPlacementPolicy())) { + throw new UnsupportedOperationException("changes to placement policy " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) { + throw new UnsupportedOperationException("changes to resource " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getDescription(), + targetDef.getDescription())) { + throw new UnsupportedOperationException("changes to description " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getQuicklinks(), + targetDef.getQuicklinks())) { + throw new UnsupportedOperationException("changes to quick links " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getLaunchTime(), + targetDef.getLaunchTime())) { + throw new UnsupportedOperationException("changes to launch time " + + "not supported by upgrade"); + } + + + if (!Objects.equals(currentDef.getLifetime(), + targetDef.getLifetime())) { + throw new UnsupportedOperationException("changes to lifetime " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getConfiguration(), + currentDef.getConfiguration())) { + return targetDef.getComponents(); + } + + if (!Objects.equals(currentDef.getArtifact(), targetDef.getArtifact())) { + return targetDef.getComponents(); + } + + List targetComps = new ArrayList<>(); + targetDef.getComponents().forEach(component -> { + Component currentComp = currentDef.getComponent(component.getName()); + + if (!Objects.equals(currentComp.getName(), component.getName())) { + throw new UnsupportedOperationException( + "changes to component name not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getDependencies(), + component.getDependencies())) { + throw new UnsupportedOperationException( + "changes to component dependencies not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getReadinessCheck(), + component.getReadinessCheck())) { + throw new UnsupportedOperationException( + "changes to component readiness check not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getResource(), + component.getResource())) { + throw new UnsupportedOperationException( + "changes to component resource not supported by upgrade"); + } + + + if (!Objects.equals(currentComp.getRunPrivilegedContainer(), + component.getRunPrivilegedContainer())) { + throw new UnsupportedOperationException( + "changes to run privileged container not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getPlacementPolicy(), + component.getPlacementPolicy())) { + throw new UnsupportedOperationException( + "changes to component placement policy not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getQuicklinks(), + component.getQuicklinks())) { + throw new UnsupportedOperationException( + "changes to component quick links not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getArtifact(), + component.getArtifact()) || + !Objects.equals(currentComp.getLaunchCommand(), + component.getLaunchCommand()) || + !Objects.equals(currentComp.getConfiguration(), + component.getConfiguration())) { + targetComps.add(component); + } + }); + return targetComps; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index 702a9ae2c31..f7eda7bccf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE + FLEXING, STABLE, NEEDS_UPGRADE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 902a0b10d54..286eaa24972 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,5 @@ import org.apache.hadoop.classification.InterfaceStability; @ApiModel(description = "The current state of an service.") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { - ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX; + ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index d7148007c6c..1ea20afbb0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.client; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -55,7 +56,9 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.api.records.Component; @@ -73,8 +76,8 @@ import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; -import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ServiceUtils; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ZookeeperUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -186,6 +189,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return service; } + @Override public int actionSave(String fileName, String serviceName, Long lifetime, String queue) throws IOException, YarnException { return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName, @@ -194,9 +198,54 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, public int actionBuild(Service service) throws YarnException, IOException { - Path appDir = checkAppNotExistOnHdfs(service); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); - createDirAndPersistApp(appDir, service); + Path appDir = checkAppNotExistOnHdfs(service, false); + ServiceApiUtil.createDirAndPersistApp(fs, appDir, service); + return EXIT_SUCCESS; + } + + @Override + public int actionUpgrade(String appName, String fileName) + throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, + null, null); + return actionUpgrade(upgradeService); + } + + public int actionUpgrade(Service service) throws YarnException, IOException { + Service persistedService = + ServiceApiUtil.loadService(fs, service.getName()); + if (!StringUtils.isEmpty(persistedService.getId())) { + cachedAppInfo.put(persistedService.getName(), new AppInfo( + ApplicationId.fromString(persistedService.getId()), + persistedService.getKerberosPrincipal().getPrincipalName())); + } + + if (persistedService.getVersion().equals(service.getVersion())) { + String message = + service.getName() + " is already at version " + service.getVersion() + + ". There is nothing to upgrade."; + LOG.error(message); + throw new YarnException(message); + } + + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); + ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); + ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); + + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(service.getName())); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + + UpgradeServiceRequestProto.Builder requestBuilder = + UpgradeServiceRequestProto.newBuilder(); + requestBuilder.setVersion(service.getVersion()); + + proxy.upgrade(requestBuilder.build()); return EXIT_SUCCESS; } @@ -212,16 +261,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, String serviceName = service.getName(); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); verifyNoLiveAppInRM(serviceName, "create"); - Path appDir = checkAppNotExistOnHdfs(service); + Path appDir = checkAppNotExistOnHdfs(service, false); // Write the definition first and then submit - AM will read the definition - createDirAndPersistApp(appDir, service); + ServiceApiUtil.createDirAndPersistApp(fs, appDir, service); ApplicationId appId = submitApp(service); cachedAppInfo.put(serviceName, new AppInfo(appId, service .getKerberosPrincipal().getPrincipalName())); service.setId(appId.toString()); // update app definition with appId - persistAppDef(appDir, service); + ServiceApiUtil.writeAppDefinition(fs, appDir, service); return appId; } @@ -349,6 +398,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return original; } + @Override public int actionStop(String serviceName) throws YarnException, IOException { return actionStop(serviceName, true); @@ -424,6 +474,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return EXIT_SUCCESS; } + @Override public int actionDestroy(String serviceName) throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); @@ -557,8 +608,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } } - private ApplicationId submitApp(Service app) - throws IOException, YarnException { + @VisibleForTesting + ApplicationId submitApp(Service app) throws IOException, YarnException { String serviceName = app.getName(); Configuration conf = getConfig(); Path appRootDir = fs.buildClusterDirPath(app.getName()); @@ -777,29 +828,64 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return hasAMLog4j; } + @Override public int actionStart(String serviceName) throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); - Path appDir = checkAppExistOnHdfs(serviceName); - Service service = ServiceApiUtil.loadService(fs, serviceName); - ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); - // see if it is actually running and bail out; - verifyNoLiveAppInRM(serviceName, "start"); - ApplicationId appId = submitApp(service); - service.setId(appId.toString()); - // write app definition on to hdfs - Path appJson = persistAppDef(appDir, service); - LOG.info("Persisted service " + service.getName() + " at " + appJson); - return 0; + Service liveService = getStatus(serviceName); + if (liveService == null || + !liveService.getState().equals(ServiceState.UPGRADING)) { + Path appDir = checkAppExistOnHdfs(serviceName); + Service service = ServiceApiUtil.loadService(fs, serviceName); + ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); + // see if it is actually running and bail out; + verifyNoLiveAppInRM(serviceName, "start"); + ApplicationId appId = submitApp(service); + service.setId(appId.toString()); + // write app definition on to hdfs + Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service); + LOG.info("Persisted service " + service.getName() + " at " + appJson); + return 0; + } else { + LOG.info("Finalize service {} upgrade"); + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(serviceName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(serviceName + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(serviceName, appReport); + + RestartServiceRequestProto.Builder requestBuilder = + RestartServiceRequestProto.newBuilder(); + proxy.restart(requestBuilder.build()); + return 0; + } } - private Path checkAppNotExistOnHdfs(Service service) + /** + * Verifies that the service definition does not exist on hdfs. + * + * @param service service + * @param isUpgrade true for upgrades; false otherwise + * @return path to the service definition.. + * @throws IOException + * @throws SliderException + */ + private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade) throws IOException, SliderException { - Path appDir = fs.buildClusterDirPath(service.getName()); + Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) : + fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion()); fs.verifyDirectoryNonexistent( new Path(appDir, service.getName() + ".json")); return appDir; } + /** + * Verifies that the service exists on hdfs. + * @param serviceName service name + * @return path to the service definition. + * @throws IOException + * @throws SliderException + */ private Path checkAppExistOnHdfs(String serviceName) throws IOException, SliderException { Path appDir = fs.buildClusterDirPath(serviceName); @@ -807,20 +893,6 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return appDir; } - private void createDirAndPersistApp(Path appDir, Service service) - throws IOException, SliderException { - FsPermission appDirPermission = new FsPermission("750"); - fs.createWithPermissions(appDir, appDirPermission); - Path appJson = persistAppDef(appDir, service); - LOG.info("Persisted service " + service.getName() + " at " + appJson); - } - - private Path persistAppDef(Path appDir, Service service) throws IOException { - Path appJson = new Path(appDir, service.getName() + ".json"); - jsonSerDeser.save(fs.getFileSystem(), appJson, service, true); - return appJson; - } - private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) { @@ -1079,6 +1151,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, UserGroupInformation.getCurrentUser(), rpc, address); } + @VisibleForTesting + void setFileSystem(SliderFileSystem fileSystem) + throws IOException { + this.fs = fileSystem; + } + + @VisibleForTesting + void setYarnClient(YarnClient yarnClient) { + this.yarnClient = yarnClient; + } + public synchronized ApplicationId getAppId(String serviceName) throws IOException, YarnException { if (cachedAppInfo.containsKey(serviceName)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 9e10ae68398..b521504540b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -138,6 +138,12 @@ public class Component implements EventHandler { // For flex down, go to STABLE state .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) + .addTransition(STABLE, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) + .addTransition(FLEXING, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) .installTopology(); public Component( @@ -356,6 +362,14 @@ public class Component implements EventHandler { } } + private static class ComponentNeedsUpgradeTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + component.componentSpec.setState(org.apache.hadoop.yarn.service.api. + records.ComponentState.NEEDS_UPGRADE); + } + } + public void removePendingInstance(ComponentInstance instance) { pendingInstances.remove(instance); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 447b436fc9d..7bd5cb9399f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -32,6 +33,7 @@ public class ComponentEvent extends AbstractEvent { private ComponentInstance instance; private ContainerStatus status; private ContainerId containerId; + private org.apache.hadoop.yarn.service.api.records.Component targetSpec; public ContainerId getContainerId() { return containerId; @@ -91,4 +93,14 @@ public class ComponentEvent extends AbstractEvent { this.status = status; return this; } + + public org.apache.hadoop.yarn.service.api.records.Component getTargetSpec() { + return targetSpec; + } + + public ComponentEvent setTargetSpec( + org.apache.hadoop.yarn.service.api.records.Component targetSpec) { + this.targetSpec = Preconditions.checkNotNull(targetSpec); + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 067302de6a4..970788aadcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -23,5 +23,7 @@ public enum ComponentEventType { CONTAINER_ALLOCATED, CONTAINER_RECOVERED, CONTAINER_STARTED, - CONTAINER_COMPLETED + CONTAINER_COMPLETED, + UPGRADE, + STOP_UPGRADE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java index a5f9ff4693a..0f63d03e95e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java @@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.service.component; public enum ComponentState { INIT, FLEXING, - STABLE + STABLE, + UPGRADING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java index 0378d243483..7b474f6e249 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java @@ -92,4 +92,6 @@ public interface YarnServiceConstants { String CONTENT = "content"; String PRINCIPAL = "yarn.service.am.principal"; + + String UPGRADE_DIR = "upgrade"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 33e33a62269..8152225e8a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; public class ClientAMProtocolPBClientImpl implements ClientAMProtocol, Closeable { @@ -88,4 +92,26 @@ public class ClientAMProtocolPBClientImpl RPC.stopProxy(this.proxy); } } + + @Override + public UpgradeServiceResponseProto upgrade( + UpgradeServiceRequestProto request) throws IOException, YarnException { + try { + return proxy.upgradeService(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } + + @Override + public RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException { + try { + return proxy.restartService(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index 710078112a0..1a1a1ef01ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import java.io.IOException; @@ -67,4 +71,24 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB { throw new ServiceException(e); } } + + @Override + public UpgradeServiceResponseProto upgradeService(RpcController controller, + UpgradeServiceRequestProto request) throws ServiceException { + try { + return real.upgrade(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } + + @Override + public RestartServiceResponseProto restartService(RpcController controller, + RestartServiceRequestProto request) throws ServiceException { + try { + return real.restart(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java index 284825ee119..5c2bac6897f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -112,10 +111,38 @@ public class CoreFileSystem { public Path buildClusterDirPath(String clustername) { Preconditions.checkNotNull(clustername); Path path = getBaseApplicationPath(); - return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername); + return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + + clustername); } + /** + * Build up the upgrade path string for a cluster. No attempt to + * create the directory is made. + * + * @param clusterName name of the cluster + * @param version version of the cluster + * @return the upgrade path to the cluster + */ + public Path buildClusterUpgradeDirPath(String clusterName, String version) { + Preconditions.checkNotNull(clusterName); + Preconditions.checkNotNull(version); + return new Path(buildClusterDirPath(clusterName), + YarnServiceConstants.UPGRADE_DIR + "/" + version); + } + /** + * Delete the upgrade cluster directory. + * @param clusterName name of the cluster + * @param version version of the cluster + * @throws IOException + */ + public void deleteClusterUpgradeDir(String clusterName, String version) + throws IOException { + Preconditions.checkNotNull(clusterName); + Preconditions.checkNotNull(version); + Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version); + fileSystem.delete(upgradeCluster, true); + } /** * Build up the path string for keytab install location -no attempt to * create the directory is made diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 05917758476..13d9a37ccbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -22,9 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.security.HadoopKerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Resource; +import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderFactory; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; @@ -275,6 +276,14 @@ public class ServiceApiUtil { return jsonSerDeser.load(fs.getFileSystem(), serviceJson); } + public static Service loadServiceUpgrade(SliderFileSystem fs, + String serviceName, String version) throws IOException { + Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version); + Path versionedDef = new Path(versionPath, serviceName + ".json"); + LOG.info("Loading service definition from {}", versionedDef); + return jsonSerDeser.load(fs.getFileSystem(), versionedDef); + } + public static Service loadServiceFrom(SliderFileSystem fs, Path appDefPath) throws IOException { LOG.info("Loading service definition from " + appDefPath); @@ -429,6 +438,23 @@ public class ServiceApiUtil { return sortByDependencies(components, sortedComponents); } + public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir, + Service service) + throws IOException, SliderException { + FsPermission appDirPermission = new FsPermission("750"); + fs.createWithPermissions(appDir, appDirPermission); + Path appJson = writeAppDefinition(fs, appDir, service); + LOG.info("Persisted service {} version {} at {}", service.getName(), + service.getVersion(), appJson); + } + + public static Path writeAppDefinition(SliderFileSystem fs, Path appDir, + Service service) throws IOException { + Path appJson = new Path(appDir, service.getName() + ".json"); + jsonSerDeser.save(fs.getFileSystem(), appJson, service, true); + return appJson; + } + public static String $(String s) { return "${" + s +"}"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 0a21c240d70..3677593971a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -26,6 +26,10 @@ service ClientAMProtocolService { rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto); rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto); rpc stop(StopRequestProto) returns (StopResponseProto); + rpc upgradeService(UpgradeServiceRequestProto) + returns (UpgradeServiceResponseProto); + rpc restartService(RestartServiceRequestProto) + returns (RestartServiceResponseProto); } message FlexComponentsRequestProto { @@ -37,7 +41,7 @@ message ComponentCountProto { optional int64 numberOfContainers = 2; } -message FlexComponentsResponseProto{ +message FlexComponentsResponseProto { } message GetStatusRequestProto { @@ -53,4 +57,17 @@ message StopRequestProto { message StopResponseProto { +} + +message UpgradeServiceRequestProto { + optional string version = 1; +} + +message UpgradeServiceResponseProto { +} + +message RestartServiceRequestProto { +} + +message RestartServiceResponseProto { } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 99332116af3..8347eb35d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.base.Throwables; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.conf.Configuration; @@ -26,18 +27,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Resource; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +53,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URL; +import java.nio.file.Paths; +import java.util.Map; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; @@ -78,7 +86,7 @@ public class ServiceTestUtils { // Example service definition // 2 components, each of which has 2 containers. - protected Service createExampleApplication() { + public static Service createExampleApplication() { Service exampleApp = new Service(); exampleApp.setName("example-app"); exampleApp.setVersion("v1"); @@ -176,7 +184,7 @@ public class ServiceTestUtils { zkCluster.start(); conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString()); conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); - LOG.info("ZK cluster: " + zkCluster.getConnectString()); + LOG.info("ZK cluster: " + zkCluster.getConnectString()); fs = FileSystem.get(conf); basedir = new File("target", "apps"); @@ -268,4 +276,78 @@ public class ServiceTestUtils { } } + /** + * Creates a {@link ServiceClient} for test purposes. + */ + public static ServiceClient createClient(Configuration conf) + throws Exception { + ServiceClient client = new ServiceClient() { + @Override + protected Path addJarResource(String appName, + Map localResources) + throws IOException, SliderException { + // do nothing, the Unit test will use local jars + return null; + } + }; + client.init(conf); + client.start(); + return client; + } + + + /** + * Watcher to initialize yarn service base path under target and deletes the + * the test directory when finishes. + */ + public static class ServiceFSWatcher extends TestWatcher { + private YarnConfiguration conf; + private SliderFileSystem fs; + private java.nio.file.Path serviceBasePath; + + @Override + protected void starting(Description description) { + conf = new YarnConfiguration(); + delete(description); + serviceBasePath = Paths.get("target", + description.getClassName(), description.getMethodName()); + conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString()); + try { + fs = new SliderFileSystem(conf); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + protected void finished(Description description) { + delete(description); + } + + private void delete(Description description) { + FileUtils.deleteQuietly(Paths.get("target", + description.getClassName()).toFile()); + } + + /** + * Returns the yarn conf. + */ + public YarnConfiguration getConf() { + return conf; + } + + /** + * Returns the file system. + */ + public SliderFileSystem getFs() { + return fs; + } + + /** + * Returns the test service base path. + */ + public java.nio.file.Path getServiceBasePath() { + return serviceBasePath; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java new file mode 100644 index 00000000000..086fdbe0aff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link UpgradeComponentsFinder.DefaultUpgradeComponentsFinder}. + */ +public class TestDefaultUpgradeComponentsFinder { + + private UpgradeComponentsFinder.DefaultUpgradeComponentsFinder finder = + new UpgradeComponentsFinder.DefaultUpgradeComponentsFinder(); + + @Test + public void testServiceArtifactChange() { + Service currentDef = ServiceTestUtils.createExampleApplication(); + Service targetDef = ServiceTestUtils.createExampleApplication(); + targetDef.getComponents().forEach(x -> x.setArtifact( + TestServiceManager.createTestArtifact("v1"))); + + Assert.assertEquals("all components need upgrade", + targetDef.getComponents(), finder.findTargetComponentSpecs(currentDef, + targetDef)); + } + + @Test + public void testComponentArtifactChange() { + Service currentDef = TestServiceManager.createBaseDef("test"); + Service targetDef = TestServiceManager.createBaseDef("test"); + + targetDef.getComponents().get(0).setArtifact( + TestServiceManager.createTestArtifact("v2")); + + List expected = new ArrayList<>(); + expected.add(targetDef.getComponents().get(0)); + + Assert.assertEquals("single components needs upgrade", + expected, finder.findTargetComponentSpecs(currentDef, + targetDef)); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java new file mode 100644 index 00000000000..c65a5d4efe6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link ServiceManager}. + */ +public class TestServiceManager { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testUpgrade() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testUpgrade"); + upgrade(serviceManager, "v2", false); + Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartNothingToUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", false); + + //make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> { + comp.setState(ComponentState.STABLE); + }); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service not re-started", ServiceState.STABLE, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartWithPendingUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", true); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service should still be upgrading", + ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + } + + + private void upgrade(ServiceManager service, String version, + boolean upgradeArtifact) + throws IOException, SliderException { + Service upgradedDef = ServiceTestUtils.createExampleApplication(); + upgradedDef.setName(service.getName()); + upgradedDef.setVersion(version); + if (upgradeArtifact) { + Artifact upgradedArtifact = createTestArtifact("2"); + upgradedDef.getComponents().forEach(component -> { + component.setArtifact(upgradedArtifact); + }); + } + writeUpgradedDef(upgradedDef); + ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); + upgradeEvent.setVersion("v2"); + service.handle(upgradeEvent); + } + + private ServiceManager createTestServiceManager(String name) + throws IOException { + ServiceContext context = new ServiceContext(); + context.service = createBaseDef(name); + context.fs = rule.getFs(); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + }; + + context.scheduler.init(rule.getConf()); + + Map + componentState = context.scheduler.getAllComponents(); + context.service.getComponents().forEach(component -> { + componentState.put(component.getName(), + new org.apache.hadoop.yarn.service.component.Component(component, + 1L, context)); + }); + return new ServiceManager(context); + } + + static Service createBaseDef(String name) { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service serviceDef = ServiceTestUtils.createExampleApplication(); + serviceDef.setId(applicationId.toString()); + serviceDef.setName(name); + serviceDef.setState(ServiceState.STARTED); + Artifact artifact = createTestArtifact("1"); + + serviceDef.getComponents().forEach(component -> + component.setArtifact(artifact)); + return serviceDef; + } + + static Artifact createTestArtifact(String artifactId) { + Artifact artifact = new Artifact(); + artifact.setId(artifactId); + artifact.setType(Artifact.TypeEnum.TARBALL); + return artifact; + } + + private void writeUpgradedDef(Service upgradedDef) + throws IOException, SliderException { + Path upgradePath = rule.getFs().buildClusterUpgradeDirPath( + upgradedDef.getName(), upgradedDef.getVersion()); + ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath, + upgradedDef); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 51a190e4013..2b40e494596 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.client.ServiceClient; -import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.hamcrest.CoreMatchers; import org.junit.After; @@ -86,7 +86,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { @Test (timeout = 200000) public void testCreateFlexStopDestroyService() throws Exception { setupInternal(NUM_NMS); - ServiceClient client = createClient(); + ServiceClient client = createClient(getConf()); Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); SliderFileSystem fileSystem = new SliderFileSystem(getConf()); @@ -143,7 +143,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { @Test (timeout = 200000) public void testComponentStartOrder() throws Exception { setupInternal(NUM_NMS); - ServiceClient client = createClient(); + ServiceClient client = createClient(getConf()); Service exampleApp = new Service(); exampleApp.setName("teststartorder"); exampleApp.setVersion("v1"); @@ -177,7 +177,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { String userB = "userb"; setupInternal(NUM_NMS); - ServiceClient client = createClient(); + ServiceClient client = createClient(getConf()); String origBasePath = getConf().get(YARN_SERVICE_BASE_PATH); Service userAApp = new Service(); @@ -229,7 +229,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { System.setProperty("user.name", user); setupInternal(NUM_NMS); - ServiceClient client = createClient(); + ServiceClient client = createClient(getConf()); Service appA = new Service(); appA.setName(sameAppName); @@ -298,7 +298,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { setConf(conf); setupInternal(NUM_NMS); - ServiceClient client = createClient(); + ServiceClient client = createClient(getConf()); Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); Multimap containersBeforeFailure = @@ -341,6 +341,28 @@ public class TestYarnNativeServices extends ServiceTestUtils { client.actionDestroy(exampleApp.getName()); } + @Test(timeout = 200000) + public void testUpgradeService() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + client.actionCreate(service); + waitForServiceToBeStarted(client, service); + + //upgrade the service + service.setVersion("v2"); + client.actionUpgrade(service); + + //wait for service to be in upgrade state + waitForServiceToBeInState(client, service, ServiceState.UPGRADING); + SliderFileSystem fs = new SliderFileSystem(getConf()); + Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs, + service.getName(), service.getVersion()); + Assert.assertEquals(service.getName(), fromFs.getName()); + Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + } + // Check containers launched are in dependency order // Get all containers into a list and sort based on container launch time e.g. // compa-c1, compa-c2, compb-c1, compb-c2; @@ -478,16 +500,7 @@ public class TestYarnNativeServices extends ServiceTestUtils { */ private void waitForServiceToBeStable(ServiceClient client, Service exampleApp) throws TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); - return retrievedApp.getState() == ServiceState.STABLE; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); + waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE); } /** @@ -500,11 +513,25 @@ public class TestYarnNativeServices extends ServiceTestUtils { */ private void waitForServiceToBeStarted(ServiceClient client, Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + private void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState) throws TimeoutException, + InterruptedException { GenericTestUtils.waitFor(() -> { try { Service retrievedApp = client.getStatus(exampleApp.getName()); System.out.println(retrievedApp); - return retrievedApp.getState() == ServiceState.STARTED; + return retrievedApp.getState() == desiredState; } catch (Exception e) { e.printStackTrace(); return false; @@ -512,21 +539,6 @@ public class TestYarnNativeServices extends ServiceTestUtils { }, 2000, 200000); } - private ServiceClient createClient() throws Exception { - ServiceClient client = new ServiceClient() { - @Override protected Path addJarResource(String appName, - Map localResources) - throws IOException, SliderException { - // do nothing, the Unit test will use local jars - return null; - } - }; - client.init(getConf()); - client.start(); - return client; - } - - private int countTotalContainers(Service service) { int totalContainers = 0; for (Component component : service.getComponents()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java new file mode 100644 index 00000000000..cc5b6ec7fe4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.ClientAMProtocol; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Matchers; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link ServiceClient}. + */ +public class TestServiceClient { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testActionUpgrade() throws Exception { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + ServiceClient client = createServiceClient(applicationId); + + Service service = ServiceTestUtils.createExampleApplication(); + service.setVersion("v1"); + client.actionCreate(service); + + //upgrade the service + service.setVersion("v2"); + client.actionUpgrade(service); + + //wait for service to be in upgrade state + Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(), + service.getName(), service.getVersion()); + Assert.assertEquals(service.getName(), fromFs.getName()); + Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + } + + + private ServiceClient createServiceClient(ApplicationId applicationId) + throws Exception { + ClientAMProtocol amProxy = mock(ClientAMProtocol.class); + YarnClient yarnClient = createMockYarnClient(); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + ApplicationAttemptReport attemptReport = + ApplicationAttemptReport.newInstance(attemptId, "localhost", 0, + null, null, null, + YarnApplicationAttemptState.RUNNING, null); + + ApplicationReport appReport = mock(ApplicationReport.class); + when(appReport.getHost()).thenReturn("localhost"); + + when(yarnClient.getApplicationAttemptReport(Matchers.any())) + .thenReturn(attemptReport); + when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport); + + ServiceClient client = new ServiceClient() { + @Override + protected void serviceInit(Configuration configuration) throws Exception { + } + + @Override + protected ClientAMProtocol createAMProxy(String serviceName, + ApplicationReport appReport) throws IOException, YarnException { + return amProxy; + } + + @Override + ApplicationId submitApp(Service app) throws IOException, YarnException { + return applicationId; + } + }; + + client.setFileSystem(rule.getFs()); + client.setYarnClient(yarnClient); + + client.init(rule.getConf()); + client.start(); + return client; + } + + private YarnClient createMockYarnClient() throws IOException, YarnException { + YarnClient yarnClient = mock(YarnClient.class); + when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class))) + .thenReturn(new ArrayList<>()); + return yarnClient; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestCoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestCoreFileSystem.java new file mode 100644 index 00000000000..ba4a65813d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestCoreFileSystem.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link CoreFileSystem}. + */ +public class TestCoreFileSystem { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testClusterUpgradeDirPath() { + String serviceName = "testClusterUpgrade"; + String version = "v1"; + Path expectedPath = new Path(rule.getFs().buildClusterDirPath(serviceName), + YarnServiceConstants.UPGRADE_DIR + "/" + version); + Assert.assertEquals("incorrect upgrade path", expectedPath, + rule.getFs().buildClusterUpgradeDirPath(serviceName, version)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index a09663e618e..d5eb787c706 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -229,4 +229,20 @@ public abstract class AppAdminClient extends CompositeService { @Unstable public abstract String getStatusString(String appIdOrName) throws IOException, YarnException; + + /** + * Upgrade a long running service. + * + * @param appName the name of the application + * @param fileName specification of application upgrade to save. + * + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionUpgrade(String appName, String fileName) + throws IOException, YarnException; + }