YARN-9001. [Submarine] Use AppAdminClient instead of ServiceClient to sumbit jobs. (Zac Zhou via wangda)

Change-Id: Ic3d6c1e439df9cdf74448b345b925343224efe51
This commit is contained in:
Wangda Tan 2018-11-13 13:13:27 -08:00
parent 9da6054ca4
commit fcbd205cc3
7 changed files with 80 additions and 24 deletions

View File

@ -1552,6 +1552,7 @@ public Service getStatus(String serviceName)
LOG.info("Service {} does not have an application ID", serviceName); LOG.info("Service {} does not have an application ID", serviceName);
return appSpec; return appSpec;
} }
appSpec.setId(currentAppId.toString());
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
appSpec.setState(convertState(appReport.getYarnApplicationState())); appSpec.setState(convertState(appReport.getYarnApplicationState()));
ApplicationTimeout lifetime = ApplicationTimeout lifetime =

View File

@ -47,6 +47,11 @@ public JobMonitor(ClientContext clientContext) {
public abstract JobStatus getTrainingJobStatus(String jobName) public abstract JobStatus getTrainingJobStatus(String jobName)
throws IOException, YarnException; throws IOException, YarnException;
/**
* Cleanup AppAdminClient, etc.
*/
public void cleanup() throws IOException {}
/** /**
* Continue wait and print status if job goes to ready or final state. * Continue wait and print status if job goes to ready or final state.
* @param jobName * @param jobName
@ -80,5 +85,6 @@ public void waitTrainingFinal(String jobName)
throw new IOException(e); throw new IOException(e);
} }
} }
cleanup();
} }
} }

View File

@ -14,9 +14,10 @@
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.api.JobStatus; import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder; import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder;
@ -25,21 +26,33 @@
import java.io.IOException; import java.io.IOException;
public class YarnServiceJobMonitor extends JobMonitor { public class YarnServiceJobMonitor extends JobMonitor {
private ServiceClient serviceClient = null; private volatile AppAdminClient serviceClient = null;
public YarnServiceJobMonitor(ClientContext clientContext) { public YarnServiceJobMonitor(ClientContext clientContext) {
super(clientContext); super(clientContext);
} }
@Override @Override
public synchronized JobStatus getTrainingJobStatus(String jobName) public JobStatus getTrainingJobStatus(String jobName)
throws IOException, YarnException { throws IOException, YarnException {
if (this.serviceClient == null) { if (this.serviceClient == null) {
this.serviceClient = YarnServiceUtils.createServiceClient( synchronized(this) {
clientContext.getYarnConfig()); if (this.serviceClient == null) {
this.serviceClient = YarnServiceUtils.createServiceClient(
clientContext.getYarnConfig());
}
}
} }
String appStatus=serviceClient.getStatusString(jobName);
Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec);
return jobStatus;
}
Service serviceSpec = this.serviceClient.getStatus(jobName); @Override
return JobStatusBuilder.fromServiceSpec(serviceSpec); public void cleanup() throws IOException{
if (this.serviceClient != null) {
this.serviceClient.close();
}
} }
} }

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact;
@ -27,7 +28,7 @@
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink; import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.submarine.common.ClientContext;
@ -53,6 +54,8 @@
import java.util.StringTokenizer; import java.util.StringTokenizer;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
/** /**
* Submit a job to cluster * Submit a job to cluster
@ -527,6 +530,20 @@ private Service createServiceByParameters(RunJobParameters parameters)
return serviceSpec; return serviceSpec;
} }
private String generateServiceSpecFile(Service service) throws IOException {
File serviceSpecFile = File.createTempFile(service.getName(), ".json");
String buffer = jsonSerDeser.toJson(service);
Writer w = new OutputStreamWriter(new FileOutputStream(serviceSpecFile),
"UTF-8");
PrintWriter pw = new PrintWriter(w);
try {
pw.append(buffer);
} finally {
pw.close();
}
return serviceSpecFile.getAbsolutePath();
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -534,13 +551,30 @@ private Service createServiceByParameters(RunJobParameters parameters)
public ApplicationId submitJob(RunJobParameters parameters) public ApplicationId submitJob(RunJobParameters parameters)
throws IOException, YarnException { throws IOException, YarnException {
createServiceByParameters(parameters); createServiceByParameters(parameters);
ServiceClient serviceClient = YarnServiceUtils.createServiceClient( String serviceSpecFile = generateServiceSpecFile(serviceSpec);
AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient(
clientContext.getYarnConfig()); clientContext.getYarnConfig());
ApplicationId appid = serviceClient.actionCreate(serviceSpec); int code = appAdminClient.actionLaunch(serviceSpecFile,
serviceClient.stop(); serviceSpec.getName(), null, null);
if(code != EXIT_SUCCESS) {
throw new YarnException("Fail to launch application with exit code:" +
code);
}
String appStatus=appAdminClient.getStatusString(serviceSpec.getName());
Service app=ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
if(app.getId() == null) {
throw new YarnException("Can't get application id for Service " +
serviceSpec.getName());
}
ApplicationId appid = ApplicationId.fromString(app.getId());
appAdminClient.stop();
return appid; return appid;
} }
@VisibleForTesting @VisibleForTesting
public Service getServiceSpec() { public Service getServiceSpec() {
return serviceSpec; return serviceSpec;

View File

@ -16,8 +16,8 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.submarine.common.Envs; import org.apache.hadoop.yarn.submarine.common.Envs;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -26,27 +26,28 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE;
public class YarnServiceUtils { public class YarnServiceUtils {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(YarnServiceUtils.class); LoggerFactory.getLogger(YarnServiceUtils.class);
// This will be true only in UT. // This will be true only in UT.
private static ServiceClient stubServiceClient = null; private static AppAdminClient stubServiceClient = null;
public static ServiceClient createServiceClient( public static AppAdminClient createServiceClient(
Configuration yarnConfiguration) { Configuration yarnConfiguration) {
if (stubServiceClient != null) { if (stubServiceClient != null) {
return stubServiceClient; return stubServiceClient;
} }
ServiceClient serviceClient = new ServiceClient(); AppAdminClient serviceClient = AppAdminClient.createAppAdminClient(
serviceClient.init(yarnConfiguration); DEFAULT_TYPE, yarnConfiguration);
serviceClient.start();
return serviceClient; return serviceClient;
} }
@VisibleForTesting @VisibleForTesting
public static void setStubServiceClient(ServiceClient stubServiceClient) { public static void setStubServiceClient(AppAdminClient stubServiceClient) {
YarnServiceUtils.stubServiceClient = stubServiceClient; YarnServiceUtils.stubServiceClient = stubServiceClient;
} }

View File

@ -19,12 +19,11 @@
package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli; import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
import org.apache.hadoop.yarn.submarine.common.MockClientContext; import org.apache.hadoop.yarn.submarine.common.MockClientContext;
import org.apache.hadoop.yarn.submarine.common.api.TaskType; import org.apache.hadoop.yarn.submarine.common.api.TaskType;
@ -45,6 +44,7 @@
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -53,9 +53,11 @@ public class TestYarnServiceRunJobCli {
@Before @Before
public void before() throws IOException, YarnException { public void before() throws IOException, YarnException {
SubmarineLogs.verboseOff(); SubmarineLogs.verboseOff();
ServiceClient serviceClient = mock(ServiceClient.class); AppAdminClient serviceClient = mock(AppAdminClient.class);
when(serviceClient.actionCreate(any(Service.class))).thenReturn( when(serviceClient.actionLaunch(any(String.class), any(String.class),
ApplicationId.newInstance(1234L, 1)); any(Long.class), any(String.class))).thenReturn(EXIT_SUCCESS);
when(serviceClient.getStatusString(any(String.class))).thenReturn(
"{\"id\": \"application_1234_1\"}");
YarnServiceUtils.setStubServiceClient(serviceClient); YarnServiceUtils.setStubServiceClient(serviceClient);
} }

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.io.IOException; import java.io.IOException;