MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times into the container request table when multiple hosts for a split happen to be on the same rack. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241734 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-02-08 01:56:02 +00:00
parent fa7294e5c0
commit 7ae94c6678
3 changed files with 43 additions and 5 deletions

View File

@ -771,6 +771,10 @@ Release 0.23.1 - Unreleased
the same FS scheme, instead of randomly using one. (Mahadev Konar via
sseth)
MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times
into the container request table when multiple hosts for a split happen to
be on the same rack. (Siddarth Seth via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -27,9 +27,11 @@
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -1079,14 +1081,14 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.attemptId,
taskAttempt.resourceCapability));
} else {
int i = 0;
String[] racks = new String[taskAttempt.dataLocalHosts.length];
Set<String> racks = new HashSet<String>();
for (String host : taskAttempt.dataLocalHosts) {
racks[i++] = RackResolver.resolve(host).getNetworkLocation();
racks.add(RackResolver.resolve(host).getNetworkLocation());
}
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
.resolveHosts(taskAttempt.dataLocalHosts), racks));
.resolveHosts(taskAttempt.dataLocalHosts), racks
.toArray(new String[racks.size()])));
}
}
}

View File

@ -82,6 +82,38 @@ public void testMRAppHistoryForReduce() throws Exception {
testMRAppHistory(app);
}
@SuppressWarnings("rawtypes")
@Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
new TaskAttemptImpl.RequestContainerTransition(false);
EventHandler eventHandler = mock(EventHandler.class);
String[] hosts = new String[3];
hosts[0] = "host1";
hosts[1] = "host2";
hosts[2] = "host3";
TaskSplitMetaInfo splitInfo =
new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
TaskAttemptImpl mockTaskAttempt =
createMapTaskAttemptImplForTest(eventHandler, splitInfo);
TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
rct.transition(mockTaskAttempt, mockTAEvent);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(arg.capture());
if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
Assert.fail("Second Event not of type ContainerRequestEvent");
}
ContainerRequestEvent cre =
(ContainerRequestEvent) arg.getAllValues().get(1);
String[] requestedRacks = cre.getRacks();
//Only a single occurance of /DefaultRack
assertEquals(1, requestedRacks.length);
}
@SuppressWarnings("rawtypes")
@Test
public void testHostResolveAttempt() throws Exception {