MAPREDUCE-4007. JobClient getJob(JobID) should return NULL if the job does not exist (for backwards compatibility) (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1300750 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9180eca592
commit
7609243e58
|
@ -150,6 +150,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-3974. TestSubmitJob in MR1 tests doesn't compile after HDFS-162
|
MAPREDUCE-3974. TestSubmitJob in MR1 tests doesn't compile after HDFS-162
|
||||||
merge. (atm)
|
merge. (atm)
|
||||||
|
|
||||||
|
MAPREDUCE-4007. JobClient getJob(JobID) should return NULL if the job
|
||||||
|
does not exist (for backwards compatibility) (tucu)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -193,9 +193,6 @@ public class MRClientService extends AbstractService
|
||||||
private Job verifyAndGetJob(JobId jobID,
|
private Job verifyAndGetJob(JobId jobID,
|
||||||
boolean modifyAccess) throws YarnRemoteException {
|
boolean modifyAccess) throws YarnRemoteException {
|
||||||
Job job = appContext.getJob(jobID);
|
Job job = appContext.getJob(jobID);
|
||||||
if (job == null) {
|
|
||||||
throw RPCUtil.getRemoteException("Unknown job " + jobID);
|
|
||||||
}
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +234,12 @@ public class MRClientService extends AbstractService
|
||||||
Job job = verifyAndGetJob(jobId, false);
|
Job job = verifyAndGetJob(jobId, false);
|
||||||
GetJobReportResponse response =
|
GetJobReportResponse response =
|
||||||
recordFactory.newRecordInstance(GetJobReportResponse.class);
|
recordFactory.newRecordInstance(GetJobReportResponse.class);
|
||||||
response.setJobReport(job.getReport());
|
if (job != null) {
|
||||||
|
response.setJobReport(job.getReport());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
response.setJobReport(null);
|
||||||
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -199,11 +199,10 @@ public class HistoryClientService extends AbstractService {
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw RPCUtil.getRemoteException(e);
|
throw RPCUtil.getRemoteException(e);
|
||||||
}
|
}
|
||||||
if (job == null) {
|
if (job != null) {
|
||||||
throw RPCUtil.getRemoteException("Unknown job " + jobID);
|
JobACL operation = JobACL.VIEW_JOB;
|
||||||
|
checkAccess(job, operation);
|
||||||
}
|
}
|
||||||
JobACL operation = JobACL.VIEW_JOB;
|
|
||||||
checkAccess(job, operation);
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +220,12 @@ public class HistoryClientService extends AbstractService {
|
||||||
JobId jobId = request.getJobId();
|
JobId jobId = request.getJobId();
|
||||||
Job job = verifyAndGetJob(jobId);
|
Job job = verifyAndGetJob(jobId);
|
||||||
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
|
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
|
||||||
response.setJobReport(job.getReport());
|
if (job != null) {
|
||||||
|
response.setJobReport(job.getReport());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
response.setJobReport(null);
|
||||||
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -372,17 +372,21 @@ public class ClientServiceDelegate {
|
||||||
request.setJobId(jobId);
|
request.setJobId(jobId);
|
||||||
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
|
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
|
||||||
GetJobReportRequest.class, request)).getJobReport();
|
GetJobReportRequest.class, request)).getJobReport();
|
||||||
if (StringUtils.isEmpty(report.getJobFile())) {
|
JobStatus jobStatus = null;
|
||||||
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
|
if (report != null) {
|
||||||
report.setJobFile(jobFile);
|
if (StringUtils.isEmpty(report.getJobFile())) {
|
||||||
|
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
|
||||||
|
report.setJobFile(jobFile);
|
||||||
|
}
|
||||||
|
String historyTrackingUrl = report.getTrackingUrl();
|
||||||
|
String url = StringUtils.isNotEmpty(historyTrackingUrl)
|
||||||
|
? historyTrackingUrl : trackingUrl;
|
||||||
|
if (!UNAVAILABLE.equals(url)) {
|
||||||
|
url = "http://" + url;
|
||||||
|
}
|
||||||
|
jobStatus = TypeConverter.fromYarn(report, url);
|
||||||
}
|
}
|
||||||
String historyTrackingUrl = report.getTrackingUrl();
|
return jobStatus;
|
||||||
String url = StringUtils.isNotEmpty(historyTrackingUrl)
|
|
||||||
? historyTrackingUrl : trackingUrl;
|
|
||||||
if (!UNAVAILABLE.equals(url)) {
|
|
||||||
url = "http://" + url;
|
|
||||||
}
|
|
||||||
return TypeConverter.fromYarn(report, url);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
|
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -167,4 +168,17 @@ public class JobClientUnitTest {
|
||||||
verify(mockCluster, never()).getJob(jobID);
|
verify(mockCluster, never()).getJob(jobID);
|
||||||
verify(mockJob, never()).getTaskReports(isA(TaskType.class));
|
verify(mockJob, never()).getTaskReports(isA(TaskType.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetJobWithUnknownJob() throws Exception {
|
||||||
|
TestJobClient client = new TestJobClient(new JobConf());
|
||||||
|
Cluster mockCluster = mock(Cluster.class);
|
||||||
|
client.setCluster(mockCluster);
|
||||||
|
JobID id = new JobID("unknown",0);
|
||||||
|
|
||||||
|
when(mockCluster.getJob(id)).thenReturn(null);
|
||||||
|
|
||||||
|
assertNull(client.getJob(id));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.v2;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.mapred.JobClient;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.JobID;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
|
import org.apache.hadoop.mapred.RunningJob;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.io.Writer;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
public class TestNonExistentJob extends TestCase {
|
||||||
|
|
||||||
|
private MiniDFSCluster dfsCluster = null;
|
||||||
|
private MiniMRCluster mrCluster = null;
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
if (System.getProperty("hadoop.log.dir") == null) {
|
||||||
|
System.setProperty("hadoop.log.dir", "/tmp");
|
||||||
|
}
|
||||||
|
int taskTrackers = 2;
|
||||||
|
int dataNodes = 2;
|
||||||
|
String proxyUser = System.getProperty("user.name");
|
||||||
|
String proxyGroup = "g";
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("127.0.0.1,localhost");
|
||||||
|
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
|
||||||
|
sb.append(",").append(i.getCanonicalHostName());
|
||||||
|
}
|
||||||
|
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set("dfs.block.access.token.enable", "false");
|
||||||
|
conf.set("dfs.permissions", "true");
|
||||||
|
conf.set("hadoop.security.authentication", "simple");
|
||||||
|
|
||||||
|
dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
|
||||||
|
FileSystem fileSystem = dfsCluster.getFileSystem();
|
||||||
|
fileSystem.mkdirs(new Path("/tmp"));
|
||||||
|
fileSystem.mkdirs(new Path("/user"));
|
||||||
|
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
|
||||||
|
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
|
||||||
|
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
|
||||||
|
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
|
||||||
|
String nnURI = fileSystem.getUri().toString();
|
||||||
|
int numDirs = 1;
|
||||||
|
String[] racks = null;
|
||||||
|
String[] hosts = null;
|
||||||
|
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
|
||||||
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected JobConf getJobConf() {
|
||||||
|
return mrCluster.createJobConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
}
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetInvalidJob() throws Exception {
|
||||||
|
RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
|
||||||
|
assertNull(runJob);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue