MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM immediately after downloading a resource instead of always waiting for a second. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1224970 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-27 18:26:21 +00:00
parent ae0d48854d
commit cd90b82227
3 changed files with 20 additions and 6 deletions

View File

@ -175,6 +175,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
Vavilapalli via sseth) Vavilapalli via sseth)
MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM
immediately after downloading a resource instead of always waiting for a
second. (Siddarth Seth via vinodkv)
BUG FIXES BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -31,7 +31,9 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -163,7 +165,8 @@ public class ContainerLocalizer {
ExecutorService exec = null; ExecutorService exec = null;
try { try {
exec = createDownloadThreadPool(); exec = createDownloadThreadPool();
localizeFiles(nodeManager, exec, ugi); CompletionService<Path> ecs = createCompletionService(exec);
localizeFiles(nodeManager, ecs, ugi);
return 0; return 0;
} catch (Throwable e) { } catch (Throwable e) {
// Print traces to stdout so that they can be logged by the NM address // Print traces to stdout so that they can be logged by the NM address
@ -182,6 +185,10 @@ public class ContainerLocalizer {
.setNameFormat("ContainerLocalizer Downloader").build()); .setNameFormat("ContainerLocalizer Downloader").build());
} }
CompletionService<Path> createCompletionService(ExecutorService exec) {
return new ExecutorCompletionService<Path>(exec);
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc, Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
UserGroupInformation ugi) throws IOException { UserGroupInformation ugi) throws IOException {
Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
@ -206,7 +213,8 @@ public class ContainerLocalizer {
} }
private void localizeFiles(LocalizationProtocol nodemanager, private void localizeFiles(LocalizationProtocol nodemanager,
ExecutorService exec, UserGroupInformation ugi) throws IOException { CompletionService<Path> cs, UserGroupInformation ugi)
throws IOException {
while (true) { while (true) {
try { try {
LocalizerStatus status = createStatus(); LocalizerStatus status = createStatus();
@ -231,7 +239,7 @@ public class ContainerLocalizer {
break; break;
} }
// TODO: Synchronization?? // TODO: Synchronization??
pendingResources.put(r, exec.submit(download(lda, r, ugi))); pendingResources.put(r, cs.submit(download(lda, r, ugi)));
} }
} }
break; break;
@ -247,8 +255,7 @@ public class ContainerLocalizer {
} catch (YarnRemoteException e) { } } catch (YarnRemoteException e) { }
return; return;
} }
// TODO HB immediately when rsrc localized cs.poll(1000, TimeUnit.MILLISECONDS);
sleep(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {

View File

@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -146,7 +147,8 @@ public class TestContainerLocalizer {
// return result instantly for deterministic test // return result instantly for deterministic test
ExecutorService syncExec = mock(ExecutorService.class); ExecutorService syncExec = mock(ExecutorService.class);
when(syncExec.submit(isA(Callable.class))) CompletionService<Path> cs = mock(CompletionService.class);
when(cs.submit(isA(Callable.class)))
.thenAnswer(new Answer<Future<Path>>() { .thenAnswer(new Answer<Future<Path>>() {
@Override @Override
public Future<Path> answer(InvocationOnMock invoc) public Future<Path> answer(InvocationOnMock invoc)
@ -159,6 +161,7 @@ public class TestContainerLocalizer {
} }
}); });
doReturn(syncExec).when(localizer).createDownloadThreadPool(); doReturn(syncExec).when(localizer).createDownloadThreadPool();
doReturn(cs).when(localizer).createCompletionService(syncExec);
// run localization // run localization
assertEquals(0, localizer.runLocalization(nmAddr)); assertEquals(0, localizer.runLocalization(nmAddr));