YARN-2931. PublicLocalizer may fail until directory is initialized by LocalizeRunner. (Anubhav Dhoot via kasha)
(cherry picked from commit db73cc9124
)
This commit is contained in:
parent
e2c1ef4deb
commit
9d72b0282f
|
@ -170,6 +170,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2927. [YARN-1492] InMemorySCMStore properties are inconsistent.
|
YARN-2927. [YARN-1492] InMemorySCMStore properties are inconsistent.
|
||||||
(Ray Chiang via kasha)
|
(Ray Chiang via kasha)
|
||||||
|
|
||||||
|
YARN-2931. PublicLocalizer may fail until directory is initialized by
|
||||||
|
LocalizeRunner. (Anubhav Dhoot via kasha)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -775,6 +775,12 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
|
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
|
||||||
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
|
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In case this is not a newly initialized nm state, ensure
|
||||||
|
// initialized local/log dirs similar to LocalizerRunner
|
||||||
|
getInitializedLocalDirs();
|
||||||
|
getInitializedLogDirs();
|
||||||
|
|
||||||
// explicitly synchronize pending here to avoid future task
|
// explicitly synchronize pending here to avoid future task
|
||||||
// completing and being dequeued before pending updated
|
// completing and being dequeued before pending updated
|
||||||
synchronized (pending) {
|
synchronized (pending) {
|
||||||
|
|
|
@ -32,18 +32,16 @@ import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Matchers.isNull;
|
import static org.mockito.Matchers.isNull;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doNothing;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.timeout;
|
import static org.mockito.Mockito.timeout;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -63,10 +61,7 @@ import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
@ -940,6 +935,109 @@ public class TestResourceLocalizationService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testPublicResourceInitializesLocalDir() throws Exception {
|
||||||
|
|
||||||
|
// Setup state to simulate restart NM with existing state meaning no
|
||||||
|
// directory creation during initialization
|
||||||
|
NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore());
|
||||||
|
when(spyStateStore.canRecover()).thenReturn(true);
|
||||||
|
NMContext spyContext = spy(nmContext);
|
||||||
|
when(spyContext.getNMStateStore()).thenReturn(spyStateStore);
|
||||||
|
|
||||||
|
List<Path> localDirs = new ArrayList<Path>();
|
||||||
|
String[] sDirs = new String[4];
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
||||||
|
sDirs[i] = localDirs.get(i).toString();
|
||||||
|
}
|
||||||
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
|
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, applicationBus);
|
||||||
|
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ContainerEventType.class, containerBus);
|
||||||
|
|
||||||
|
ContainerExecutor exec = mock(ContainerExecutor.class);
|
||||||
|
DeletionService delService = mock(DeletionService.class);
|
||||||
|
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||||
|
dirsHandler.init(conf);
|
||||||
|
|
||||||
|
dispatcher.init(conf);
|
||||||
|
dispatcher.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
ResourceLocalizationService rawService =
|
||||||
|
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||||
|
dirsHandler, spyContext);
|
||||||
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
|
doReturn(lfs).when(spyService).getLocalFileContext(
|
||||||
|
isA(Configuration.class));
|
||||||
|
|
||||||
|
spyService.init(conf);
|
||||||
|
spyService.start();
|
||||||
|
|
||||||
|
final FsPermission defaultPerm = new FsPermission((short)0755);
|
||||||
|
|
||||||
|
// verify directory is not created at initialization
|
||||||
|
for (Path p : localDirs) {
|
||||||
|
p = new Path((new URI(p.toString())).getPath());
|
||||||
|
Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
|
||||||
|
verify(spylfs, never())
|
||||||
|
.mkdir(eq(publicCache),eq(defaultPerm), eq(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
final String user = "user0";
|
||||||
|
// init application
|
||||||
|
final Application app = mock(Application.class);
|
||||||
|
final ApplicationId appId =
|
||||||
|
BuilderUtils.newApplicationId(314159265358979L, 3);
|
||||||
|
when(app.getUser()).thenReturn(user);
|
||||||
|
when(app.getAppId()).thenReturn(appId);
|
||||||
|
spyService.handle(new ApplicationLocalizationEvent(
|
||||||
|
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// init container.
|
||||||
|
final Container c = getMockContainer(appId, 42, user);
|
||||||
|
|
||||||
|
// init resources
|
||||||
|
Random r = new Random();
|
||||||
|
long seed = r.nextLong();
|
||||||
|
System.out.println("SEED: " + seed);
|
||||||
|
r.setSeed(seed);
|
||||||
|
|
||||||
|
// Queue up public resource localization
|
||||||
|
final LocalResource pubResource = getPublicMockedResource(r);
|
||||||
|
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
|
||||||
|
|
||||||
|
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
||||||
|
new HashMap<LocalResourceVisibility,
|
||||||
|
Collection<LocalResourceRequest>>();
|
||||||
|
req.put(LocalResourceVisibility.PUBLIC,
|
||||||
|
Collections.singletonList(pubReq));
|
||||||
|
|
||||||
|
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
|
||||||
|
pubRsrcs.add(pubReq);
|
||||||
|
|
||||||
|
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// verify directory creation
|
||||||
|
for (Path p : localDirs) {
|
||||||
|
p = new Path((new URI(p.toString())).getPath());
|
||||||
|
Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
|
||||||
|
verify(spylfs).mkdir(eq(publicCache),eq(defaultPerm), eq(true));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
dispatcher.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
public void testFailedPublicResource() throws Exception {
|
public void testFailedPublicResource() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue