YARN-8411. Restart stopped system service during RM start.
Contributed by Billie Rinaldi
This commit is contained in:
parent
7566e0ec5f
commit
69b0596897
|
@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.service.SystemServiceManager;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -228,14 +230,33 @@ public class SystemServiceManagerImpl extends AbstractService
|
|||
userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
|
||||
@Override public ApplicationId run()
|
||||
throws IOException, YarnException {
|
||||
ApplicationId applicationId = serviceClient.actionCreate(service);
|
||||
return applicationId;
|
||||
boolean tryStart = true;
|
||||
try {
|
||||
serviceClient.actionBuild(service);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof SliderException && ((SliderException) e)
|
||||
.getExitCode() == SliderExitCodes.EXIT_INSTANCE_EXISTS) {
|
||||
LOG.info("Service {} already exists, will attempt to start " +
|
||||
"service", service.getName());
|
||||
} else {
|
||||
tryStart = false;
|
||||
LOG.info("Got exception saving {}, will not attempt to " +
|
||||
"start service", service.getName(), e);
|
||||
}
|
||||
}
|
||||
if (tryStart) {
|
||||
return serviceClient.actionStartAndGetId(service.getName());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
if (applicationId != null) {
|
||||
LOG.info("Service {} submitted with Application ID: {}",
|
||||
service.getName(), applicationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ServiceClient getServiceClient() {
|
||||
return new ServiceClient();
|
||||
|
|
|
@ -648,8 +648,7 @@ public class ApiServer {
|
|||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
sc.actionStart(appName);
|
||||
ApplicationId appId = sc.getAppId(appName);
|
||||
ApplicationId appId = sc.actionStartAndGetId(appName);
|
||||
sc.close();
|
||||
return appId;
|
||||
}
|
||||
|
|
|
@ -103,13 +103,13 @@ public class ServiceClientTest extends ServiceClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int actionStart(String serviceName)
|
||||
public ApplicationId actionStartAndGetId(String serviceName)
|
||||
throws YarnException, IOException {
|
||||
if (serviceName != null && serviceName.equals("jenkins")) {
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
serviceAppId.put(serviceName, appId);
|
||||
return EXIT_SUCCESS;
|
||||
return appId;
|
||||
} else {
|
||||
throw new ApplicationNotFoundException("");
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -51,6 +53,7 @@ public class TestSystemServiceManagerImpl {
|
|||
|
||||
private String[] users = new String[] {"user1", "user2"};
|
||||
private static Map<String, Set<String>> loadedServices = new HashMap<>();
|
||||
private static Map<String, Set<String>> savedServices = new HashMap<>();
|
||||
private static Map<String, Set<String>> submittedServices = new HashMap<>();
|
||||
|
||||
@Before
|
||||
|
@ -72,7 +75,7 @@ public class TestSystemServiceManagerImpl {
|
|||
}
|
||||
|
||||
@After
|
||||
public void teadDown() {
|
||||
public void tearDown() {
|
||||
systemService.stop();
|
||||
}
|
||||
|
||||
|
@ -102,6 +105,11 @@ public class TestSystemServiceManagerImpl {
|
|||
// 2nd time launch service to handle if service exist scenario
|
||||
systemService.launchUserService(userServices);
|
||||
verifyForLaunchedUserServices();
|
||||
|
||||
// verify start of stopped services
|
||||
submittedServices.clear();
|
||||
systemService.launchUserService(userServices);
|
||||
verifyForLaunchedUserServices();
|
||||
}
|
||||
|
||||
private void verifyForScannedUserServices(
|
||||
|
@ -149,7 +157,27 @@ public class TestSystemServiceManagerImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ApplicationId actionCreate(Service service)
|
||||
public int actionBuild(Service service)
|
||||
throws YarnException, IOException {
|
||||
String userName =
|
||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
Set<String> services = savedServices.get(userName);
|
||||
if (services == null) {
|
||||
services = new HashSet<>();
|
||||
savedServices.put(userName, services);
|
||||
}
|
||||
if (services.contains(service.getName())) {
|
||||
String message = "Failed to save service " + service.getName()
|
||||
+ ", because it already exists.";
|
||||
throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
|
||||
message);
|
||||
}
|
||||
services.add(service.getName());
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationId actionStartAndGetId(String serviceName)
|
||||
throws YarnException, IOException {
|
||||
String userName =
|
||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
|
@ -158,12 +186,12 @@ public class TestSystemServiceManagerImpl {
|
|||
services = new HashSet<>();
|
||||
submittedServices.put(userName, services);
|
||||
}
|
||||
if (services.contains(service.getName())) {
|
||||
String message = "Failed to create service " + service.getName()
|
||||
+ ", because it already exists.";
|
||||
if (services.contains(serviceName)) {
|
||||
String message = "Failed to create service " + serviceName
|
||||
+ ", because it is already running.";
|
||||
throw new YarnException(message);
|
||||
}
|
||||
services.add(service.getName());
|
||||
services.add(serviceName);
|
||||
return ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1003,6 +1003,12 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
|
||||
@Override
|
||||
public int actionStart(String serviceName) throws YarnException, IOException {
|
||||
actionStartAndGetId(serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
public ApplicationId actionStartAndGetId(String serviceName) throws
|
||||
YarnException, IOException {
|
||||
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
||||
Service liveService = getStatus(serviceName);
|
||||
if (liveService == null ||
|
||||
|
@ -1019,11 +1025,11 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
// write app definition on to hdfs
|
||||
Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
|
||||
LOG.info("Persisted service " + service.getName() + " at " + appJson);
|
||||
return 0;
|
||||
return appId;
|
||||
} else {
|
||||
LOG.info("Finalize service {} upgrade");
|
||||
ApplicationReport appReport =
|
||||
yarnClient.getApplicationReport(getAppId(serviceName));
|
||||
ApplicationId appId = getAppId(serviceName);
|
||||
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
||||
if (StringUtils.isEmpty(appReport.getHost())) {
|
||||
throw new YarnException(serviceName + " AM hostname is empty");
|
||||
}
|
||||
|
@ -1032,7 +1038,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
RestartServiceRequestProto.Builder requestBuilder =
|
||||
RestartServiceRequestProto.newBuilder();
|
||||
proxy.restart(requestBuilder.build());
|
||||
return 0;
|
||||
return appId;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue