merge -r 1309439:1309440 from trunk. FIXES: MAPREDUCE-3682
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1309446 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
03debdda38
commit
378ed9b673
|
@ -146,6 +146,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-3672. Killed maps shouldn't be counted towards
|
||||
JobCounter.NUM_FAILED_MAPS. (Anupam Seth via tgraves)
|
||||
|
||||
MAPREDUCE-3682 Tracker URL says AM tasks run on localhost.
|
||||
(Ravi Prakash via tgraves)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -643,7 +643,8 @@ public class MRAppMaster extends CompositeService {
|
|||
public synchronized void start() {
|
||||
if (job.isUber()) {
|
||||
this.containerAllocator = new LocalContainerAllocator(
|
||||
this.clientService, this.context);
|
||||
this.clientService, this.context, nmHost, nmPort, nmHttpPort
|
||||
, containerID);
|
||||
} else {
|
||||
this.containerAllocator = new RMContainerAllocator(
|
||||
this.clientService, this.context);
|
||||
|
|
|
@ -65,14 +65,23 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
private AtomicInteger containerCount = new AtomicInteger();
|
||||
private long retryInterval;
|
||||
private long retrystartTime;
|
||||
private String nmHost;
|
||||
private int nmPort;
|
||||
private int nmHttpPort;
|
||||
private ContainerId containerId;
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
public LocalContainerAllocator(ClientService clientService,
|
||||
AppContext context) {
|
||||
AppContext context, String nmHost, int nmPort, int nmHttpPort
|
||||
, ContainerId cId) {
|
||||
super(clientService, context);
|
||||
this.eventHandler = context.getEventHandler();
|
||||
this.nmHost = nmHost;
|
||||
this.nmPort = nmPort;
|
||||
this.nmHttpPort = nmHttpPort;
|
||||
this.containerId = cId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,17 +140,17 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
LOG.info("Processing the event " + event.toString());
|
||||
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
|
||||
cID.setApplicationAttemptId(applicationAttemptId);
|
||||
// use negative ids to denote that these are local. Need a better way ??
|
||||
cID.setId((-1) * containerCount.getAndIncrement());
|
||||
// Assign the same container ID as the AM
|
||||
cID.setId(this.containerId.getId());
|
||||
|
||||
Container container = recordFactory.newRecordInstance(Container.class);
|
||||
container.setId(cID);
|
||||
NodeId nodeId = Records.newRecord(NodeId.class);
|
||||
nodeId.setHost("localhost");
|
||||
nodeId.setPort(1234);
|
||||
nodeId.setHost(this.nmHost);
|
||||
nodeId.setPort(this.nmPort);
|
||||
container.setNodeId(nodeId);
|
||||
container.setContainerToken(null);
|
||||
container.setNodeHttpAddress("localhost:8042");
|
||||
container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
|
||||
// send the container-assigned event to task attempt
|
||||
|
||||
if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
|
||||
|
|
Loading…
Reference in New Issue