svn merge -c 1450807 FIXES: YARN-426. Failure to download a public resource prevents further downloads (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1450811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2013-02-27 15:33:22 +00:00
parent 7b943682fb
commit 829a798201
3 changed files with 127 additions and 13 deletions

View File

@ -325,6 +325,9 @@ Release 0.23.7 - UNRELEASED
YARN-400. RM can return null application resource usage report leading to YARN-400. RM can return null application resource usage report leading to
NPE in client (Jason Lowe via tgraves) NPE in client (Jason Lowe via tgraves)
YARN-426. Failure to download a public resource prevents further downloads
(Jason Lowe via bobby)
Release 0.23.6 - UNRELEASED Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -659,25 +659,23 @@ public class ResourceLocalizationService extends CompositeService
new ContainerResourceFailedEvent( new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(), assoc.getContext().getContainerId(),
assoc.getResource().getRequest(), e.getCause())); assoc.getResource().getRequest(), e.getCause()));
List<LocalizerResourceRequestEvent> reqs;
synchronized (attempts) { synchronized (attempts) {
LocalResourceRequest req = assoc.getResource().getRequest(); LocalResourceRequest req = assoc.getResource().getRequest();
List<LocalizerResourceRequestEvent> reqs = attempts.get(req); reqs = attempts.get(req);
if (null == reqs) { if (null == reqs) {
LOG.error("Missing pending list for " + req); LOG.error("Missing pending list for " + req);
return; return;
} }
if (reqs.isEmpty()) { attempts.remove(req);
attempts.remove(req); }
} // let the other containers know about the localization failure
/* for (LocalizerResourceRequestEvent reqEvent : reqs) {
* Do not retry for now. Once failed is failed! dispatcher.getEventHandler().handle(
* LocalizerResourceRequestEvent request = reqs.remove(0); new ContainerResourceFailedEvent(
reqEvent.getContext().getContainerId(),
pending.put(queue.submit(new FSDownload( reqEvent.getResource().getRequest(), e.getCause()));
lfs, null, conf, publicDirs, }
request.getResource().getRequest(), new Random())),
request);
*/ }
} catch (CancellationException e) { } catch (CancellationException e) {
// ignore; shutting down // ignore; shutting down
} }

View File

@ -27,13 +27,16 @@ import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
@ -46,6 +49,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert; import junit.framework.Assert;
@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@ -102,6 +108,8 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestResourceLocalizationService { public class TestResourceLocalizationService {
@ -512,6 +520,111 @@ public class TestResourceLocalizationService {
} }
} }
@Test(timeout=20000)
@SuppressWarnings("unchecked") // mocked generics
public void testFailedPublicResource() throws Exception {
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
dispatcher.init(conf);
dispatcher.start();
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
isA(Configuration.class));
spyService.init(conf);
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
// init container.
final Container c = getMockContainer(appId, 42);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// cause chmod to fail after a delay
final CyclicBarrier barrier = new CyclicBarrier(2);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) throws IOException {
try {
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
throw new IOException("forced failure");
}
}).when(spylfs)
.setPermission(isA(Path.class), isA(FsPermission.class));
// Queue up two localization requests for the same public resource
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
dispatcher.await();
// allow the chmod to fail now that both requests have been queued
barrier.await();
verify(containerBus, timeout(5000).times(2))
.handle(isA(ContainerResourceFailedEvent.class));
} finally {
dispatcher.stop();
}
}
private static URL getPath(String path) { private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path); URL url = BuilderUtils.newURL("file", null, 0, path);
return url; return url;