diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c00919a4e03..05ff4cf45cc 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -175,6 +175,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM + immediately after downloading a resource instead of always waiting for a + second. (Siddarth Seth via vinodkv) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 4e03fa2a5a1..cb1bfd1ab91 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -31,7 +31,9 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -163,7 +165,8 @@ public class ContainerLocalizer { ExecutorService exec = null; try { exec = createDownloadThreadPool(); - localizeFiles(nodeManager, exec, ugi); + CompletionService ecs = createCompletionService(exec); + localizeFiles(nodeManager, ecs, ugi); return 0; } catch (Throwable e) { // Print traces to stdout so that they can be logged by the NM address @@ -182,6 +185,10 @@ public class ContainerLocalizer { .setNameFormat("ContainerLocalizer Downloader").build()); } + CompletionService createCompletionService(ExecutorService exec) { + return new ExecutorCompletionService(exec); + } + Callable download(LocalDirAllocator lda, LocalResource rsrc, UserGroupInformation ugi) throws IOException { Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); @@ -206,7 +213,8 @@ public class ContainerLocalizer { } private void localizeFiles(LocalizationProtocol nodemanager, - ExecutorService exec, UserGroupInformation ugi) throws IOException { + CompletionService cs, UserGroupInformation ugi) + throws IOException { while (true) { try { LocalizerStatus status = createStatus(); @@ -231,7 +239,7 @@ public class ContainerLocalizer { break; } // TODO: Synchronization?? - pendingResources.put(r, exec.submit(download(lda, r, ugi))); + pendingResources.put(r, cs.submit(download(lda, r, ugi))); } } break; @@ -247,8 +255,7 @@ public class ContainerLocalizer { } catch (YarnRemoteException e) { } return; } - // TODO HB immediately when rsrc localized - sleep(1); + cs.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return; } catch (YarnRemoteException e) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index e4b68ffbc64..32a3367e9aa 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -146,7 +147,8 @@ public class TestContainerLocalizer { // return result instantly for deterministic test ExecutorService syncExec = mock(ExecutorService.class); - when(syncExec.submit(isA(Callable.class))) + CompletionService cs = mock(CompletionService.class); + when(cs.submit(isA(Callable.class))) .thenAnswer(new Answer>() { @Override public Future answer(InvocationOnMock invoc) @@ -159,6 +161,7 @@ public class TestContainerLocalizer { } }); doReturn(syncExec).when(localizer).createDownloadThreadPool(); + doReturn(cs).when(localizer).createCompletionService(syncExec); // run localization assertEquals(0, localizer.runLocalization(nmAddr));