From 37d68c060fcad36ebbbd96d18bb84f33cd53cf30 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 27 Dec 2011 18:27:51 +0000 Subject: [PATCH] MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM immediately after downloading a resource instead of always waiting for a second. Contributed by Siddarth Seth. svn merge -c 1224970 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1224971 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 ++++ .../localizer/ContainerLocalizer.java | 17 ++++++++++++----- .../localizer/TestContainerLocalizer.java | 5 ++++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a7eda92f876..55bfdefcb9a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -92,6 +92,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM + immediately after downloading a resource instead of always waiting for a + second. (Siddarth Seth via vinodkv) + BUG FIXES MAPREDUCE-2950. [Rumen] Fixed TestUserResolve. (Ravi Gummadi via amarrk) diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 4e03fa2a5a1..cb1bfd1ab91 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -31,7 +31,9 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -163,7 +165,8 @@ public LocalizationProtocol run() { ExecutorService exec = null; try { exec = createDownloadThreadPool(); - localizeFiles(nodeManager, exec, ugi); + CompletionService ecs = createCompletionService(exec); + localizeFiles(nodeManager, ecs, ugi); return 0; } catch (Throwable e) { // Print traces to stdout so that they can be logged by the NM address @@ -182,6 +185,10 @@ ExecutorService createDownloadThreadPool() { .setNameFormat("ContainerLocalizer Downloader").build()); } + CompletionService createCompletionService(ExecutorService exec) { + return new ExecutorCompletionService(exec); + } + Callable download(LocalDirAllocator lda, LocalResource rsrc, UserGroupInformation ugi) throws IOException { Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); @@ -206,7 +213,8 @@ void sleep(int duration) throws InterruptedException { } private void localizeFiles(LocalizationProtocol nodemanager, - ExecutorService exec, UserGroupInformation ugi) throws IOException { + CompletionService cs, UserGroupInformation ugi) + throws IOException { while (true) { try { LocalizerStatus status = createStatus(); @@ -231,7 +239,7 @@ private void localizeFiles(LocalizationProtocol nodemanager, break; } // TODO: Synchronization?? - pendingResources.put(r, exec.submit(download(lda, r, ugi))); + pendingResources.put(r, cs.submit(download(lda, r, ugi))); } } break; @@ -247,8 +255,7 @@ private void localizeFiles(LocalizationProtocol nodemanager, } catch (YarnRemoteException e) { } return; } - // TODO HB immediately when rsrc localized - sleep(1); + cs.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return; } catch (YarnRemoteException e) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index e4b68ffbc64..32a3367e9aa 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -146,7 +147,8 @@ public void testContainerLocalizerMain() throws Exception { // return result instantly for deterministic test ExecutorService syncExec = mock(ExecutorService.class); - when(syncExec.submit(isA(Callable.class))) + CompletionService cs = mock(CompletionService.class); + when(cs.submit(isA(Callable.class))) .thenAnswer(new Answer>() { @Override public Future answer(InvocationOnMock invoc) @@ -159,6 +161,7 @@ public Future answer(InvocationOnMock invoc) } }); doReturn(syncExec).when(localizer).createDownloadThreadPool(); + doReturn(cs).when(localizer).createCompletionService(syncExec); // run localization assertEquals(0, localizer.runLocalization(nmAddr));