diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ca7d4e5932b..244e607c9e5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -771,6 +771,10 @@ Release 0.23.1 - Unreleased the same FS scheme, instead of randomly using one. (Mahadev Konar via sseth) + MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times + into the container request table when multiple hosts for a split happen to + be on the same rack. (Siddarth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 42a6f08e363..f4635e92b30 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -27,9 +27,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -1079,14 +1081,14 @@ public abstract class TaskAttemptImpl implements taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { - int i = 0; - String[] racks = new String[taskAttempt.dataLocalHosts.length]; + Set racks = new HashSet(); for (String host : taskAttempt.dataLocalHosts) { - racks[i++] = RackResolver.resolve(host).getNetworkLocation(); + racks.add(RackResolver.resolve(host).getNetworkLocation()); } taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt - .resolveHosts(taskAttempt.dataLocalHosts), racks)); + .resolveHosts(taskAttempt.dataLocalHosts), racks + .toArray(new String[racks.size()]))); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 68fa6ae9bf6..b5a54964d24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -81,7 +81,39 @@ public class TestTaskAttempt{ MRApp app = new FailingAttemptsMRApp(0, 1); testMRAppHistory(app); } - + + @SuppressWarnings("rawtypes") + @Test + public void testSingleRackRequest() throws Exception { + TaskAttemptImpl.RequestContainerTransition rct = + new TaskAttemptImpl.RequestContainerTransition(false); + + EventHandler eventHandler = mock(EventHandler.class); + String[] hosts = new String[3]; + hosts[0] = "host1"; + hosts[1] = "host2"; + hosts[2] = "host3"; + TaskSplitMetaInfo splitInfo = + new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); + + TaskAttemptImpl mockTaskAttempt = + createMapTaskAttemptImplForTest(eventHandler, splitInfo); + TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); + + rct.transition(mockTaskAttempt, mockTAEvent); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { + Assert.fail("Second Event not of type ContainerRequestEvent"); + } + ContainerRequestEvent cre = + (ContainerRequestEvent) arg.getAllValues().get(1); + String[] requestedRacks = cre.getRacks(); + //Only a single occurance of /DefaultRack + assertEquals(1, requestedRacks.length); + } + @SuppressWarnings("rawtypes") @Test public void testHostResolveAttempt() throws Exception {