diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 0bc5a2c8418..713d890ff92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -1552,6 +1552,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, LOG.info("Service {} does not have an application ID", serviceName); return appSpec; } + appSpec.setId(currentAppId.toString()); ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); appSpec.setState(convertState(appReport.getYarnApplicationState())); ApplicationTimeout lifetime = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java index c81393b08be..35e21fc8d37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java @@ -47,6 +47,11 @@ public abstract class JobMonitor { public abstract JobStatus getTrainingJobStatus(String jobName) throws IOException, YarnException; + /** + * Cleanup AppAdminClient, etc. + */ + public void cleanup() throws IOException {} + /** * Continue wait and print status if job goes to ready or final state. * @param jobName @@ -80,5 +85,6 @@ public abstract class JobMonitor { throw new IOException(e); } } + cleanup(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java index fab018a9004..ee68ddbd5ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java @@ -14,9 +14,10 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.submarine.common.api.JobStatus; import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder; @@ -25,21 +26,33 @@ import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; import java.io.IOException; public class YarnServiceJobMonitor extends JobMonitor { - private ServiceClient serviceClient = null; + private volatile AppAdminClient serviceClient = null; public YarnServiceJobMonitor(ClientContext clientContext) { super(clientContext); } @Override - public synchronized JobStatus getTrainingJobStatus(String jobName) + public JobStatus getTrainingJobStatus(String jobName) throws IOException, YarnException { if (this.serviceClient == null) { - this.serviceClient = YarnServiceUtils.createServiceClient( - clientContext.getYarnConfig()); + synchronized(this) { + if (this.serviceClient == null) { + this.serviceClient = YarnServiceUtils.createServiceClient( + clientContext.getYarnConfig()); + } + } } + String appStatus=serviceClient.getStatusString(jobName); + Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus); + JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec); + return jobStatus; + } - Service serviceSpec = this.serviceClient.getStatus(jobName); - return JobStatusBuilder.fromServiceSpec(serviceSpec); + @Override + public void cleanup() throws IOException{ + if (this.serviceClient != null) { + this.serviceClient.close(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index bcd4698f911..d28f89fbac7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.records.Artifact; @@ -27,7 +28,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.ClientContext; @@ -53,6 +54,8 @@ import java.util.Set; import java.util.StringTokenizer; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; /** * Submit a job to cluster @@ -527,6 +530,20 @@ public class YarnServiceJobSubmitter implements JobSubmitter { return serviceSpec; } + private String generateServiceSpecFile(Service service) throws IOException { + File serviceSpecFile = File.createTempFile(service.getName(), ".json"); + String buffer = jsonSerDeser.toJson(service); + Writer w = new OutputStreamWriter(new FileOutputStream(serviceSpecFile), + "UTF-8"); + PrintWriter pw = new PrintWriter(w); + try { + pw.append(buffer); + } finally { + pw.close(); + } + return serviceSpecFile.getAbsolutePath(); + } + /** * {@inheritDoc} */ @@ -534,13 +551,30 @@ public class YarnServiceJobSubmitter implements JobSubmitter { public ApplicationId submitJob(RunJobParameters parameters) throws IOException, YarnException { createServiceByParameters(parameters); - ServiceClient serviceClient = YarnServiceUtils.createServiceClient( + String serviceSpecFile = generateServiceSpecFile(serviceSpec); + + AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient( clientContext.getYarnConfig()); - ApplicationId appid = serviceClient.actionCreate(serviceSpec); - serviceClient.stop(); + int code = appAdminClient.actionLaunch(serviceSpecFile, + serviceSpec.getName(), null, null); + if(code != EXIT_SUCCESS) { + throw new YarnException("Fail to launch application with exit code:" + + code); + } + + String appStatus=appAdminClient.getStatusString(serviceSpec.getName()); + Service app=ServiceApiUtil.jsonSerDeser.fromJson(appStatus); + if(app.getId() == null) { + throw new YarnException("Can't get application id for Service " + + serviceSpec.getName()); + } + ApplicationId appid = ApplicationId.fromString(app.getId()); + appAdminClient.stop(); return appid; } + + @VisibleForTesting public Service getServiceSpec() { return serviceSpec; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java index ce3a1eb1b33..c599fc9591b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java @@ -16,8 +16,8 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.submarine.common.Envs; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; import org.slf4j.Logger; @@ -26,27 +26,28 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE; + public class YarnServiceUtils { private static final Logger LOG = LoggerFactory.getLogger(YarnServiceUtils.class); // This will be true only in UT. - private static ServiceClient stubServiceClient = null; + private static AppAdminClient stubServiceClient = null; - public static ServiceClient createServiceClient( + public static AppAdminClient createServiceClient( Configuration yarnConfiguration) { if (stubServiceClient != null) { return stubServiceClient; } - ServiceClient serviceClient = new ServiceClient(); - serviceClient.init(yarnConfiguration); - serviceClient.start(); + AppAdminClient serviceClient = AppAdminClient.createAppAdminClient( + DEFAULT_TYPE, yarnConfiguration); return serviceClient; } @VisibleForTesting - public static void setStubServiceClient(ServiceClient stubServiceClient) { + public static void setStubServiceClient(AppAdminClient stubServiceClient) { YarnServiceUtils.stubServiceClient = stubServiceClient; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java index 89d39a057af..439103042a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java @@ -19,12 +19,11 @@ package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli; import org.apache.hadoop.yarn.submarine.common.MockClientContext; import org.apache.hadoop.yarn.submarine.common.api.TaskType; @@ -45,6 +44,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Map; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,9 +53,11 @@ public class TestYarnServiceRunJobCli { @Before public void before() throws IOException, YarnException { SubmarineLogs.verboseOff(); - ServiceClient serviceClient = mock(ServiceClient.class); - when(serviceClient.actionCreate(any(Service.class))).thenReturn( - ApplicationId.newInstance(1234L, 1)); + AppAdminClient serviceClient = mock(AppAdminClient.class); + when(serviceClient.actionLaunch(any(String.class), any(String.class), + any(Long.class), any(String.class))).thenReturn(EXIT_SUCCESS); + when(serviceClient.getStatusString(any(String.class))).thenReturn( + "{\"id\": \"application_1234_1\"}"); YarnServiceUtils.setStubServiceClient(serviceClient); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java index 5c06ddc7560..b59c01e6e21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import java.io.IOException;