MAPREDUCE-3146. Added a MR specific command line to dump logs for a given TaskAttemptID. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195349 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-31 06:42:06 +00:00
parent 5f9e67e226
commit 47a381e306
42 changed files with 938 additions and 107 deletions

View File

@ -447,6 +447,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2989. Modified JobHistory to link to task and AM logs from the
JobHistoryServer. (Siddharth Seth via vinodkv)
MAPREDUCE-3146. Added a MR specific command line to dump logs for a
given TaskAttemptID. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -48,9 +48,9 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -100,8 +100,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@ -130,9 +128,6 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private Clock clock;
private final long startTime;
private final long appSubmitTime;
@ -758,8 +753,8 @@ public class MRAppMaster extends CompositeService {
amInfos = new LinkedList<AMInfo>();
}
AMInfo amInfo =
new AMInfo(appAttemptID, startTime, containerID, nmHost, nmPort,
nmHttpPort);
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort);
amInfos.add(amInfo);
// /////////////////// Create the job itself.

View File

@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@ -61,6 +60,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@ -580,13 +580,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, 0.0f,
0.0f, cleanupProgress, remoteJobConfFile.toString());
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, remoteJobConfFile.toString(), amInfos);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, computeProgress(mapTasks),
computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks),
cleanupProgress, remoteJobConfFile.toString(), amInfos);
} finally {
readLock.unlock();
}

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -158,6 +159,8 @@ public abstract class TaskAttemptImpl implements
private long finishTime;
private WrappedProgressSplitsBlock progressSplitBlock;
private int shufflePort = -1;
private String trackerName;
private int httpPort;
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
new CleanupContainerTransition();
@ -423,7 +426,7 @@ public abstract class TaskAttemptImpl implements
stateMachine;
private ContainerId containerID;
private String nodeHostName;
private NodeId containerNodeId;
private String containerMgrAddress;
private String nodeHttpAddress;
private WrappedJvmID jvmID;
@ -763,6 +766,12 @@ public abstract class TaskAttemptImpl implements
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters());
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
if (this.containerNodeId != null) {
result.setNodeManagerPort(this.containerNodeId.getPort());
}
return result;
} finally {
readLock.unlock();
@ -1001,8 +1010,8 @@ public abstract class TaskAttemptImpl implements
final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event;
taskAttempt.containerID = cEvent.getContainer().getId();
taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost();
taskAttempt.containerMgrAddress = cEvent.getContainer().getNodeId()
taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
.toString();
taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@ -1113,6 +1122,8 @@ public abstract class TaskAttemptImpl implements
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
// Costly?
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
.getJobId());

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.recover;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.Dispatcher;

View File

@ -36,11 +36,11 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -158,13 +159,24 @@ public class RecoveryService extends CompositeService implements Recovery {
public Set<TaskId> getCompletedTasks() {
return completedTasks.keySet();
}
@Override
public List<AMInfo> getAMInfos() {
if (jobInfo == null || jobInfo.getAMInfos() == null) {
return new LinkedList<AMInfo>();
}
return new LinkedList<AMInfo>(jobInfo.getAMInfos());
List<AMInfo> amInfos = new LinkedList<AMInfo>();
for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
.getAMInfos()) {
AMInfo amInfo =
MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
jhAmInfo.getNodeManagerHttpPort());
amInfos.add(amInfo);
}
return amInfos;
}
private void parse() throws IOException {

View File

@ -24,7 +24,7 @@ import com.google.inject.Inject;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;

View File

@ -96,6 +96,10 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
public static String NM_HOST = "localhost";
public static int NM_PORT = 1234;
public static int NM_HTTP_PORT = 9999;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -136,7 +140,8 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), "testhost", 2222, 3333, System.currentTimeMillis());
applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System
.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@ -363,9 +368,9 @@ public class MRApp extends MRAppMaster {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
Container container = BuilderUtils.newContainer(cId, nodeId,
"localhost:9999", null, null, null);
NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.MockApps;
@ -88,6 +89,10 @@ public class MockJobs extends MockApps {
static final Iterator<String> DIAGS = Iterators.cycle(
"Error: java.lang.OutOfMemoryError: Java heap space",
"Lost task tracker: tasktracker.domain/127.0.0.1:40879");
public static final String NM_HOST = "localhost";
public static final int NM_PORT = 1234;
public static final int NM_HTTP_PORT = 9999;
static final int DT = 1000000; // ms
@ -507,8 +512,7 @@ public class MockJobs extends MockApps {
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(100, 1), attempt);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
return new AMInfo(appAttemptId, System.currentTimeMillis(), containerId,
"testhost", 2222, 3333);
return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
}
}

View File

@ -32,8 +32,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompleti
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -103,8 +106,9 @@ public class TestMRClientService {
GetJobReportRequest gjrRequest =
recordFactory.newRecordInstance(GetJobReportRequest.class);
gjrRequest.setJobId(job.getID());
Assert.assertNotNull("JobReport is null",
proxy.getJobReport(gjrRequest).getJobReport());
JobReport jr = proxy.getJobReport(gjrRequest).getJobReport();
verifyJobReport(jr);
GetTaskAttemptCompletionEventsRequest gtaceRequest =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
@ -123,8 +127,10 @@ public class TestMRClientService {
GetTaskAttemptReportRequest gtarRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
gtarRequest.setTaskAttemptId(attempt.getID());
Assert.assertNotNull("TaskAttemptReport is null",
proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport());
TaskAttemptReport tar =
proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport();
verifyTaskAttemptReport(tar);
GetTaskReportRequest gtrRequest =
recordFactory.newRecordInstance(GetTaskReportRequest.class);
@ -164,6 +170,31 @@ public class TestMRClientService {
app.waitForState(job, JobState.SUCCEEDED);
}
private void verifyJobReport(JobReport jr) {
Assert.assertNotNull("JobReport is null", jr);
List<AMInfo> amInfos = jr.getAMInfos();
Assert.assertEquals(1, amInfos.size());
Assert.assertEquals(JobState.RUNNING, jr.getJobState());
AMInfo amInfo = amInfos.get(0);
Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
.getAttemptId());
Assert.assertTrue(amInfo.getStartTime() > 0);
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {
Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
Assert.assertNotNull("TaskAttemptReport is null", tar);
Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
.getAttemptId());
}
class MRAppWithClientService extends MRApp {
MRClientService clientService = null;
MRAppWithClientService(int maps, int reduces, boolean autoComplete) {

View File

@ -117,8 +117,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0, "jobfile"));
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -194,8 +194,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0, "jobfile"));
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -260,8 +260,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0, "jobfile"));
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -374,8 +374,8 @@ public class TestRMContainerAllocator {
@Override
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile");
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile", null);
}
}
@ -510,8 +510,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0, "jobfile"));
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);

View File

@ -39,10 +39,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -221,9 +221,9 @@ public class TestRecovery {
.getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals("testhost", amInfo.getNodeManagerHost());
Assert.assertEquals(2222, amInfo.getNodeManagerPort());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();

View File

@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.records;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
public interface AMInfo {
public ApplicationAttemptId getAppAttemptId();
public long getStartTime();
public ContainerId getContainerId();
public String getNodeManagerHost();
public int getNodeManagerPort();
public int getNodeManagerHttpPort();
public void setAppAttemptId(ApplicationAttemptId appAttemptId);
public void setStartTime(long startTime);
public void setContainerId(ContainerId containerId);
public void setNodeManagerHost(String nmHost);
public void setNodeManagerPort(int nmPort);
public void setNodeManagerHttpPort(int mnHttpPort);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api.records;
import java.util.List;
public interface JobReport {
public abstract JobId getJobId();
public abstract JobState getJobState();
@ -25,6 +27,7 @@ public interface JobReport {
public abstract float getReduceProgress();
public abstract float getCleanupProgress();
public abstract float getSetupProgress();
public abstract long getSubmitTime();
public abstract long getStartTime();
public abstract long getFinishTime();
public abstract String getUser();
@ -32,6 +35,7 @@ public interface JobReport {
public abstract String getTrackingUrl();
public abstract String getDiagnostics();
public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -39,6 +43,7 @@ public interface JobReport {
public abstract void setReduceProgress(float progress);
public abstract void setCleanupProgress(float progress);
public abstract void setSetupProgress(float progress);
public abstract void setSubmitTime(long submitTime);
public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime);
public abstract void setUser(String user);
@ -46,4 +51,5 @@ public interface JobReport {
public abstract void setTrackingUrl(String trackingUrl);
public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api.records;
import org.apache.hadoop.yarn.api.records.ContainerId;
public interface TaskAttemptReport {
public abstract TaskAttemptId getTaskAttemptId();
public abstract TaskAttemptState getTaskAttemptState();
@ -32,6 +34,10 @@ public interface TaskAttemptReport {
public abstract String getDiagnosticInfo();
public abstract String getStateString();
public abstract Phase getPhase();
public abstract String getNodeManagerHost();
public abstract int getNodeManagerPort();
public abstract int getNodeManagerHttpPort();
public abstract ContainerId getContainerId();
public abstract void setTaskAttemptId(TaskAttemptId taskAttemptId);
public abstract void setTaskAttemptState(TaskAttemptState taskAttemptState);
@ -42,6 +48,10 @@ public interface TaskAttemptReport {
public abstract void setDiagnosticInfo(String diagnosticInfo);
public abstract void setStateString(String stateString);
public abstract void setPhase(Phase phase);
public abstract void setNodeManagerHost(String nmHost);
public abstract void setNodeManagerPort(int nmPort);
public abstract void setNodeManagerHttpPort(int nmHttpPort);
public abstract void setContainerId(ContainerId containerId);
/**
* Set the shuffle finish time. Applicable only for reduce attempts

View File

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
public class AMInfoPBImpl extends ProtoBase<AMInfoProto> implements AMInfo {
AMInfoProto proto = AMInfoProto.getDefaultInstance();
AMInfoProto.Builder builder = null;
boolean viaProto = false;
private ApplicationAttemptId appAttemptId;
private ContainerId containerId;
public AMInfoPBImpl() {
builder = AMInfoProto.newBuilder();
}
public AMInfoPBImpl(AMInfoProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized AMInfoProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.appAttemptId != null
&& !((ApplicationAttemptIdPBImpl) this.appAttemptId).getProto().equals(
builder.getApplicationAttemptId())) {
builder.setApplicationAttemptId(convertToProtoFormat(this.appAttemptId));
}
if (this.getContainerId() != null
&& !((ContainerIdPBImpl) this.containerId).getProto().equals(
builder.getContainerId())) {
builder.setContainerId(convertToProtoFormat(this.containerId));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMInfoProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ApplicationAttemptId getAppAttemptId() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
if (appAttemptId != null) {
return appAttemptId;
} // Else via proto
if (!p.hasApplicationAttemptId()) {
return null;
}
appAttemptId = convertFromProtoFormat(p.getApplicationAttemptId());
return appAttemptId;
}
@Override
public synchronized void setAppAttemptId(ApplicationAttemptId appAttemptId) {
maybeInitBuilder();
if (appAttemptId == null) {
builder.clearApplicationAttemptId();
}
this.appAttemptId = appAttemptId;
}
@Override
public synchronized long getStartTime() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.getStartTime());
}
@Override
public synchronized void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime(startTime);
}
@Override
public synchronized ContainerId getContainerId() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
if (containerId != null) {
return containerId;
} // Else via proto
if (!p.hasContainerId()) {
return null;
}
containerId = convertFromProtoFormat(p.getContainerId());
return containerId;
}
@Override
public synchronized void setContainerId(ContainerId containerId) {
maybeInitBuilder();
if (containerId == null) {
builder.clearContainerId();
}
this.containerId = containerId;
}
@Override
public synchronized String getNodeManagerHost() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeManagerHost()) {
return null;
}
return p.getNodeManagerHost();
}
@Override
public synchronized void setNodeManagerHost(String nmHost) {
maybeInitBuilder();
if (nmHost == null) {
builder.clearNodeManagerHost();
return;
}
builder.setNodeManagerHost(nmHost);
}
@Override
public synchronized int getNodeManagerPort() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.getNodeManagerPort());
}
@Override
public synchronized void setNodeManagerPort(int nmPort) {
maybeInitBuilder();
builder.setNodeManagerPort(nmPort);
}
@Override
public synchronized int getNodeManagerHttpPort() {
AMInfoProtoOrBuilder p = viaProto ? proto : builder;
return p.getNodeManagerHttpPort();
}
@Override
public synchronized void setNodeManagerHttpPort(int httpPort) {
maybeInitBuilder();
builder.setNodeManagerHttpPort(httpPort);
}
private ApplicationAttemptIdPBImpl convertFromProtoFormat(
ApplicationAttemptIdProto p) {
return new ApplicationAttemptIdPBImpl(p);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private
ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl) t).getProto();
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl) t).getProto();
}
}

View File

@ -19,9 +19,14 @@
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder;
@ -31,12 +36,14 @@ import org.apache.hadoop.yarn.api.records.ProtoBase;
public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobReport {
public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
JobReport {
JobReportProto proto = JobReportProto.getDefaultInstance();
JobReportProto.Builder builder = null;
boolean viaProto = false;
private JobId jobId = null;
private List<AMInfo> amInfos = null;
public JobReportPBImpl() {
@ -48,20 +55,23 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
viaProto = true;
}
public JobReportProto getProto() {
public synchronized JobReportProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
private synchronized void mergeLocalToBuilder() {
if (this.jobId != null) {
builder.setJobId(convertToProtoFormat(this.jobId));
}
if (this.amInfos != null) {
addAMInfosToProto();
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@ -69,7 +79,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = JobReportProto.newBuilder(proto);
}
@ -78,7 +88,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
@Override
public JobId getJobId() {
public synchronized JobId getJobId() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (this.jobId != null) {
return this.jobId;
@ -91,14 +101,14 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
}
@Override
public void setJobId(JobId jobId) {
public synchronized void setJobId(JobId jobId) {
maybeInitBuilder();
if (jobId == null)
builder.clearJobId();
this.jobId = jobId;
}
@Override
public JobState getJobState() {
public synchronized JobState getJobState() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasJobState()) {
return null;
@ -107,7 +117,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
}
@Override
public void setJobState(JobState jobState) {
public synchronized void setJobState(JobState jobState) {
maybeInitBuilder();
if (jobState == null) {
builder.clearJobState();
@ -116,132 +126,197 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
builder.setJobState(convertToProtoFormat(jobState));
}
@Override
public float getMapProgress() {
public synchronized float getMapProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getMapProgress());
}
@Override
public void setMapProgress(float mapProgress) {
public synchronized void setMapProgress(float mapProgress) {
maybeInitBuilder();
builder.setMapProgress((mapProgress));
}
@Override
public float getReduceProgress() {
public synchronized float getReduceProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReduceProgress());
}
@Override
public void setReduceProgress(float reduceProgress) {
public synchronized void setReduceProgress(float reduceProgress) {
maybeInitBuilder();
builder.setReduceProgress((reduceProgress));
}
@Override
public float getCleanupProgress() {
public synchronized float getCleanupProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getCleanupProgress());
}
@Override
public void setCleanupProgress(float cleanupProgress) {
public synchronized void setCleanupProgress(float cleanupProgress) {
maybeInitBuilder();
builder.setCleanupProgress((cleanupProgress));
}
@Override
public float getSetupProgress() {
public synchronized float getSetupProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getSetupProgress());
}
@Override
public void setSetupProgress(float setupProgress) {
public synchronized void setSetupProgress(float setupProgress) {
maybeInitBuilder();
builder.setSetupProgress((setupProgress));
}
@Override
public long getStartTime() {
public synchronized long getSubmitTime() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getSubmitTime());
}
@Override
public synchronized void setSubmitTime(long submitTime) {
maybeInitBuilder();
builder.setSubmitTime((submitTime));
}
@Override
public synchronized long getStartTime() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getStartTime());
}
@Override
public void setStartTime(long startTime) {
public synchronized void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime((startTime));
}
@Override
public long getFinishTime() {
public synchronized long getFinishTime() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getFinishTime());
}
@Override
public void setFinishTime(long finishTime) {
public synchronized void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime((finishTime));
}
@Override
public String getUser() {
public synchronized String getUser() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getUser());
}
@Override
public void setUser(String user) {
public synchronized void setUser(String user) {
maybeInitBuilder();
builder.setUser((user));
}
@Override
public String getJobName() {
public synchronized String getJobName() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getJobName());
}
@Override
public void setJobName(String jobName) {
public synchronized void setJobName(String jobName) {
maybeInitBuilder();
builder.setJobName((jobName));
}
@Override
public String getTrackingUrl() {
public synchronized String getTrackingUrl() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getTrackingUrl());
}
@Override
public void setTrackingUrl(String trackingUrl) {
public synchronized void setTrackingUrl(String trackingUrl) {
maybeInitBuilder();
builder.setTrackingUrl(trackingUrl);
}
@Override
public String getDiagnostics() {
public synchronized String getDiagnostics() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getDiagnostics();
}
@Override
public void setDiagnostics(String diagnostics) {
public synchronized void setDiagnostics(String diagnostics) {
maybeInitBuilder();
builder.setDiagnostics(diagnostics);
}
@Override
public String getJobFile() {
public synchronized String getJobFile() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getJobFile();
}
@Override
public void setJobFile(String jobFile) {
public synchronized void setJobFile(String jobFile) {
maybeInitBuilder();
builder.setJobFile(jobFile);
}
@Override
public synchronized List<AMInfo> getAMInfos() {
initAMInfos();
return this.amInfos;
}
@Override
public synchronized void setAMInfos(List<AMInfo> amInfos) {
maybeInitBuilder();
if (amInfos == null) {
this.builder.clearAmInfos();
this.amInfos = null;
return;
}
initAMInfos();
this.amInfos.clear();
this.amInfos.addAll(amInfos);
}
private synchronized void initAMInfos() {
if (this.amInfos != null) {
return;
}
JobReportProtoOrBuilder p = viaProto ? proto : builder;
List<AMInfoProto> list = p.getAmInfosList();
this.amInfos = new ArrayList<AMInfo>();
for (AMInfoProto amInfoProto : list) {
this.amInfos.add(convertFromProtoFormat(amInfoProto));
}
}
private synchronized void addAMInfosToProto() {
maybeInitBuilder();
builder.clearAmInfos();
if (this.amInfos == null)
return;
for (AMInfo amInfo : this.amInfos) {
builder.addAmInfos(convertToProtoFormat(amInfo));
}
}
private AMInfoPBImpl convertFromProtoFormat(AMInfoProto p) {
return new AMInfoPBImpl(p);
}
private AMInfoProto convertToProtoFormat(AMInfo t) {
return ((AMInfoPBImpl)t).getProto();
}
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}
@ -257,7 +332,4 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
}

View File

@ -31,7 +31,10 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
@ -42,6 +45,7 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
private TaskAttemptId taskAttemptId = null;
private Counters counters = null;
private ContainerId containerId = null;
public TaskAttemptReportPBImpl() {
@ -67,6 +71,9 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
if (this.counters != null) {
builder.setCounters(convertToProtoFormat(this.counters));
}
if (this.containerId != null) {
builder.setContainerId(convertToProtoFormat(this.containerId));
}
}
private void mergeLocalToProto() {
@ -255,7 +262,80 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
}
builder.setPhase(convertToProtoFormat(phase));
}
@Override
public String getNodeManagerHost() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeManagerHost()) {
return null;
}
return p.getNodeManagerHost();
}
@Override
public void setNodeManagerHost(String nmHost) {
maybeInitBuilder();
if (nmHost == null) {
builder.clearNodeManagerHost();
return;
}
builder.setNodeManagerHost(nmHost);
}
@Override
public int getNodeManagerPort() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getNodeManagerPort());
}
@Override
public void setNodeManagerPort(int nmPort) {
maybeInitBuilder();
builder.setNodeManagerPort(nmPort);
}
@Override
public int getNodeManagerHttpPort() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getNodeManagerHttpPort());
}
@Override
public void setNodeManagerHttpPort(int nmHttpPort) {
maybeInitBuilder();
builder.setNodeManagerHttpPort(nmHttpPort);
}
@Override
public ContainerId getContainerId() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
if (containerId != null) {
return containerId;
} // Else via proto
if (!p.hasContainerId()) {
return null;
}
containerId = convertFromProtoFormat(p.getContainerId());
return containerId;
}
@Override
public void setContainerId(ContainerId containerId) {
maybeInitBuilder();
if (containerId == null) {
builder.clearContainerId();
}
this.containerId = containerId;
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private CountersPBImpl convertFromProtoFormat(CountersProto p) {
return new CountersPBImpl(p);
}

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.mapreduce.v2.util;
import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils {
@ -53,14 +58,15 @@ public class MRBuilderUtils {
}
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long startTime, long finishTime,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile) {
float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
report.setUser(userName);
report.setJobState(state);
report.setSubmitTime(submitTime);
report.setStartTime(startTime);
report.setFinishTime(finishTime);
report.setSetupProgress(setupProgress);
@ -68,6 +74,20 @@ public class MRBuilderUtils {
report.setMapProgress(mapProgress);
report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
return report;
}
public static AMInfo newAMInfo(ApplicationAttemptId appAttemptId,
long startTime, ContainerId containerId, String nmHost, int nmPort,
int nmHttpPort) {
AMInfo amInfo = Records.newRecord(AMInfo.class);
amInfo.setAppAttemptId(appAttemptId);
amInfo.setStartTime(startTime);
amInfo.setContainerId(containerId);
amInfo.setNodeManagerHost(nmHost);
amInfo.setNodeManagerPort(nmPort);
amInfo.setNodeManagerHttpPort(nmHttpPort);
return amInfo;
}
}

View File

@ -119,6 +119,10 @@ message TaskAttemptReportProto {
optional PhaseProto phase = 9;
optional int64 shuffle_finish_time = 10;
optional int64 sort_finish_time=11;
optional string node_manager_host = 12;
optional int32 node_manager_port = 13;
optional int32 node_manager_http_port = 14;
optional ContainerIdProto container_id = 15;
}
enum JobStateProto {
@ -146,6 +150,17 @@ message JobReportProto {
optional string trackingUrl = 11;
optional string diagnostics = 12;
optional string jobFile = 13;
repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15;
}
message AMInfoProto {
optional ApplicationAttemptIdProto application_attempt_id = 1;
optional int64 start_time = 2;
optional ContainerIdProto container_id = 3;
optional string node_manager_host = 4;
optional int32 node_manager_port = 5;
optional int32 node_manager_http_port = 6;
}
enum TaskAttemptCompletionEventStatusProto {

View File

@ -34,6 +34,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -212,7 +213,20 @@ public class Cluster {
throws IOException, InterruptedException {
return client.getQueue(name);
}
/**
* Get log parameters for the specified jobID or taskAttemptID
* @param jobID the job id.
* @param taskAttemptID the task attempt id. Optional.
* @return the LogParams
* @throws IOException
* @throws InterruptedException
*/
public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
throws IOException, InterruptedException {
return client.getLogFileParams(jobID, taskAttemptID);
}
/**
* Get current cluster status.
*

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -115,6 +116,8 @@ public interface ClientProtocol extends VersionedProtocol {
* MAPREDUCE-2337.
* Version 37: More efficient serialization format for framework counters
* (MAPREDUCE-901)
* Version 38: Added getLogFilePath(JobID, TaskAttemptID) as part of
* MAPREDUCE-3146
*/
public static final long versionID = 37L;
@ -351,4 +354,16 @@ public interface ClientProtocol extends VersionedProtocol {
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
) throws IOException,
InterruptedException;
/**
* Gets the location of the log file for a job if no taskAttemptId is
* specified, otherwise gets the log location for the taskAttemptId.
* @param jobID the jobId.
* @param taskAttemptID the taskAttemptId.
* @return log params.
* @throws IOException
* @throws InterruptedException
*/
public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
throws IOException, InterruptedException;
}

View File

@ -42,9 +42,11 @@ import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper;
/**
* Interprets the map reduce cli options
@ -95,6 +97,7 @@ public class CLI extends Configured implements Tool {
boolean killTask = false;
boolean failTask = false;
boolean setJobPriority = false;
boolean logs = false;
if ("-submit".equals(cmd)) {
if (argv.length != 2) {
@ -205,6 +208,19 @@ public class CLI extends Configured implements Tool {
taskType = argv[2];
taskState = argv[3];
displayTasks = true;
} else if ("-logs".equals(cmd)) {
if (argv.length == 2 || argv.length ==3) {
logs = true;
jobid = argv[1];
if (argv.length == 3) {
taskid = argv[2];
} else {
taskid = null;
}
} else {
displayUsage(cmd);
return exitCode;
}
} else {
displayUsage(cmd);
return exitCode;
@ -313,6 +329,22 @@ public class CLI extends Configured implements Tool {
System.out.println("Could not fail task " + taskid);
exitCode = -1;
}
} else if (logs) {
try {
JobID jobID = JobID.forName(jobid);
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
LogDumper logDumper = new LogDumper();
logDumper.setConf(getConf());
logDumper.dumpAContainersLogs(logParams.getApplicationId(),
logParams.getContainerId(), logParams.getNodeId(),
logParams.getOwner());
} catch (IOException e) {
if (e instanceof RemoteException) {
throw e;
}
System.out.println(e.getMessage());
}
}
} catch (RemoteException re) {
IOException unwrappedException = re.unwrapRemoteException();
@ -380,6 +412,10 @@ public class CLI extends Configured implements Tool {
" <job-id> <task-type> <task-state>]. " +
"Valid values for <task-type> are " + taskTypes + ". " +
"Valid values for <task-state> are " + taskStates);
} else if ("-logs".equals(cmd)) {
System.err.println(prefix + "[" + cmd +
" <job-id> <task-attempt-id>]. " +
" <task-attempt-id> is optional to get task attempt logs.");
} else {
System.err.printf(prefix + "<command> <args>\n");
System.err.printf("\t[-submit <job-file>]\n");
@ -398,7 +434,8 @@ public class CLI extends Configured implements Tool {
"Valid values for <task-type> are " + taskTypes + ". " +
"Valid values for <task-state> are " + taskStates);
System.err.printf("\t[-kill-task <task-attempt-id>]\n");
System.err.printf("\t[-fail-task <task-attempt-id>]\n\n");
System.err.printf("\t[-fail-task <task-attempt-id>]\n");
System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
ToolRunner.printGenericCommandUsage(System.out);
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
public class LogParams {
private String containerId;
private String applicationId;
private String nodeId;
private String owner;
public LogParams(String containerIdStr, String applicationIdStr,
String nodeIdStr, String owner) {
this.containerId = containerIdStr;
this.applicationId = applicationIdStr;
this.nodeId = nodeIdStr;
this.owner = owner;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
public String getApplicationId() {
return applicationId;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getOwner() {
return this.owner;
}
public String setOwner(String owner) {
return this.owner;
}
}

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException;
@ -94,6 +95,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
JobReport.class);
report.setJobId(jobId);
report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setSubmitTime(jobInfo.getSubmitTime());
report.setStartTime(jobInfo.getLaunchTime());
report.setFinishTime(jobInfo.getFinishTime());
report.setJobName(jobInfo.getJobname());
@ -103,6 +105,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setJobFile(confFile.toString());
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
report.setAMInfos(getAMInfos());
}
@Override
@ -341,6 +344,17 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
@Override
public List<AMInfo> getAMInfos() {
return jobInfo.getAMInfos();
List<AMInfo> amInfos = new LinkedList<AMInfo>();
for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
.getAMInfos()) {
AMInfo amInfo =
MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
jhAmInfo.getNodeManagerHttpPort());
amInfos.add(amInfo);
}
return amInfos;
}
}

View File

@ -79,6 +79,15 @@ public class CompletedTaskAttempt implements TaskAttempt {
// report.setPhase(attemptInfo.get); //TODO
report.setStateString(attemptInfo.getState());
report.setCounters(getCounters());
report.setContainerId(attemptInfo.getContainerId());
String []hostSplits = attemptInfo.getHostname().split(":");
if (hostSplits.length != 2) {
report.setNodeManagerHost("UNKNOWN");
} else {
report.setNodeManagerHost(hostSplits[0]);
report.setNodeManagerPort(Integer.parseInt(hostSplits[1]));
}
report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
@Override

View File

@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

View File

@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;

View File

@ -112,11 +112,11 @@ public class TestJobHistoryParsing {
// Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size());
Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0)
.getNodeManagerHost());
AMInfo amInfo = jobInfo.getAMInfos().get(0);
Assert.assertEquals(2222, amInfo.getNodeManagerPort());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());

View File

@ -218,7 +218,8 @@ public class TestHSWebApp {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME, BuilderUtils.newNodeId("testhost", 2222).toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
@ -229,7 +230,8 @@ public class TestHSWebApp {
verify(spyPw).write(
"Logs not available for container_10_0001_01_000001. Aggregation "
+ "may not be complete,"
+ " Check back later or try the nodemanager on testhost:2222");
+ " Check back later or try the nodemanager on "
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
}
}

View File

@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -47,13 +49,17 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -68,6 +74,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.util.BuilderUtils;
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@ -398,5 +405,52 @@ public class ClientServiceDelegate {
return true;
}
public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throws YarnRemoteException, IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
}
JobReport report =
((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
JobState.ERROR).contains(report.getJobState())) {
if (oldTaskAttemptID != null) {
GetTaskAttemptReportRequest taRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
TaskAttemptReport taReport =
((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
GetTaskAttemptReportRequest.class, taRequest))
.getTaskAttemptReport();
if (taReport.getContainerId() == null
|| taReport.getNodeManagerHost() == null) {
throw new IOException("Unable to get log information for task: "
+ oldTaskAttemptID);
}
return new LogParams(
taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(),
BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser());
} else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
throw new IOException("Unable to get log information for job: "
+ oldJobID);
}
AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
return new LogParams(
amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(),
BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser());
}
} else {
throw new IOException("Cannot get log path for a in-progress job");
}
}
}

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@ -504,4 +505,10 @@ public class YARNRunner implements ClientProtocol {
return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
clientMethodsHash);
}
@Override
public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
throws IOException {
return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
}
}

View File

@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
// First AM returns a report with jobName firstGen and simulates AM shutdown
// on second invocation.
@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
when(jobReportResponse2.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
// Second AM generation returns a report with jobName secondGen
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);

View File

@ -20,13 +20,13 @@ package org.apache.hadoop.mapreduce.v2;
import java.io.File;
import java.io.IOException;
import java.util.List;
import junit.framework.Assert;
import org.apache.avro.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.FailingMapper;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -35,8 +35,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
@ -105,6 +117,8 @@ public class TestMRJobsWithHistoryService {
return;
}
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(mrCluster.getConfig());
// Job with 3 maps and 2 reduces
@ -113,7 +127,8 @@ public class TestMRJobsWithHistoryService {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
Counters counterMR = job.getCounters();
ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();
JobId jobId = TypeConverter.toYarn(job.getJobID());
ApplicationId appID = jobId.getAppId();
while (true) {
Thread.sleep(1000);
if (mrCluster.getResourceManager().getRMContext().getRMApps()
@ -126,6 +141,36 @@ public class TestMRJobsWithHistoryService {
LOG.info("CounterHS " + counterHS);
LOG.info("CounterMR " + counterMR);
Assert.assertEquals(counterHS, counterMR);
MRClientProtocol historyClient = instantiateHistoryProxy();
GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
gjReq.setJobId(jobId);
JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
verifyJobReport(jobReport, jobId);
}
private void verifyJobReport(JobReport jobReport, JobId jobId) {
List<AMInfo> amInfos = jobReport.getAMInfos();
Assert.assertEquals(1, amInfos.size());
AMInfo amInfo = amInfos.get(0);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1);
ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1);
Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
Assert.assertEquals(amContainerId, amInfo.getContainerId());
Assert.assertTrue(jobReport.getSubmitTime() > 0);
Assert.assertTrue(jobReport.getStartTime() > 0
&& jobReport.getStartTime() >= jobReport.getSubmitTime());
Assert.assertTrue(jobReport.getFinishTime() > 0
&& jobReport.getFinishTime() >= jobReport.getStartTime());
}
private MRClientProtocol instantiateHistoryProxy() {
final String serviceAddr =
mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
final YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol historyClient =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
return historyClient;
}
}

View File

@ -126,6 +126,14 @@ public class ConverterUtils {
return appAttemptId;
}
private static ApplicationId toApplicationId(
Iterator<String> it) throws NumberFormatException {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(Long.parseLong(it.next()));
appId.setId(Integer.parseInt(it.next()));
return appId;
}
public static String toString(ContainerId cId) {
return cId.toString();
}
@ -178,4 +186,18 @@ public class ConverterUtils {
}
}
public static ApplicationId toApplicationId(
String appIdStr) {
Iterator<String> it = _split(appIdStr).iterator();
if (!it.next().equals(APPLICATION_PREFIX)) {
throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ appIdStr);
}
try {
return toApplicationId(it);
} catch (NumberFormatException n) {
throw new IllegalArgumentException("Invalid AppAttemptId: "
+ appIdStr, n);
}
}
}

View File

@ -140,7 +140,7 @@ public class LogAggregationService extends AbstractService implements
}
super.stop();
}
/**
* Constructs the full filename for an application's log file per node.
* @param remoteRootLogDir

View File

@ -50,6 +50,7 @@ public class LogDumper extends Configured implements Tool {
private static final String CONTAINER_ID_OPTION = "containerId";
private static final String APPLICATION_ID_OPTION = "applicationId";
private static final String NODE_ADDRESS_OPTION = "nodeAddress";
private static final String APP_OWNER_OPTION = "appOwner";
@Override
public int run(String[] args) throws Exception {
@ -58,6 +59,7 @@ public class LogDumper extends Configured implements Tool {
opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
opts.addOption(APP_OWNER_OPTION, true, "AppOwner");
if (args.length < 1) {
HelpFormatter formatter = new HelpFormatter();
@ -69,11 +71,13 @@ public class LogDumper extends Configured implements Tool {
String appIdStr = null;
String containerIdStr = null;
String nodeAddress = null;
String appOwner = null;
try {
CommandLine commandLine = parser.parse(opts, args, true);
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
containerIdStr = commandLine.getOptionValue(CONTAINER_ID_OPTION);
nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION);
appOwner = commandLine.getOptionValue(APP_OWNER_OPTION);
} catch (ParseException e) {
System.out.println("options parsing failed: " + e.getMessage());
@ -96,8 +100,11 @@ public class LogDumper extends Configured implements Tool {
DataOutputStream out = new DataOutputStream(System.out);
if (appOwner == null || appOwner.isEmpty()) {
appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
}
if (containerIdStr == null && nodeAddress == null) {
dumpAllContainersLogs(appId, out);
dumpAllContainersLogs(appId, appOwner, out);
} else if ((containerIdStr == null && nodeAddress != null)
|| (containerIdStr != null && nodeAddress == null)) {
System.out.println("ContainerId or NodeAddress cannot be null!");
@ -113,7 +120,7 @@ public class LogDumper extends Configured implements Tool {
LogAggregationService.getRemoteNodeLogFileForApp(
remoteRootLogDir,
appId,
UserGroupInformation.getCurrentUser().getShortUserName(),
appOwner,
ConverterUtils.toNodeId(nodeAddress),
getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
@ -123,6 +130,21 @@ public class LogDumper extends Configured implements Tool {
return 0;
}
public void dumpAContainersLogs(String appId, String containerId,
String nodeId, String jobOwner) throws IOException {
Path remoteRootLogDir =
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationService.getRemoteNodeLogDirSuffix(getConf());
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
LogAggregationService.getRemoteNodeLogFileForApp(remoteRootLogDir,
ConverterUtils.toApplicationId(appId), jobOwner,
ConverterUtils.toNodeId(nodeId), suffix));
DataOutputStream out = new DataOutputStream(System.out);
dumpAContainerLogs(containerId, reader, out);
}
private int dumpAContainerLogs(String containerIdStr,
AggregatedLogFormat.LogReader reader, DataOutputStream out)
throws IOException {
@ -152,13 +174,12 @@ public class LogDumper extends Configured implements Tool {
return 0;
}
private void
dumpAllContainersLogs(ApplicationId appId, DataOutputStream out)
throws IOException {
private void dumpAllContainersLogs(ApplicationId appId, String appOwner,
DataOutputStream out) throws IOException {
Path remoteRootLogDir =
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
String user = appOwner;
String logDirSuffix =
getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);

View File

@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@ -4829,6 +4830,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
return secretManager.renewToken(token, user);
}
@Override
public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not supported by JobTracker");
}
JobACLsManager getJobACLsManager() {
return aclsManager.getJobACLsManager();
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
@ -812,4 +813,11 @@ public class LocalJobRunner implements ClientProtocol {
) throws IOException,InterruptedException{
return 0;
}
@Override
public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not supported");
}
}