YARN-8018. Added support for initiating yarn service upgrade.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-03-26 18:46:31 -04:00
parent edb202e493
commit 27d60a1634
29 changed files with 1374 additions and 79 deletions

View File

@ -469,4 +469,22 @@ public String getStatusString(String appIdOrName) throws IOException,
return output; return output;
} }
@Override
public int actionUpgrade(String appName,
String fileName) throws IOException, YarnException {
int result;
try {
Service service =
loadAppJsonFromLocalFS(fileName, appName, null, null);
service.setState(ServiceState.UPGRADING);
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient()
.post(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
LOG.error("Failed to upgrade application: ", e);
result = EXIT_EXCEPTION_THROWN;
}
return result;
}
} }

View File

@ -375,6 +375,12 @@ public Response updateService(@Context HttpServletRequest request,
&& updateServiceData.getLifetime() > 0) { && updateServiceData.getLifetime() > 0) {
return updateLifetime(appName, updateServiceData, ugi); return updateLifetime(appName, updateServiceData, ugi);
} }
// If an UPGRADE is requested
if (updateServiceData.getState() != null &&
updateServiceData.getState() == ServiceState.UPGRADING) {
return upgradeService(updateServiceData, ugi);
}
} catch (UndeclaredThrowableException e) { } catch (UndeclaredThrowableException e) {
return formatResponse(Status.BAD_REQUEST, return formatResponse(Status.BAD_REQUEST,
e.getCause().getMessage()); e.getCause().getMessage());
@ -475,6 +481,24 @@ public Void run() throws YarnException, IOException {
return formatResponse(Status.OK, status); return formatResponse(Status.OK, status);
} }
private Response upgradeService(Service service,
final UserGroupInformation ugi) throws IOException, InterruptedException {
ServiceStatus status = new ServiceStatus();
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
sc.actionUpgrade(service);
sc.close();
return null;
});
LOG.info("Service {} version {} upgrade initialized");
status.setDiagnostics("Service " + service.getName() +
" version " + service.getVersion() + " saved.");
status.setState(ServiceState.ACCEPTED);
return formatResponse(Status.ACCEPTED, status);
}
/** /**
* Used by negative test case. * Used by negative test case.
* *

View File

@ -23,8 +23,14 @@
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import java.io.IOException; import java.io.IOException;
@ -37,4 +43,10 @@ GetStatusResponseProto getStatus(GetStatusRequestProto requestProto)
StopResponseProto stop(StopRequestProto requestProto) StopResponseProto stop(StopRequestProto requestProto)
throws IOException, YarnException; throws IOException, YarnException;
UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request)
throws IOException, YarnException;
RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException;
} }

View File

@ -33,8 +33,12 @@
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -142,4 +146,24 @@ public void run() {
public InetSocketAddress getBindAddress() { public InetSocketAddress getBindAddress() {
return bindAddress; return bindAddress;
} }
@Override
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException {
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE);
event.setVersion(request.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(event);
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
return UpgradeServiceResponseProto.newBuilder().build();
}
@Override
public RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException {
ServiceEvent event = new ServiceEvent(ServiceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(event);
LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
return RestartServiceResponseProto.newBuilder().build();
}
} }

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
* Events are handled by {@link ServiceManager} to manage the service
* state.
*/
public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private final ServiceEventType type;
private String version;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
this.type = serviceEventType;
}
public ServiceEventType getType() {
return type;
}
public String getVersion() {
return version;
}
public ServiceEvent setVersion(String version) {
this.version = version;
return this;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
/**
* Types of {@link ServiceEvent}.
*/
public enum ServiceEventType {
START,
UPGRADE,
STOP_UPGRADE
}

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Manages the state of the service.
*/
public class ServiceManager implements EventHandler<ServiceEvent> {
private static final Logger LOG = LoggerFactory.getLogger(
ServiceManager.class);
private final Service serviceSpec;
private final ServiceContext context;
private final ServiceScheduler scheduler;
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
private final StateMachine<State, ServiceEventType, ServiceEvent>
stateMachine;
private final AsyncDispatcher dispatcher;
private final SliderFileSystem fs;
private final UpgradeComponentsFinder componentsFinder;
private String upgradeVersion;
private static final StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
new StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent>(State.STABLE)
.addTransition(State.STABLE, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.UPGRADE,
new StartUpgradeTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.START,
new StopUpgradeTransition())
.installTopology();
public ServiceManager(ServiceContext context) {
Preconditions.checkNotNull(context);
this.context = context;
serviceSpec = context.service;
scheduler = context.scheduler;
stateMachine = STATE_MACHINE_FACTORY.make(this);
dispatcher = scheduler.getDispatcher();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
fs = context.fs;
componentsFinder = new UpgradeComponentsFinder
.DefaultUpgradeComponentsFinder();
}
@Override
public void handle(ServiceEvent event) {
try {
writeLock.lock();
State oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error(MessageFormat.format(
"[SERVICE]: Invalid event {0} at {1}.", event.getType(),
oldState), e);
}
if (oldState != getState()) {
LOG.info("[SERVICE] Transitioned from {} to {} on {} event.",
oldState, getState(), event.getType());
}
} finally {
writeLock.unlock();
}
}
private State getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
private static class StartUpgradeTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
try {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
serviceManager.fs, serviceManager.getName(), event.getVersion());
serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
List<org.apache.hadoop.yarn.service.api.records.Component>
compsThatNeedUpgrade = serviceManager.componentsFinder.
findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
compsThatNeedUpgrade.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).
setTargetSpec(component);
serviceManager.dispatcher.getEventHandler().handle(
needUpgradeEvent);
});
}
serviceManager.upgradeVersion = event.getVersion();
return State.UPGRADING;
} catch (Throwable e) {
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
e);
return State.STABLE;
}
}
}
private static class StopUpgradeTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
//abort is not supported currently
//trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
true);
if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) {
return serviceManager.finalizeUpgrade() ? State.STABLE :
State.UPGRADING;
} else {
return State.UPGRADING;
}
}
}
/**
* @return whether finalization of upgrade was successful.
*/
private boolean finalizeUpgrade() {
try {
Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
fs, getName(), upgradeVersion);
ServiceApiUtil.writeAppDefinition(fs,
ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
} catch (IOException e) {
LOG.error("Upgrade did not complete because unable to overwrite the" +
" service definition", e);
return false;
}
try {
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
} catch (IOException e) {
LOG.warn("Unable to delete upgrade definition for service {} " +
"version {}", getName(), upgradeVersion);
}
serviceSpec.setVersion(upgradeVersion);
upgradeVersion = null;
return true;
}
/**
* Returns the name of the service.
*/
public String getName() {
return serviceSpec.getName();
}
/**
* State of {@link ServiceManager}.
*/
public enum State {
STABLE, UPGRADING
}
@VisibleForTesting
Service getServiceSpec() {
return serviceSpec;
}
}

View File

@ -110,6 +110,9 @@ public class ServiceScheduler extends CompositeService {
LoggerFactory.getLogger(ServiceScheduler.class); LoggerFactory.getLogger(ServiceScheduler.class);
private Service app; private Service app;
// This encapsulates the <code>app</code> with methods to upgrade the app.
private ServiceManager serviceManager;
// component_name -> component // component_name -> component
private final Map<String, Component> componentsByName = private final Map<String, Component> componentsByName =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
@ -192,6 +195,7 @@ public void buildInstance(ServiceContext context, Configuration configuration)
addIfService(nmClient); addIfService(nmClient);
dispatcher = new AsyncDispatcher("Component dispatcher"); dispatcher = new AsyncDispatcher("Component dispatcher");
dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
dispatcher.register(ComponentEventType.class, dispatcher.register(ComponentEventType.class,
new ComponentEventHandler()); new ComponentEventHandler());
dispatcher.register(ComponentInstanceEventType.class, dispatcher.register(ComponentInstanceEventType.class,
@ -300,6 +304,7 @@ public void serviceStart() throws Exception {
// Since AM has been started and registered, the service is in STARTED state // Since AM has been started and registered, the service is in STARTED state
app.setState(ServiceState.STARTED); app.setState(ServiceState.STARTED);
serviceManager = new ServiceManager(context);
// recover components based on containers sent from RM // recover components based on containers sent from RM
recoverComponents(response); recoverComponents(response);
@ -510,6 +515,20 @@ private void createAllComponents() {
} }
} }
private final class ServiceEventHandler
implements EventHandler<ServiceEvent> {
@Override
public void handle(ServiceEvent event) {
try {
serviceManager.handle(event);
} catch (Throwable t) {
LOG.error(MessageFormat
.format("[SERVICE]: Error in handling event type {0}",
event.getType()), t);
}
}
}
private final class ComponentEventHandler private final class ComponentEventHandler
implements EventHandler<ComponentEvent> { implements EventHandler<ComponentEvent> {
@Override @Override

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Finds all the the target component specs.
*/
public interface UpgradeComponentsFinder {
List<Component> findTargetComponentSpecs(Service currentDef,
Service targetDef);
/**
* Default implementation of {@link UpgradeComponentsFinder} that finds all
* the target component specs.
*/
class DefaultUpgradeComponentsFinder implements UpgradeComponentsFinder {
@Override
public List<Component> findTargetComponentSpecs(Service currentDef,
Service targetDef) {
if (currentDef.getComponents().size() !=
targetDef.getComponents().size()) {
throw new UnsupportedOperationException(
"addition/deletion of components not supported by upgrade");
}
if (!currentDef.getKerberosPrincipal().equals(
targetDef.getKerberosPrincipal())) {
throw new UnsupportedOperationException("changes to kerberos " +
"principal not supported by upgrade");
}
if (!Objects.equals(currentDef.getQueue(), targetDef.getQueue())) {
throw new UnsupportedOperationException("changes to queue " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getPlacementPolicy(),
targetDef.getPlacementPolicy())) {
throw new UnsupportedOperationException("changes to placement policy " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) {
throw new UnsupportedOperationException("changes to resource " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getDescription(),
targetDef.getDescription())) {
throw new UnsupportedOperationException("changes to description " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getQuicklinks(),
targetDef.getQuicklinks())) {
throw new UnsupportedOperationException("changes to quick links " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getLaunchTime(),
targetDef.getLaunchTime())) {
throw new UnsupportedOperationException("changes to launch time " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getLifetime(),
targetDef.getLifetime())) {
throw new UnsupportedOperationException("changes to lifetime " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getConfiguration(),
currentDef.getConfiguration())) {
return targetDef.getComponents();
}
if (!Objects.equals(currentDef.getArtifact(), targetDef.getArtifact())) {
return targetDef.getComponents();
}
List<Component> targetComps = new ArrayList<>();
targetDef.getComponents().forEach(component -> {
Component currentComp = currentDef.getComponent(component.getName());
if (!Objects.equals(currentComp.getName(), component.getName())) {
throw new UnsupportedOperationException(
"changes to component name not supported by upgrade");
}
if (!Objects.equals(currentComp.getDependencies(),
component.getDependencies())) {
throw new UnsupportedOperationException(
"changes to component dependencies not supported by upgrade");
}
if (!Objects.equals(currentComp.getReadinessCheck(),
component.getReadinessCheck())) {
throw new UnsupportedOperationException(
"changes to component readiness check not supported by upgrade");
}
if (!Objects.equals(currentComp.getResource(),
component.getResource())) {
throw new UnsupportedOperationException(
"changes to component resource not supported by upgrade");
}
if (!Objects.equals(currentComp.getRunPrivilegedContainer(),
component.getRunPrivilegedContainer())) {
throw new UnsupportedOperationException(
"changes to run privileged container not supported by upgrade");
}
if (!Objects.equals(currentComp.getPlacementPolicy(),
component.getPlacementPolicy())) {
throw new UnsupportedOperationException(
"changes to component placement policy not supported by upgrade");
}
if (!Objects.equals(currentComp.getQuicklinks(),
component.getQuicklinks())) {
throw new UnsupportedOperationException(
"changes to component quick links not supported by upgrade");
}
if (!Objects.equals(currentComp.getArtifact(),
component.getArtifact()) ||
!Objects.equals(currentComp.getLaunchCommand(),
component.getLaunchCommand()) ||
!Objects.equals(currentComp.getConfiguration(),
component.getConfiguration())) {
targetComps.add(component);
}
});
return targetComps;
}
}
}

View File

@ -26,5 +26,5 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ApiModel(description = "The current state of a component.") @ApiModel(description = "The current state of a component.")
public enum ComponentState { public enum ComponentState {
FLEXING, STABLE FLEXING, STABLE, NEEDS_UPGRADE;
} }

View File

@ -29,5 +29,5 @@
@ApiModel(description = "The current state of an service.") @ApiModel(description = "The current state of an service.")
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState { public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX; ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.client; package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
@ -55,7 +56,9 @@
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
@ -73,8 +76,8 @@
import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils; import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
@ -186,6 +189,7 @@ public Service loadAppJsonFromLocalFS(String fileName, String serviceName,
return service; return service;
} }
@Override
public int actionSave(String fileName, String serviceName, Long lifetime, public int actionSave(String fileName, String serviceName, Long lifetime,
String queue) throws IOException, YarnException { String queue) throws IOException, YarnException {
return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName, return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName,
@ -194,9 +198,54 @@ public int actionSave(String fileName, String serviceName, Long lifetime,
public int actionBuild(Service service) public int actionBuild(Service service)
throws YarnException, IOException { throws YarnException, IOException {
Path appDir = checkAppNotExistOnHdfs(service);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
createDirAndPersistApp(appDir, service); Path appDir = checkAppNotExistOnHdfs(service, false);
ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
return EXIT_SUCCESS;
}
@Override
public int actionUpgrade(String appName, String fileName)
throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
null, null);
return actionUpgrade(upgradeService);
}
public int actionUpgrade(Service service) throws YarnException, IOException {
Service persistedService =
ServiceApiUtil.loadService(fs, service.getName());
if (!StringUtils.isEmpty(persistedService.getId())) {
cachedAppInfo.put(persistedService.getName(), new AppInfo(
ApplicationId.fromString(persistedService.getId()),
persistedService.getKerberosPrincipal().getPrincipalName()));
}
if (persistedService.getVersion().equals(service.getVersion())) {
String message =
service.getName() + " is already at version " + service.getVersion()
+ ". There is nothing to upgrade.";
LOG.error(message);
throw new YarnException(message);
}
Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(service.getName()));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(service.getName() + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
UpgradeServiceRequestProto.Builder requestBuilder =
UpgradeServiceRequestProto.newBuilder();
requestBuilder.setVersion(service.getVersion());
proxy.upgrade(requestBuilder.build());
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
@ -212,16 +261,16 @@ public ApplicationId actionCreate(Service service)
String serviceName = service.getName(); String serviceName = service.getName();
ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
verifyNoLiveAppInRM(serviceName, "create"); verifyNoLiveAppInRM(serviceName, "create");
Path appDir = checkAppNotExistOnHdfs(service); Path appDir = checkAppNotExistOnHdfs(service, false);
// Write the definition first and then submit - AM will read the definition // Write the definition first and then submit - AM will read the definition
createDirAndPersistApp(appDir, service); ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
ApplicationId appId = submitApp(service); ApplicationId appId = submitApp(service);
cachedAppInfo.put(serviceName, new AppInfo(appId, service cachedAppInfo.put(serviceName, new AppInfo(appId, service
.getKerberosPrincipal().getPrincipalName())); .getKerberosPrincipal().getPrincipalName()));
service.setId(appId.toString()); service.setId(appId.toString());
// update app definition with appId // update app definition with appId
persistAppDef(appDir, service); ServiceApiUtil.writeAppDefinition(fs, appDir, service);
return appId; return appId;
} }
@ -349,6 +398,7 @@ private Map<String, Long> flexComponents(String serviceName,
return original; return original;
} }
@Override
public int actionStop(String serviceName) public int actionStop(String serviceName)
throws YarnException, IOException { throws YarnException, IOException {
return actionStop(serviceName, true); return actionStop(serviceName, true);
@ -424,6 +474,7 @@ public int actionStop(String serviceName, boolean waitForAppStopped)
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
@Override
public int actionDestroy(String serviceName) throws YarnException, public int actionDestroy(String serviceName) throws YarnException,
IOException { IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig()); ServiceApiUtil.validateNameFormat(serviceName, getConfig());
@ -557,8 +608,8 @@ private void verifyNoLiveAppInRM(String serviceName, String action)
} }
} }
private ApplicationId submitApp(Service app) @VisibleForTesting
throws IOException, YarnException { ApplicationId submitApp(Service app) throws IOException, YarnException {
String serviceName = app.getName(); String serviceName = app.getName();
Configuration conf = getConfig(); Configuration conf = getConfig();
Path appRootDir = fs.buildClusterDirPath(app.getName()); Path appRootDir = fs.buildClusterDirPath(app.getName());
@ -772,29 +823,64 @@ private boolean addAMLog4jResource(String serviceName, Configuration conf,
return hasAMLog4j; return hasAMLog4j;
} }
@Override
public int actionStart(String serviceName) throws YarnException, IOException { public int actionStart(String serviceName) throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig()); ServiceApiUtil.validateNameFormat(serviceName, getConfig());
Path appDir = checkAppExistOnHdfs(serviceName); Service liveService = getStatus(serviceName);
Service service = ServiceApiUtil.loadService(fs, serviceName); if (liveService == null ||
ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); !liveService.getState().equals(ServiceState.UPGRADING)) {
// see if it is actually running and bail out; Path appDir = checkAppExistOnHdfs(serviceName);
verifyNoLiveAppInRM(serviceName, "start"); Service service = ServiceApiUtil.loadService(fs, serviceName);
ApplicationId appId = submitApp(service); ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
service.setId(appId.toString()); // see if it is actually running and bail out;
// write app definition on to hdfs verifyNoLiveAppInRM(serviceName, "start");
Path appJson = persistAppDef(appDir, service); ApplicationId appId = submitApp(service);
LOG.info("Persisted service " + service.getName() + " at " + appJson); service.setId(appId.toString());
return 0; // write app definition on to hdfs
Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
return 0;
} else {
LOG.info("Finalize service {} upgrade");
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(serviceName));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(serviceName, appReport);
RestartServiceRequestProto.Builder requestBuilder =
RestartServiceRequestProto.newBuilder();
proxy.restart(requestBuilder.build());
return 0;
}
} }
private Path checkAppNotExistOnHdfs(Service service) /**
* Verifies that the service definition does not exist on hdfs.
*
* @param service service
* @param isUpgrade true for upgrades; false otherwise
* @return path to the service definition..
* @throws IOException
* @throws SliderException
*/
private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade)
throws IOException, SliderException { throws IOException, SliderException {
Path appDir = fs.buildClusterDirPath(service.getName()); Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) :
fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion());
fs.verifyDirectoryNonexistent( fs.verifyDirectoryNonexistent(
new Path(appDir, service.getName() + ".json")); new Path(appDir, service.getName() + ".json"));
return appDir; return appDir;
} }
/**
* Verifies that the service exists on hdfs.
* @param serviceName service name
* @return path to the service definition.
* @throws IOException
* @throws SliderException
*/
private Path checkAppExistOnHdfs(String serviceName) private Path checkAppExistOnHdfs(String serviceName)
throws IOException, SliderException { throws IOException, SliderException {
Path appDir = fs.buildClusterDirPath(serviceName); Path appDir = fs.buildClusterDirPath(serviceName);
@ -802,20 +888,6 @@ private Path checkAppExistOnHdfs(String serviceName)
return appDir; return appDir;
} }
private void createDirAndPersistApp(Path appDir, Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = persistAppDef(appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
}
private Path persistAppDef(Path appDir, Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext) private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
throws IOException { throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) { if (!UserGroupInformation.isSecurityEnabled()) {
@ -1074,6 +1146,17 @@ protected ClientAMProtocol createAMProxy(String serviceName,
UserGroupInformation.getCurrentUser(), rpc, address); UserGroupInformation.getCurrentUser(), rpc, address);
} }
@VisibleForTesting
void setFileSystem(SliderFileSystem fileSystem)
throws IOException {
this.fs = fileSystem;
}
@VisibleForTesting
void setYarnClient(YarnClient yarnClient) {
this.yarnClient = yarnClient;
}
public synchronized ApplicationId getAppId(String serviceName) public synchronized ApplicationId getAppId(String serviceName)
throws IOException, YarnException { throws IOException, YarnException {
if (cachedAppInfo.containsKey(serviceName)) { if (cachedAppInfo.containsKey(serviceName)) {

View File

@ -138,6 +138,12 @@ CONTAINER_STARTED, new ContainerStartedTransition())
// For flex down, go to STABLE state // For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING), .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition()) FLEX, new FlexComponentTransition())
.addTransition(STABLE, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(FLEXING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.installTopology(); .installTopology();
public Component( public Component(
@ -355,6 +361,14 @@ public void transition(Component component, ComponentEvent event) {
} }
} }
private static class ComponentNeedsUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
}
}
public void removePendingInstance(ComponentInstance instance) { public void removePendingInstance(ComponentInstance instance) {
pendingInstances.remove(instance); pendingInstances.remove(instance);
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.component; package org.apache.hadoop.yarn.service.component;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -32,6 +33,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ComponentInstance instance; private ComponentInstance instance;
private ContainerStatus status; private ContainerStatus status;
private ContainerId containerId; private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
public ContainerId getContainerId() { public ContainerId getContainerId() {
return containerId; return containerId;
@ -91,4 +93,14 @@ public ComponentEvent setStatus(ContainerStatus status) {
this.status = status; this.status = status;
return this; return this;
} }
public org.apache.hadoop.yarn.service.api.records.Component getTargetSpec() {
return targetSpec;
}
public ComponentEvent setTargetSpec(
org.apache.hadoop.yarn.service.api.records.Component targetSpec) {
this.targetSpec = Preconditions.checkNotNull(targetSpec);
return this;
}
} }

View File

@ -23,5 +23,7 @@ public enum ComponentEventType {
CONTAINER_ALLOCATED, CONTAINER_ALLOCATED,
CONTAINER_RECOVERED, CONTAINER_RECOVERED,
CONTAINER_STARTED, CONTAINER_STARTED,
CONTAINER_COMPLETED CONTAINER_COMPLETED,
UPGRADE,
STOP_UPGRADE
} }

View File

@ -21,5 +21,6 @@
public enum ComponentState { public enum ComponentState {
INIT, INIT,
FLEXING, FLEXING,
STABLE STABLE,
UPGRADING
} }

View File

@ -92,4 +92,6 @@ public interface YarnServiceConstants {
String CONTENT = "content"; String CONTENT = "content";
String PRINCIPAL = "yarn.service.am.principal"; String PRINCIPAL = "yarn.service.am.principal";
String UPGRADE_DIR = "upgrade";
} }

View File

@ -35,8 +35,12 @@
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB; import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
public class ClientAMProtocolPBClientImpl public class ClientAMProtocolPBClientImpl
implements ClientAMProtocol, Closeable { implements ClientAMProtocol, Closeable {
@ -88,4 +92,26 @@ public StopResponseProto stop(StopRequestProto requestProto)
RPC.stopProxy(this.proxy); RPC.stopProxy(this.proxy);
} }
} }
@Override
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException, YarnException {
try {
return proxy.upgradeService(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
@Override
public RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException {
try {
return proxy.restartService(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
} }

View File

@ -25,6 +25,10 @@
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ClientAMProtocol;
import java.io.IOException; import java.io.IOException;
@ -67,4 +71,24 @@ public org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto stop(
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public UpgradeServiceResponseProto upgradeService(RpcController controller,
UpgradeServiceRequestProto request) throws ServiceException {
try {
return real.upgrade(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
@Override
public RestartServiceResponseProto restartService(RpcController controller,
RestartServiceRequestProto request) throws ServiceException {
try {
return real.restart(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -112,10 +111,38 @@ public String toString() {
public Path buildClusterDirPath(String clustername) { public Path buildClusterDirPath(String clustername) {
Preconditions.checkNotNull(clustername); Preconditions.checkNotNull(clustername);
Path path = getBaseApplicationPath(); Path path = getBaseApplicationPath();
return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername); return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/"
+ clustername);
} }
/**
* Build up the upgrade path string for a cluster. No attempt to
* create the directory is made.
*
* @param clusterName name of the cluster
* @param version version of the cluster
* @return the upgrade path to the cluster
*/
public Path buildClusterUpgradeDirPath(String clusterName, String version) {
Preconditions.checkNotNull(clusterName);
Preconditions.checkNotNull(version);
return new Path(buildClusterDirPath(clusterName),
YarnServiceConstants.UPGRADE_DIR + "/" + version);
}
/**
* Delete the upgrade cluster directory.
* @param clusterName name of the cluster
* @param version version of the cluster
* @throws IOException
*/
public void deleteClusterUpgradeDir(String clusterName, String version)
throws IOException {
Preconditions.checkNotNull(clusterName);
Preconditions.checkNotNull(version);
Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version);
fileSystem.delete(upgradeCluster, true);
}
/** /**
* Build up the path string for keytab install location -no attempt to * Build up the path string for keytab install location -no attempt to
* create the directory is made * create the directory is made

View File

@ -22,9 +22,9 @@
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory; import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
@ -275,6 +276,14 @@ public static Service loadService(SliderFileSystem fs, String
return jsonSerDeser.load(fs.getFileSystem(), serviceJson); return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
} }
public static Service loadServiceUpgrade(SliderFileSystem fs,
String serviceName, String version) throws IOException {
Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
Path versionedDef = new Path(versionPath, serviceName + ".json");
LOG.info("Loading service definition from {}", versionedDef);
return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
}
public static Service loadServiceFrom(SliderFileSystem fs, public static Service loadServiceFrom(SliderFileSystem fs,
Path appDefPath) throws IOException { Path appDefPath) throws IOException {
LOG.info("Loading service definition from " + appDefPath); LOG.info("Loading service definition from " + appDefPath);
@ -429,6 +438,23 @@ private static Map<String, Component> sortByDependencies(List<Component>
return sortByDependencies(components, sortedComponents); return sortByDependencies(components, sortedComponents);
} }
public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service {} version {} at {}", service.getName(),
service.getVersion(), appJson);
}
public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
public static String $(String s) { public static String $(String s) {
return "${" + s +"}"; return "${" + s +"}";
} }

View File

@ -26,6 +26,10 @@ service ClientAMProtocolService {
rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto); rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto);
rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto); rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto);
rpc stop(StopRequestProto) returns (StopResponseProto); rpc stop(StopRequestProto) returns (StopResponseProto);
rpc upgradeService(UpgradeServiceRequestProto)
returns (UpgradeServiceResponseProto);
rpc restartService(RestartServiceRequestProto)
returns (RestartServiceResponseProto);
} }
message FlexComponentsRequestProto { message FlexComponentsRequestProto {
@ -37,7 +41,7 @@ message ComponentCountProto {
optional int64 numberOfContainers = 2; optional int64 numberOfContainers = 2;
} }
message FlexComponentsResponseProto{ message FlexComponentsResponseProto {
} }
message GetStatusRequestProto { message GetStatusRequestProto {
@ -54,3 +58,16 @@ message StopRequestProto {
message StopResponseProto { message StopResponseProto {
} }
message UpgradeServiceRequestProto {
optional string version = 1;
}
message UpgradeServiceResponseProto {
}
message RestartServiceRequestProto {
}
message RestartServiceResponseProto {
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service; package org.apache.hadoop.yarn.service;
import com.google.common.base.Throwables;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -26,18 +27,23 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.codehaus.jackson.map.PropertyNamingStrategy; import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,6 +53,8 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URL; import java.net.URL;
import java.nio.file.Paths;
import java.util.Map;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
@ -78,7 +86,7 @@ public class ServiceTestUtils {
// Example service definition // Example service definition
// 2 components, each of which has 2 containers. // 2 components, each of which has 2 containers.
protected Service createExampleApplication() { public static Service createExampleApplication() {
Service exampleApp = new Service(); Service exampleApp = new Service();
exampleApp.setName("example-app"); exampleApp.setName("example-app");
exampleApp.setVersion("v1"); exampleApp.setVersion("v1");
@ -176,7 +184,7 @@ protected void setupInternal(int numNodeManager)
zkCluster.start(); zkCluster.start();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString()); conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString()); LOG.info("ZK cluster: " + zkCluster.getConnectString());
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
basedir = new File("target", "apps"); basedir = new File("target", "apps");
@ -268,4 +276,78 @@ private void waitForNMsToRegister() throws Exception {
} }
} }
/**
* Creates a {@link ServiceClient} for test purposes.
*/
public static ServiceClient createClient(Configuration conf)
throws Exception {
ServiceClient client = new ServiceClient() {
@Override
protected Path addJarResource(String appName,
Map<String, LocalResource> localResources)
throws IOException, SliderException {
// do nothing, the Unit test will use local jars
return null;
}
};
client.init(conf);
client.start();
return client;
}
/**
* Watcher to initialize yarn service base path under target and deletes the
* the test directory when finishes.
*/
public static class ServiceFSWatcher extends TestWatcher {
private YarnConfiguration conf;
private SliderFileSystem fs;
private java.nio.file.Path serviceBasePath;
@Override
protected void starting(Description description) {
conf = new YarnConfiguration();
delete(description);
serviceBasePath = Paths.get("target",
description.getClassName(), description.getMethodName());
conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
try {
fs = new SliderFileSystem(conf);
} catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
protected void finished(Description description) {
delete(description);
}
private void delete(Description description) {
FileUtils.deleteQuietly(Paths.get("target",
description.getClassName()).toFile());
}
/**
* Returns the yarn conf.
*/
public YarnConfiguration getConf() {
return conf;
}
/**
* Returns the file system.
*/
public SliderFileSystem getFs() {
return fs;
}
/**
* Returns the test service base path.
*/
public java.nio.file.Path getServiceBasePath() {
return serviceBasePath;
}
}
} }

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* Tests for {@link UpgradeComponentsFinder.DefaultUpgradeComponentsFinder}.
*/
public class TestDefaultUpgradeComponentsFinder {
private UpgradeComponentsFinder.DefaultUpgradeComponentsFinder finder =
new UpgradeComponentsFinder.DefaultUpgradeComponentsFinder();
@Test
public void testServiceArtifactChange() {
Service currentDef = ServiceTestUtils.createExampleApplication();
Service targetDef = ServiceTestUtils.createExampleApplication();
targetDef.getComponents().forEach(x -> x.setArtifact(
TestServiceManager.createTestArtifact("v1")));
Assert.assertEquals("all components need upgrade",
targetDef.getComponents(), finder.findTargetComponentSpecs(currentDef,
targetDef));
}
@Test
public void testComponentArtifactChange() {
Service currentDef = TestServiceManager.createBaseDef("test");
Service targetDef = TestServiceManager.createBaseDef("test");
targetDef.getComponents().get(0).setArtifact(
TestServiceManager.createTestArtifact("v2"));
List<Component> expected = new ArrayList<>();
expected.add(targetDef.getComponents().get(0));
Assert.assertEquals("single components needs upgrade",
expected, finder.findTargetComponentSpecs(currentDef,
targetDef));
}
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link ServiceManager}.
*/
public class TestServiceManager {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testUpgrade() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testUpgrade");
upgrade(serviceManager, "v2", false);
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartNothingToUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", false);
//make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
comp.setState(ComponentState.STABLE);
});
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service not re-started", ServiceState.STABLE,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartWithPendingUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", true);
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service should still be upgrading",
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
}
private void upgrade(ServiceManager service, String version,
boolean upgradeArtifact)
throws IOException, SliderException {
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(service.getName());
upgradedDef.setVersion(version);
if (upgradeArtifact) {
Artifact upgradedArtifact = createTestArtifact("2");
upgradedDef.getComponents().forEach(component -> {
component.setArtifact(upgradedArtifact);
});
}
writeUpgradedDef(upgradedDef);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
upgradeEvent.setVersion("v2");
service.handle(upgradeEvent);
}
private ServiceManager createTestServiceManager(String name)
throws IOException {
ServiceContext context = new ServiceContext();
context.service = createBaseDef(name);
context.fs = rule.getFs();
context.scheduler = new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
};
context.scheduler.init(rule.getConf());
Map<String, org.apache.hadoop.yarn.service.component.Component>
componentState = context.scheduler.getAllComponents();
context.service.getComponents().forEach(component -> {
componentState.put(component.getName(),
new org.apache.hadoop.yarn.service.component.Component(component,
1L, context));
});
return new ServiceManager(context);
}
static Service createBaseDef(String name) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service serviceDef = ServiceTestUtils.createExampleApplication();
serviceDef.setId(applicationId.toString());
serviceDef.setName(name);
serviceDef.setState(ServiceState.STARTED);
Artifact artifact = createTestArtifact("1");
serviceDef.getComponents().forEach(component ->
component.setArtifact(artifact));
return serviceDef;
}
static Artifact createTestArtifact(String artifactId) {
Artifact artifact = new Artifact();
artifact.setId(artifactId);
artifact.setType(Artifact.TypeEnum.TARBALL);
return artifact;
}
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
upgradedDef.getName(), upgradedDef.getVersion());
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
upgradedDef);
}
}

View File

@ -33,7 +33,7 @@
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.junit.After; import org.junit.After;
@ -86,7 +86,7 @@ public void tearDown() throws IOException {
@Test (timeout = 200000) @Test (timeout = 200000)
public void testCreateFlexStopDestroyService() throws Exception { public void testCreateFlexStopDestroyService() throws Exception {
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
ServiceClient client = createClient(); ServiceClient client = createClient(getConf());
Service exampleApp = createExampleApplication(); Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp); client.actionCreate(exampleApp);
SliderFileSystem fileSystem = new SliderFileSystem(getConf()); SliderFileSystem fileSystem = new SliderFileSystem(getConf());
@ -141,7 +141,7 @@ public void testCreateFlexStopDestroyService() throws Exception {
@Test (timeout = 200000) @Test (timeout = 200000)
public void testComponentStartOrder() throws Exception { public void testComponentStartOrder() throws Exception {
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
ServiceClient client = createClient(); ServiceClient client = createClient(getConf());
Service exampleApp = new Service(); Service exampleApp = new Service();
exampleApp.setName("teststartorder"); exampleApp.setName("teststartorder");
exampleApp.setVersion("v1"); exampleApp.setVersion("v1");
@ -169,7 +169,7 @@ public void testCreateServiceSameNameDifferentUser() throws Exception {
String userB = "userb"; String userB = "userb";
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
ServiceClient client = createClient(); ServiceClient client = createClient(getConf());
String origBasePath = getConf().get(YARN_SERVICE_BASE_PATH); String origBasePath = getConf().get(YARN_SERVICE_BASE_PATH);
Service userAApp = new Service(); Service userAApp = new Service();
@ -221,7 +221,7 @@ public void testCreateServiceSameNameSameUser() throws Exception {
System.setProperty("user.name", user); System.setProperty("user.name", user);
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
ServiceClient client = createClient(); ServiceClient client = createClient(getConf());
Service appA = new Service(); Service appA = new Service();
appA.setName(sameAppName); appA.setName(sameAppName);
@ -290,7 +290,7 @@ public void testRecoverComponentsAfterRMRestart() throws Exception {
setConf(conf); setConf(conf);
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
ServiceClient client = createClient(); ServiceClient client = createClient(getConf());
Service exampleApp = createExampleApplication(); Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp); client.actionCreate(exampleApp);
Multimap<String, String> containersBeforeFailure = Multimap<String, String> containersBeforeFailure =
@ -333,6 +333,28 @@ public void testRecoverComponentsAfterRMRestart() throws Exception {
client.actionDestroy(exampleApp.getName()); client.actionDestroy(exampleApp.getName());
} }
@Test(timeout = 200000)
public void testUpgradeService() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient(getConf());
Service service = createExampleApplication();
client.actionCreate(service);
waitForServiceToBeStarted(client, service);
//upgrade the service
service.setVersion("v2");
client.actionUpgrade(service);
//wait for service to be in upgrade state
waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
SliderFileSystem fs = new SliderFileSystem(getConf());
Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs,
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
}
// Check containers launched are in dependency order // Check containers launched are in dependency order
// Get all containers into a list and sort based on container launch time e.g. // Get all containers into a list and sort based on container launch time e.g.
// compa-c1, compa-c2, compb-c1, compb-c2; // compa-c1, compa-c2, compb-c1, compb-c2;
@ -470,16 +492,7 @@ private Multimap<String, String> waitForAllCompToBeReady(ServiceClient client,
*/ */
private void waitForServiceToBeStable(ServiceClient client, private void waitForServiceToBeStable(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException { Service exampleApp) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> { waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE);
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp);
return retrievedApp.getState() == ServiceState.STABLE;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}, 2000, 200000);
} }
/** /**
@ -492,11 +505,25 @@ private void waitForServiceToBeStable(ServiceClient client,
*/ */
private void waitForServiceToBeStarted(ServiceClient client, private void waitForServiceToBeStarted(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException { Service exampleApp) throws TimeoutException, InterruptedException {
waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
}
/**
* Wait until service is started. It does not have to reach a stable state.
*
* @param client
* @param exampleApp
* @throws TimeoutException
* @throws InterruptedException
*/
private void waitForServiceToBeInState(ServiceClient client,
Service exampleApp, ServiceState desiredState) throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
try { try {
Service retrievedApp = client.getStatus(exampleApp.getName()); Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp); System.out.println(retrievedApp);
return retrievedApp.getState() == ServiceState.STARTED; return retrievedApp.getState() == desiredState;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return false; return false;
@ -504,21 +531,6 @@ private void waitForServiceToBeStarted(ServiceClient client,
}, 2000, 200000); }, 2000, 200000);
} }
private ServiceClient createClient() throws Exception {
ServiceClient client = new ServiceClient() {
@Override protected Path addJarResource(String appName,
Map<String, LocalResource> localResources)
throws IOException, SliderException {
// do nothing, the Unit test will use local jars
return null;
}
};
client.init(getConf());
client.start();
return client;
}
private int countTotalContainers(Service service) { private int countTotalContainers(Service service) {
int totalContainers = 0; int totalContainers = 0;
for (Component component : service.getComponents()) { for (Component component : service.getComponents()) {

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link ServiceClient}.
*/
public class TestServiceClient {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testActionUpgrade() throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
ServiceClient client = createServiceClient(applicationId);
Service service = ServiceTestUtils.createExampleApplication();
service.setVersion("v1");
client.actionCreate(service);
//upgrade the service
service.setVersion("v2");
client.actionUpgrade(service);
//wait for service to be in upgrade state
Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(),
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
}
private ServiceClient createServiceClient(ApplicationId applicationId)
throws Exception {
ClientAMProtocol amProxy = mock(ClientAMProtocol.class);
YarnClient yarnClient = createMockYarnClient();
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ApplicationAttemptReport attemptReport =
ApplicationAttemptReport.newInstance(attemptId, "localhost", 0,
null, null, null,
YarnApplicationAttemptState.RUNNING, null);
ApplicationReport appReport = mock(ApplicationReport.class);
when(appReport.getHost()).thenReturn("localhost");
when(yarnClient.getApplicationAttemptReport(Matchers.any()))
.thenReturn(attemptReport);
when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport);
ServiceClient client = new ServiceClient() {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
}
@Override
protected ClientAMProtocol createAMProxy(String serviceName,
ApplicationReport appReport) throws IOException, YarnException {
return amProxy;
}
@Override
ApplicationId submitApp(Service app) throws IOException, YarnException {
return applicationId;
}
};
client.setFileSystem(rule.getFs());
client.setYarnClient(yarnClient);
client.init(rule.getConf());
client.start();
return client;
}
private YarnClient createMockYarnClient() throws IOException, YarnException {
YarnClient yarnClient = mock(YarnClient.class);
when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class)))
.thenReturn(new ArrayList<>());
return yarnClient;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
/**
* Tests for {@link CoreFileSystem}.
*/
public class TestCoreFileSystem {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testClusterUpgradeDirPath() {
String serviceName = "testClusterUpgrade";
String version = "v1";
Path expectedPath = new Path(rule.getFs().buildClusterDirPath(serviceName),
YarnServiceConstants.UPGRADE_DIR + "/" + version);
Assert.assertEquals("incorrect upgrade path", expectedPath,
rule.getFs().buildClusterUpgradeDirPath(serviceName, version));
}
}

View File

@ -229,4 +229,20 @@ public abstract int enableFastLaunch(String destinationFolder)
@Unstable @Unstable
public abstract String getStatusString(String appIdOrName) throws public abstract String getStatusString(String appIdOrName) throws
IOException, YarnException; IOException, YarnException;
/**
* Upgrade a long running service.
*
* @param appName the name of the application
* @param fileName specification of application upgrade to save.
*
* @return exit code
* @throws IOException IOException
* @throws YarnException exception in client or server
*/
@Public
@Unstable
public abstract int actionUpgrade(String appName, String fileName)
throws IOException, YarnException;
} }