YARN-8018. Added support for initiating yarn service upgrade.

Contributed by Chandni Singh

(cherry picked from commit 27d60a1634)
This commit is contained in:
Eric Yang 2018-03-26 18:46:31 -04:00
parent 23179c06a3
commit 150085cc64
29 changed files with 1374 additions and 79 deletions

View File

@ -469,4 +469,22 @@ public class ApiServiceClient extends AppAdminClient {
return output;
}
@Override
public int actionUpgrade(String appName,
String fileName) throws IOException, YarnException {
int result;
try {
Service service =
loadAppJsonFromLocalFS(fileName, appName, null, null);
service.setState(ServiceState.UPGRADING);
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient()
.post(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
LOG.error("Failed to upgrade application: ", e);
result = EXIT_EXCEPTION_THROWN;
}
return result;
}
}

View File

@ -375,6 +375,12 @@ public class ApiServer {
&& updateServiceData.getLifetime() > 0) {
return updateLifetime(appName, updateServiceData, ugi);
}
// If an UPGRADE is requested
if (updateServiceData.getState() != null &&
updateServiceData.getState() == ServiceState.UPGRADING) {
return upgradeService(updateServiceData, ugi);
}
} catch (UndeclaredThrowableException e) {
return formatResponse(Status.BAD_REQUEST,
e.getCause().getMessage());
@ -475,6 +481,24 @@ public class ApiServer {
return formatResponse(Status.OK, status);
}
private Response upgradeService(Service service,
final UserGroupInformation ugi) throws IOException, InterruptedException {
ServiceStatus status = new ServiceStatus();
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
sc.actionUpgrade(service);
sc.close();
return null;
});
LOG.info("Service {} version {} upgrade initialized");
status.setDiagnostics("Service " + service.getName() +
" version " + service.getVersion() + " saved.");
status.setState(ServiceState.ACCEPTED);
return formatResponse(Status.ACCEPTED, status);
}
/**
* Used by negative test case.
*

View File

@ -23,8 +23,14 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import java.io.IOException;
@ -37,4 +43,10 @@ public interface ClientAMProtocol {
StopResponseProto stop(StopRequestProto requestProto)
throws IOException, YarnException;
UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request)
throws IOException, YarnException;
RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException;
}

View File

@ -33,8 +33,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger;
@ -142,4 +146,24 @@ public class ClientAMService extends AbstractService
public InetSocketAddress getBindAddress() {
return bindAddress;
}
@Override
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException {
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE);
event.setVersion(request.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(event);
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
return UpgradeServiceResponseProto.newBuilder().build();
}
@Override
public RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException {
ServiceEvent event = new ServiceEvent(ServiceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(event);
LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
return RestartServiceResponseProto.newBuilder().build();
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
* Events are handled by {@link ServiceManager} to manage the service
* state.
*/
public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private final ServiceEventType type;
private String version;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
this.type = serviceEventType;
}
public ServiceEventType getType() {
return type;
}
public String getVersion() {
return version;
}
public ServiceEvent setVersion(String version) {
this.version = version;
return this;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
/**
* Types of {@link ServiceEvent}.
*/
public enum ServiceEventType {
START,
UPGRADE,
STOP_UPGRADE
}

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Manages the state of the service.
*/
public class ServiceManager implements EventHandler<ServiceEvent> {
private static final Logger LOG = LoggerFactory.getLogger(
ServiceManager.class);
private final Service serviceSpec;
private final ServiceContext context;
private final ServiceScheduler scheduler;
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
private final StateMachine<State, ServiceEventType, ServiceEvent>
stateMachine;
private final AsyncDispatcher dispatcher;
private final SliderFileSystem fs;
private final UpgradeComponentsFinder componentsFinder;
private String upgradeVersion;
private static final StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
new StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent>(State.STABLE)
.addTransition(State.STABLE, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.UPGRADE,
new StartUpgradeTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.START,
new StopUpgradeTransition())
.installTopology();
public ServiceManager(ServiceContext context) {
Preconditions.checkNotNull(context);
this.context = context;
serviceSpec = context.service;
scheduler = context.scheduler;
stateMachine = STATE_MACHINE_FACTORY.make(this);
dispatcher = scheduler.getDispatcher();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
fs = context.fs;
componentsFinder = new UpgradeComponentsFinder
.DefaultUpgradeComponentsFinder();
}
@Override
public void handle(ServiceEvent event) {
try {
writeLock.lock();
State oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error(MessageFormat.format(
"[SERVICE]: Invalid event {0} at {1}.", event.getType(),
oldState), e);
}
if (oldState != getState()) {
LOG.info("[SERVICE] Transitioned from {} to {} on {} event.",
oldState, getState(), event.getType());
}
} finally {
writeLock.unlock();
}
}
private State getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
private static class StartUpgradeTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
try {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
serviceManager.fs, serviceManager.getName(), event.getVersion());
serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
List<org.apache.hadoop.yarn.service.api.records.Component>
compsThatNeedUpgrade = serviceManager.componentsFinder.
findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
compsThatNeedUpgrade.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).
setTargetSpec(component);
serviceManager.dispatcher.getEventHandler().handle(
needUpgradeEvent);
});
}
serviceManager.upgradeVersion = event.getVersion();
return State.UPGRADING;
} catch (Throwable e) {
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
e);
return State.STABLE;
}
}
}
private static class StopUpgradeTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
//abort is not supported currently
//trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
true);
if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) {
return serviceManager.finalizeUpgrade() ? State.STABLE :
State.UPGRADING;
} else {
return State.UPGRADING;
}
}
}
/**
* @return whether finalization of upgrade was successful.
*/
private boolean finalizeUpgrade() {
try {
Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
fs, getName(), upgradeVersion);
ServiceApiUtil.writeAppDefinition(fs,
ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
} catch (IOException e) {
LOG.error("Upgrade did not complete because unable to overwrite the" +
" service definition", e);
return false;
}
try {
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
} catch (IOException e) {
LOG.warn("Unable to delete upgrade definition for service {} " +
"version {}", getName(), upgradeVersion);
}
serviceSpec.setVersion(upgradeVersion);
upgradeVersion = null;
return true;
}
/**
* Returns the name of the service.
*/
public String getName() {
return serviceSpec.getName();
}
/**
* State of {@link ServiceManager}.
*/
public enum State {
STABLE, UPGRADING
}
@VisibleForTesting
Service getServiceSpec() {
return serviceSpec;
}
}

View File

@ -110,6 +110,9 @@ public class ServiceScheduler extends CompositeService {
LoggerFactory.getLogger(ServiceScheduler.class);
private Service app;
// This encapsulates the <code>app</code> with methods to upgrade the app.
private ServiceManager serviceManager;
// component_name -> component
private final Map<String, Component> componentsByName =
new ConcurrentHashMap<>();
@ -192,6 +195,7 @@ public class ServiceScheduler extends CompositeService {
addIfService(nmClient);
dispatcher = new AsyncDispatcher("Component dispatcher");
dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
dispatcher.register(ComponentEventType.class,
new ComponentEventHandler());
dispatcher.register(ComponentInstanceEventType.class,
@ -300,6 +304,7 @@ public class ServiceScheduler extends CompositeService {
// Since AM has been started and registered, the service is in STARTED state
app.setState(ServiceState.STARTED);
serviceManager = new ServiceManager(context);
// recover components based on containers sent from RM
recoverComponents(response);
@ -510,6 +515,20 @@ public class ServiceScheduler extends CompositeService {
}
}
private final class ServiceEventHandler
implements EventHandler<ServiceEvent> {
@Override
public void handle(ServiceEvent event) {
try {
serviceManager.handle(event);
} catch (Throwable t) {
LOG.error(MessageFormat
.format("[SERVICE]: Error in handling event type {0}",
event.getType()), t);
}
}
}
private final class ComponentEventHandler
implements EventHandler<ComponentEvent> {
@Override

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Finds all the the target component specs.
*/
public interface UpgradeComponentsFinder {
List<Component> findTargetComponentSpecs(Service currentDef,
Service targetDef);
/**
* Default implementation of {@link UpgradeComponentsFinder} that finds all
* the target component specs.
*/
class DefaultUpgradeComponentsFinder implements UpgradeComponentsFinder {
@Override
public List<Component> findTargetComponentSpecs(Service currentDef,
Service targetDef) {
if (currentDef.getComponents().size() !=
targetDef.getComponents().size()) {
throw new UnsupportedOperationException(
"addition/deletion of components not supported by upgrade");
}
if (!currentDef.getKerberosPrincipal().equals(
targetDef.getKerberosPrincipal())) {
throw new UnsupportedOperationException("changes to kerberos " +
"principal not supported by upgrade");
}
if (!Objects.equals(currentDef.getQueue(), targetDef.getQueue())) {
throw new UnsupportedOperationException("changes to queue " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getPlacementPolicy(),
targetDef.getPlacementPolicy())) {
throw new UnsupportedOperationException("changes to placement policy " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) {
throw new UnsupportedOperationException("changes to resource " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getDescription(),
targetDef.getDescription())) {
throw new UnsupportedOperationException("changes to description " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getQuicklinks(),
targetDef.getQuicklinks())) {
throw new UnsupportedOperationException("changes to quick links " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getLaunchTime(),
targetDef.getLaunchTime())) {
throw new UnsupportedOperationException("changes to launch time " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getLifetime(),
targetDef.getLifetime())) {
throw new UnsupportedOperationException("changes to lifetime " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getConfiguration(),
currentDef.getConfiguration())) {
return targetDef.getComponents();
}
if (!Objects.equals(currentDef.getArtifact(), targetDef.getArtifact())) {
return targetDef.getComponents();
}
List<Component> targetComps = new ArrayList<>();
targetDef.getComponents().forEach(component -> {
Component currentComp = currentDef.getComponent(component.getName());
if (!Objects.equals(currentComp.getName(), component.getName())) {
throw new UnsupportedOperationException(
"changes to component name not supported by upgrade");
}
if (!Objects.equals(currentComp.getDependencies(),
component.getDependencies())) {
throw new UnsupportedOperationException(
"changes to component dependencies not supported by upgrade");
}
if (!Objects.equals(currentComp.getReadinessCheck(),
component.getReadinessCheck())) {
throw new UnsupportedOperationException(
"changes to component readiness check not supported by upgrade");
}
if (!Objects.equals(currentComp.getResource(),
component.getResource())) {
throw new UnsupportedOperationException(
"changes to component resource not supported by upgrade");
}
if (!Objects.equals(currentComp.getRunPrivilegedContainer(),
component.getRunPrivilegedContainer())) {
throw new UnsupportedOperationException(
"changes to run privileged container not supported by upgrade");
}
if (!Objects.equals(currentComp.getPlacementPolicy(),
component.getPlacementPolicy())) {
throw new UnsupportedOperationException(
"changes to component placement policy not supported by upgrade");
}
if (!Objects.equals(currentComp.getQuicklinks(),
component.getQuicklinks())) {
throw new UnsupportedOperationException(
"changes to component quick links not supported by upgrade");
}
if (!Objects.equals(currentComp.getArtifact(),
component.getArtifact()) ||
!Objects.equals(currentComp.getLaunchCommand(),
component.getLaunchCommand()) ||
!Objects.equals(currentComp.getConfiguration(),
component.getConfiguration())) {
targetComps.add(component);
}
});
return targetComps;
}
}
}

View File

@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
@ApiModel(description = "The current state of a component.")
public enum ComponentState {
FLEXING, STABLE
FLEXING, STABLE, NEEDS_UPGRADE;
}

View File

@ -29,5 +29,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@ApiModel(description = "The current state of an service.")
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX;
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@ -55,7 +56,9 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Component;
@ -73,8 +76,8 @@ import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
@ -186,6 +189,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return service;
}
@Override
public int actionSave(String fileName, String serviceName, Long lifetime,
String queue) throws IOException, YarnException {
return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName,
@ -194,9 +198,54 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
public int actionBuild(Service service)
throws YarnException, IOException {
Path appDir = checkAppNotExistOnHdfs(service);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
createDirAndPersistApp(appDir, service);
Path appDir = checkAppNotExistOnHdfs(service, false);
ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
return EXIT_SUCCESS;
}
@Override
public int actionUpgrade(String appName, String fileName)
throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
null, null);
return actionUpgrade(upgradeService);
}
public int actionUpgrade(Service service) throws YarnException, IOException {
Service persistedService =
ServiceApiUtil.loadService(fs, service.getName());
if (!StringUtils.isEmpty(persistedService.getId())) {
cachedAppInfo.put(persistedService.getName(), new AppInfo(
ApplicationId.fromString(persistedService.getId()),
persistedService.getKerberosPrincipal().getPrincipalName()));
}
if (persistedService.getVersion().equals(service.getVersion())) {
String message =
service.getName() + " is already at version " + service.getVersion()
+ ". There is nothing to upgrade.";
LOG.error(message);
throw new YarnException(message);
}
Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(service.getName()));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(service.getName() + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
UpgradeServiceRequestProto.Builder requestBuilder =
UpgradeServiceRequestProto.newBuilder();
requestBuilder.setVersion(service.getVersion());
proxy.upgrade(requestBuilder.build());
return EXIT_SUCCESS;
}
@ -212,16 +261,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
String serviceName = service.getName();
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
verifyNoLiveAppInRM(serviceName, "create");
Path appDir = checkAppNotExistOnHdfs(service);
Path appDir = checkAppNotExistOnHdfs(service, false);
// Write the definition first and then submit - AM will read the definition
createDirAndPersistApp(appDir, service);
ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
ApplicationId appId = submitApp(service);
cachedAppInfo.put(serviceName, new AppInfo(appId, service
.getKerberosPrincipal().getPrincipalName()));
service.setId(appId.toString());
// update app definition with appId
persistAppDef(appDir, service);
ServiceApiUtil.writeAppDefinition(fs, appDir, service);
return appId;
}
@ -349,6 +398,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return original;
}
@Override
public int actionStop(String serviceName)
throws YarnException, IOException {
return actionStop(serviceName, true);
@ -424,6 +474,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return EXIT_SUCCESS;
}
@Override
public int actionDestroy(String serviceName) throws YarnException,
IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
@ -557,8 +608,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
}
private ApplicationId submitApp(Service app)
throws IOException, YarnException {
@VisibleForTesting
ApplicationId submitApp(Service app) throws IOException, YarnException {
String serviceName = app.getName();
Configuration conf = getConfig();
Path appRootDir = fs.buildClusterDirPath(app.getName());
@ -777,29 +828,64 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return hasAMLog4j;
}
@Override
public int actionStart(String serviceName) throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
Path appDir = checkAppExistOnHdfs(serviceName);
Service service = ServiceApiUtil.loadService(fs, serviceName);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
// see if it is actually running and bail out;
verifyNoLiveAppInRM(serviceName, "start");
ApplicationId appId = submitApp(service);
service.setId(appId.toString());
// write app definition on to hdfs
Path appJson = persistAppDef(appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
return 0;
Service liveService = getStatus(serviceName);
if (liveService == null ||
!liveService.getState().equals(ServiceState.UPGRADING)) {
Path appDir = checkAppExistOnHdfs(serviceName);
Service service = ServiceApiUtil.loadService(fs, serviceName);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
// see if it is actually running and bail out;
verifyNoLiveAppInRM(serviceName, "start");
ApplicationId appId = submitApp(service);
service.setId(appId.toString());
// write app definition on to hdfs
Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
return 0;
} else {
LOG.info("Finalize service {} upgrade");
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(serviceName));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(serviceName, appReport);
RestartServiceRequestProto.Builder requestBuilder =
RestartServiceRequestProto.newBuilder();
proxy.restart(requestBuilder.build());
return 0;
}
}
private Path checkAppNotExistOnHdfs(Service service)
/**
* Verifies that the service definition does not exist on hdfs.
*
* @param service service
* @param isUpgrade true for upgrades; false otherwise
* @return path to the service definition..
* @throws IOException
* @throws SliderException
*/
private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade)
throws IOException, SliderException {
Path appDir = fs.buildClusterDirPath(service.getName());
Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) :
fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion());
fs.verifyDirectoryNonexistent(
new Path(appDir, service.getName() + ".json"));
return appDir;
}
/**
* Verifies that the service exists on hdfs.
* @param serviceName service name
* @return path to the service definition.
* @throws IOException
* @throws SliderException
*/
private Path checkAppExistOnHdfs(String serviceName)
throws IOException, SliderException {
Path appDir = fs.buildClusterDirPath(serviceName);
@ -807,20 +893,6 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return appDir;
}
private void createDirAndPersistApp(Path appDir, Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = persistAppDef(appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
}
private Path persistAppDef(Path appDir, Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) {
@ -1079,6 +1151,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
UserGroupInformation.getCurrentUser(), rpc, address);
}
@VisibleForTesting
void setFileSystem(SliderFileSystem fileSystem)
throws IOException {
this.fs = fileSystem;
}
@VisibleForTesting
void setYarnClient(YarnClient yarnClient) {
this.yarnClient = yarnClient;
}
public synchronized ApplicationId getAppId(String serviceName)
throws IOException, YarnException {
if (cachedAppInfo.containsKey(serviceName)) {

View File

@ -138,6 +138,12 @@ public class Component implements EventHandler<ComponentEvent> {
// For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
.addTransition(STABLE, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(FLEXING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.installTopology();
public Component(
@ -356,6 +362,14 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
private static class ComponentNeedsUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
}
}
public void removePendingInstance(ComponentInstance instance) {
pendingInstances.remove(instance);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.component;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -32,6 +33,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ComponentInstance instance;
private ContainerStatus status;
private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
public ContainerId getContainerId() {
return containerId;
@ -91,4 +93,14 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
this.status = status;
return this;
}
public org.apache.hadoop.yarn.service.api.records.Component getTargetSpec() {
return targetSpec;
}
public ComponentEvent setTargetSpec(
org.apache.hadoop.yarn.service.api.records.Component targetSpec) {
this.targetSpec = Preconditions.checkNotNull(targetSpec);
return this;
}
}

View File

@ -23,5 +23,7 @@ public enum ComponentEventType {
CONTAINER_ALLOCATED,
CONTAINER_RECOVERED,
CONTAINER_STARTED,
CONTAINER_COMPLETED
CONTAINER_COMPLETED,
UPGRADE,
STOP_UPGRADE
}

View File

@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.service.component;
public enum ComponentState {
INIT,
FLEXING,
STABLE
STABLE,
UPGRADING
}

View File

@ -92,4 +92,6 @@ public interface YarnServiceConstants {
String CONTENT = "content";
String PRINCIPAL = "yarn.service.am.principal";
String UPGRADE_DIR = "upgrade";
}

View File

@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
public class ClientAMProtocolPBClientImpl
implements ClientAMProtocol, Closeable {
@ -88,4 +92,26 @@ public class ClientAMProtocolPBClientImpl
RPC.stopProxy(this.proxy);
}
}
@Override
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException, YarnException {
try {
return proxy.upgradeService(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
@Override
public RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException {
try {
return proxy.restartService(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
}

View File

@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import java.io.IOException;
@ -67,4 +71,24 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
throw new ServiceException(e);
}
}
@Override
public UpgradeServiceResponseProto upgradeService(RpcController controller,
UpgradeServiceRequestProto request) throws ServiceException {
try {
return real.upgrade(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
@Override
public RestartServiceResponseProto restartService(RpcController controller,
RestartServiceRequestProto request) throws ServiceException {
try {
return real.restart(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
}

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -112,10 +111,38 @@ public class CoreFileSystem {
public Path buildClusterDirPath(String clustername) {
Preconditions.checkNotNull(clustername);
Path path = getBaseApplicationPath();
return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername);
return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/"
+ clustername);
}
/**
* Build up the upgrade path string for a cluster. No attempt to
* create the directory is made.
*
* @param clusterName name of the cluster
* @param version version of the cluster
* @return the upgrade path to the cluster
*/
public Path buildClusterUpgradeDirPath(String clusterName, String version) {
Preconditions.checkNotNull(clusterName);
Preconditions.checkNotNull(version);
return new Path(buildClusterDirPath(clusterName),
YarnServiceConstants.UPGRADE_DIR + "/" + version);
}
/**
* Delete the upgrade cluster directory.
* @param clusterName name of the cluster
* @param version version of the cluster
* @throws IOException
*/
public void deleteClusterUpgradeDir(String clusterName, String version)
throws IOException {
Preconditions.checkNotNull(clusterName);
Preconditions.checkNotNull(version);
Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version);
fileSystem.delete(upgradeCluster, true);
}
/**
* Build up the path string for keytab install location -no attempt to
* create the directory is made

View File

@ -22,9 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
@ -275,6 +276,14 @@ public class ServiceApiUtil {
return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
}
public static Service loadServiceUpgrade(SliderFileSystem fs,
String serviceName, String version) throws IOException {
Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
Path versionedDef = new Path(versionPath, serviceName + ".json");
LOG.info("Loading service definition from {}", versionedDef);
return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
}
public static Service loadServiceFrom(SliderFileSystem fs,
Path appDefPath) throws IOException {
LOG.info("Loading service definition from " + appDefPath);
@ -429,6 +438,23 @@ public class ServiceApiUtil {
return sortByDependencies(components, sortedComponents);
}
public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service {} version {} at {}", service.getName(),
service.getVersion(), appJson);
}
public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
public static String $(String s) {
return "${" + s +"}";
}

View File

@ -26,6 +26,10 @@ service ClientAMProtocolService {
rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto);
rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto);
rpc stop(StopRequestProto) returns (StopResponseProto);
rpc upgradeService(UpgradeServiceRequestProto)
returns (UpgradeServiceResponseProto);
rpc restartService(RestartServiceRequestProto)
returns (RestartServiceResponseProto);
}
message FlexComponentsRequestProto {
@ -37,7 +41,7 @@ message ComponentCountProto {
optional int64 numberOfContainers = 2;
}
message FlexComponentsResponseProto{
message FlexComponentsResponseProto {
}
message GetStatusRequestProto {
@ -53,4 +57,17 @@ message StopRequestProto {
message StopResponseProto {
}
message UpgradeServiceRequestProto {
optional string version = 1;
}
message UpgradeServiceResponseProto {
}
message RestartServiceRequestProto {
}
message RestartServiceResponseProto {
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Throwables;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
@ -26,18 +27,23 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,6 +53,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Map;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
@ -78,7 +86,7 @@ public class ServiceTestUtils {
// Example service definition
// 2 components, each of which has 2 containers.
protected Service createExampleApplication() {
public static Service createExampleApplication() {
Service exampleApp = new Service();
exampleApp.setName("example-app");
exampleApp.setVersion("v1");
@ -176,7 +184,7 @@ public class ServiceTestUtils {
zkCluster.start();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString());
fs = FileSystem.get(conf);
basedir = new File("target", "apps");
@ -268,4 +276,78 @@ public class ServiceTestUtils {
}
}
/**
* Creates a {@link ServiceClient} for test purposes.
*/
public static ServiceClient createClient(Configuration conf)
throws Exception {
ServiceClient client = new ServiceClient() {
@Override
protected Path addJarResource(String appName,
Map<String, LocalResource> localResources)
throws IOException, SliderException {
// do nothing, the Unit test will use local jars
return null;
}
};
client.init(conf);
client.start();
return client;
}
/**
* Watcher to initialize yarn service base path under target and deletes the
* the test directory when finishes.
*/
public static class ServiceFSWatcher extends TestWatcher {
private YarnConfiguration conf;
private SliderFileSystem fs;
private java.nio.file.Path serviceBasePath;
@Override
protected void starting(Description description) {
conf = new YarnConfiguration();
delete(description);
serviceBasePath = Paths.get("target",
description.getClassName(), description.getMethodName());
conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
try {
fs = new SliderFileSystem(conf);
} catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
protected void finished(Description description) {
delete(description);
}
private void delete(Description description) {
FileUtils.deleteQuietly(Paths.get("target",
description.getClassName()).toFile());
}
/**
* Returns the yarn conf.
*/
public YarnConfiguration getConf() {
return conf;
}
/**
* Returns the file system.
*/
public SliderFileSystem getFs() {
return fs;
}
/**
* Returns the test service base path.
*/
public java.nio.file.Path getServiceBasePath() {
return serviceBasePath;
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* Tests for {@link UpgradeComponentsFinder.DefaultUpgradeComponentsFinder}.
*/
public class TestDefaultUpgradeComponentsFinder {
private UpgradeComponentsFinder.DefaultUpgradeComponentsFinder finder =
new UpgradeComponentsFinder.DefaultUpgradeComponentsFinder();
@Test
public void testServiceArtifactChange() {
Service currentDef = ServiceTestUtils.createExampleApplication();
Service targetDef = ServiceTestUtils.createExampleApplication();
targetDef.getComponents().forEach(x -> x.setArtifact(
TestServiceManager.createTestArtifact("v1")));
Assert.assertEquals("all components need upgrade",
targetDef.getComponents(), finder.findTargetComponentSpecs(currentDef,
targetDef));
}
@Test
public void testComponentArtifactChange() {
Service currentDef = TestServiceManager.createBaseDef("test");
Service targetDef = TestServiceManager.createBaseDef("test");
targetDef.getComponents().get(0).setArtifact(
TestServiceManager.createTestArtifact("v2"));
List<Component> expected = new ArrayList<>();
expected.add(targetDef.getComponents().get(0));
Assert.assertEquals("single components needs upgrade",
expected, finder.findTargetComponentSpecs(currentDef,
targetDef));
}
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link ServiceManager}.
*/
public class TestServiceManager {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testUpgrade() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testUpgrade");
upgrade(serviceManager, "v2", false);
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartNothingToUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", false);
//make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
comp.setState(ComponentState.STABLE);
});
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service not re-started", ServiceState.STABLE,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartWithPendingUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", true);
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service should still be upgrading",
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
}
private void upgrade(ServiceManager service, String version,
boolean upgradeArtifact)
throws IOException, SliderException {
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(service.getName());
upgradedDef.setVersion(version);
if (upgradeArtifact) {
Artifact upgradedArtifact = createTestArtifact("2");
upgradedDef.getComponents().forEach(component -> {
component.setArtifact(upgradedArtifact);
});
}
writeUpgradedDef(upgradedDef);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
upgradeEvent.setVersion("v2");
service.handle(upgradeEvent);
}
private ServiceManager createTestServiceManager(String name)
throws IOException {
ServiceContext context = new ServiceContext();
context.service = createBaseDef(name);
context.fs = rule.getFs();
context.scheduler = new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
};
context.scheduler.init(rule.getConf());
Map<String, org.apache.hadoop.yarn.service.component.Component>
componentState = context.scheduler.getAllComponents();
context.service.getComponents().forEach(component -> {
componentState.put(component.getName(),
new org.apache.hadoop.yarn.service.component.Component(component,
1L, context));
});
return new ServiceManager(context);
}
static Service createBaseDef(String name) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service serviceDef = ServiceTestUtils.createExampleApplication();
serviceDef.setId(applicationId.toString());
serviceDef.setName(name);
serviceDef.setState(ServiceState.STARTED);
Artifact artifact = createTestArtifact("1");
serviceDef.getComponents().forEach(component ->
component.setArtifact(artifact));
return serviceDef;
}
static Artifact createTestArtifact(String artifactId) {
Artifact artifact = new Artifact();
artifact.setId(artifactId);
artifact.setType(Artifact.TypeEnum.TARBALL);
return artifact;
}
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
upgradedDef.getName(), upgradedDef.getVersion());
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
upgradedDef);
}
}

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.hamcrest.CoreMatchers;
import org.junit.After;
@ -86,7 +86,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
@Test (timeout = 200000)
public void testCreateFlexStopDestroyService() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient();
ServiceClient client = createClient(getConf());
Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp);
SliderFileSystem fileSystem = new SliderFileSystem(getConf());
@ -143,7 +143,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
@Test (timeout = 200000)
public void testComponentStartOrder() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient();
ServiceClient client = createClient(getConf());
Service exampleApp = new Service();
exampleApp.setName("teststartorder");
exampleApp.setVersion("v1");
@ -177,7 +177,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
String userB = "userb";
setupInternal(NUM_NMS);
ServiceClient client = createClient();
ServiceClient client = createClient(getConf());
String origBasePath = getConf().get(YARN_SERVICE_BASE_PATH);
Service userAApp = new Service();
@ -229,7 +229,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
System.setProperty("user.name", user);
setupInternal(NUM_NMS);
ServiceClient client = createClient();
ServiceClient client = createClient(getConf());
Service appA = new Service();
appA.setName(sameAppName);
@ -298,7 +298,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
setConf(conf);
setupInternal(NUM_NMS);
ServiceClient client = createClient();
ServiceClient client = createClient(getConf());
Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp);
Multimap<String, String> containersBeforeFailure =
@ -341,6 +341,28 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(exampleApp.getName());
}
@Test(timeout = 200000)
public void testUpgradeService() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient(getConf());
Service service = createExampleApplication();
client.actionCreate(service);
waitForServiceToBeStarted(client, service);
//upgrade the service
service.setVersion("v2");
client.actionUpgrade(service);
//wait for service to be in upgrade state
waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
SliderFileSystem fs = new SliderFileSystem(getConf());
Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs,
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
}
// Check containers launched are in dependency order
// Get all containers into a list and sort based on container launch time e.g.
// compa-c1, compa-c2, compb-c1, compb-c2;
@ -478,16 +500,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
*/
private void waitForServiceToBeStable(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp);
return retrievedApp.getState() == ServiceState.STABLE;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}, 2000, 200000);
waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE);
}
/**
@ -500,11 +513,25 @@ public class TestYarnNativeServices extends ServiceTestUtils {
*/
private void waitForServiceToBeStarted(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException {
waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
}
/**
* Wait until service is started. It does not have to reach a stable state.
*
* @param client
* @param exampleApp
* @throws TimeoutException
* @throws InterruptedException
*/
private void waitForServiceToBeInState(ServiceClient client,
Service exampleApp, ServiceState desiredState) throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp);
return retrievedApp.getState() == ServiceState.STARTED;
return retrievedApp.getState() == desiredState;
} catch (Exception e) {
e.printStackTrace();
return false;
@ -512,21 +539,6 @@ public class TestYarnNativeServices extends ServiceTestUtils {
}, 2000, 200000);
}
private ServiceClient createClient() throws Exception {
ServiceClient client = new ServiceClient() {
@Override protected Path addJarResource(String appName,
Map<String, LocalResource> localResources)
throws IOException, SliderException {
// do nothing, the Unit test will use local jars
return null;
}
};
client.init(getConf());
client.start();
return client;
}
private int countTotalContainers(Service service) {
int totalContainers = 0;
for (Component component : service.getComponents()) {

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link ServiceClient}.
*/
public class TestServiceClient {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testActionUpgrade() throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
ServiceClient client = createServiceClient(applicationId);
Service service = ServiceTestUtils.createExampleApplication();
service.setVersion("v1");
client.actionCreate(service);
//upgrade the service
service.setVersion("v2");
client.actionUpgrade(service);
//wait for service to be in upgrade state
Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(),
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
}
private ServiceClient createServiceClient(ApplicationId applicationId)
throws Exception {
ClientAMProtocol amProxy = mock(ClientAMProtocol.class);
YarnClient yarnClient = createMockYarnClient();
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ApplicationAttemptReport attemptReport =
ApplicationAttemptReport.newInstance(attemptId, "localhost", 0,
null, null, null,
YarnApplicationAttemptState.RUNNING, null);
ApplicationReport appReport = mock(ApplicationReport.class);
when(appReport.getHost()).thenReturn("localhost");
when(yarnClient.getApplicationAttemptReport(Matchers.any()))
.thenReturn(attemptReport);
when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport);
ServiceClient client = new ServiceClient() {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
}
@Override
protected ClientAMProtocol createAMProxy(String serviceName,
ApplicationReport appReport) throws IOException, YarnException {
return amProxy;
}
@Override
ApplicationId submitApp(Service app) throws IOException, YarnException {
return applicationId;
}
};
client.setFileSystem(rule.getFs());
client.setYarnClient(yarnClient);
client.init(rule.getConf());
client.start();
return client;
}
private YarnClient createMockYarnClient() throws IOException, YarnException {
YarnClient yarnClient = mock(YarnClient.class);
when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class)))
.thenReturn(new ArrayList<>());
return yarnClient;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
/**
* Tests for {@link CoreFileSystem}.
*/
public class TestCoreFileSystem {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testClusterUpgradeDirPath() {
String serviceName = "testClusterUpgrade";
String version = "v1";
Path expectedPath = new Path(rule.getFs().buildClusterDirPath(serviceName),
YarnServiceConstants.UPGRADE_DIR + "/" + version);
Assert.assertEquals("incorrect upgrade path", expectedPath,
rule.getFs().buildClusterUpgradeDirPath(serviceName, version));
}
}

View File

@ -229,4 +229,20 @@ public abstract class AppAdminClient extends CompositeService {
@Unstable
public abstract String getStatusString(String appIdOrName) throws
IOException, YarnException;
/**
* Upgrade a long running service.
*
* @param appName the name of the application
* @param fileName specification of application upgrade to save.
*
* @return exit code
* @throws IOException IOException
* @throws YarnException exception in client or server
*/
@Public
@Unstable
public abstract int actionUpgrade(String appName, String fileName)
throws IOException, YarnException;
}