YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)

This commit is contained in:
Robert Kanter 2015-05-06 14:19:06 -07:00
parent 0d3188fd25
commit b72507810a
6 changed files with 142 additions and 36 deletions

View File

@ -180,6 +180,8 @@ Release 2.8.0 - UNRELEASED
YARN-3396. Handle URISyntaxException in ResourceLocalizationService.
(Brahma Reddy Battula via junping_du)
YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -42,9 +42,12 @@
/**
* Manages a list of local storage directories.
*/
class DirectoryCollection {
public class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
/**
* The enum defines disk failure type.
*/
public enum DiskErrorCause {
DISK_FULL, OTHER
}
@ -59,6 +62,13 @@ static class DiskErrorInformation {
}
}
/**
* The interface provides a callback when localDirs is changed.
*/
public interface DirsChangeListener {
void onDirsChanged();
}
/**
* Returns a merged list which contains all the elements of l1 and l2
* @param l1 the first list to be included
@ -84,6 +94,8 @@ static List<String> concat(List<String> l1, List<String> l2) {
private int goodDirsDiskUtilizationPercentage;
private Set<DirsChangeListener> dirsChangeListeners;
/**
* Create collection for the directories specified. No check for free space.
*
@ -154,6 +166,20 @@ public DirectoryCollection(String[] dirs,
: utilizationPercentageCutOff);
diskUtilizationSpaceCutoff =
utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
dirsChangeListeners = new HashSet<DirsChangeListener>();
}
synchronized void registerDirsChangeListener(
DirsChangeListener listener) {
if (dirsChangeListeners.add(listener)) {
listener.onDirsChanged();
}
}
synchronized void deregisterDirsChangeListener(
DirsChangeListener listener) {
dirsChangeListeners.remove(listener);
}
/**
@ -280,6 +306,11 @@ synchronized boolean checkDirs() {
}
}
setGoodDirsDiskUtilizationPercentage();
if (setChanged) {
for (DirsChangeListener listener : dirsChangeListeners) {
listener.onDirsChanged();
}
}
return setChanged;
}

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
/**
@ -192,6 +193,22 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
public void registerLocalDirsChangeListener(DirsChangeListener listener) {
localDirs.registerDirsChangeListener(listener);
}
public void registerLogDirsChangeListener(DirsChangeListener listener) {
logDirs.registerDirsChangeListener(listener);
}
public void deregisterLocalDirsChangeListener(DirsChangeListener listener) {
localDirs.deregisterDirsChangeListener(listener);
}
public void deregisterLogDirsChangeListener(DirsChangeListener listener) {
logDirs.deregisterDirsChangeListener(listener);
}
/**
* @return the good/valid local directories based on disks' health
*/

View File

@ -92,6 +92,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService
private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
private DirsChangeListener localDirsChangeListener;
private DirsChangeListener logDirsChangeListener;
private Context nmContext;
/**
@ -254,6 +257,18 @@ public void serviceInit(Configuration conf) throws Exception {
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
localDirsChangeListener = new DirsChangeListener() {
@Override
public void onDirsChanged() {
checkAndInitializeLocalDirs();
}
};
logDirsChangeListener = new DirsChangeListener() {
@Override
public void onDirsChanged() {
initializeLogDirs(lfs);
}
};
super.serviceInit(conf);
}
@ -345,6 +360,8 @@ public void serviceStart() throws Exception {
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.serviceStart();
dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
@ -375,6 +392,8 @@ Server createServer() {
@Override
public void serviceStop() throws Exception {
dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);
if (server != null) {
server.stop();
}
@ -814,11 +833,6 @@ public void addResource(LocalizerResourceRequestEvent request) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
// In case this is not a newly initialized nm state, ensure
// initialized local/log dirs similar to LocalizerRunner
getInitializedLocalDirs();
getInitializedLogDirs();
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@ -1120,8 +1134,6 @@ public void run() {
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
List<String> localDirs = getInitializedLocalDirs();
List<String> logDirs = getInitializedLogDirs();
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@ -1387,13 +1399,12 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
}
/**
* Synchronized method to get a list of initialized local dirs. Method will
* check each local dir to ensure it has been setup correctly and will attempt
* to fix any issues it finds.
*
* @return list of initialized local dirs
* Check each local dir to ensure it has been setup correctly and will
* attempt to fix any issues it finds.
* @return void
*/
synchronized private List<String> getInitializedLocalDirs() {
@VisibleForTesting
void checkAndInitializeLocalDirs() {
List<String> dirs = dirsHandler.getLocalDirs();
List<String> checkFailedDirs = new ArrayList<String>();
for (String dir : dirs) {
@ -1415,7 +1426,6 @@ synchronized private List<String> getInitializedLocalDirs() {
throw new YarnRuntimeException(msg, e);
}
}
return dirs;
}
private boolean checkLocalDir(String localDir) {
@ -1463,17 +1473,4 @@ private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir)
localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
return localDirPathFsPermissionsMap;
}
/**
* Synchronized method to get a list of initialized log dirs. Method will
* check each local dir to ensure it has been setup correctly and will attempt
* to fix any issues it finds.
*
* @return list of initialized log dirs
*/
synchronized private List<String> getInitializedLogDirs() {
List<String> dirs = dirsHandler.getLogDirs();
initializeLogDirs(lfs);
return dirs;
}
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -258,4 +259,50 @@ public void testConstructors() {
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
}
@Test
public void testDirsChangeListener() {
DirsChangeListenerTest listener1 = new DirsChangeListenerTest();
DirsChangeListenerTest listener2 = new DirsChangeListenerTest();
DirsChangeListenerTest listener3 = new DirsChangeListenerTest();
String dirA = new File(testDir, "dirA").getPath();
String[] dirs = { dirA };
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 0);
Assert.assertEquals(listener2.num, 0);
Assert.assertEquals(listener3.num, 0);
dc.registerDirsChangeListener(listener1);
dc.registerDirsChangeListener(listener2);
dc.registerDirsChangeListener(listener3);
Assert.assertEquals(listener1.num, 1);
Assert.assertEquals(listener2.num, 1);
Assert.assertEquals(listener3.num, 1);
dc.deregisterDirsChangeListener(listener3);
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 2);
Assert.assertEquals(listener2.num, 2);
Assert.assertEquals(listener3.num, 1);
dc.deregisterDirsChangeListener(listener2);
dc.setDiskUtilizationPercentageCutoff(100.0F);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 3);
Assert.assertEquals(listener2.num, 2);
Assert.assertEquals(listener3.num, 1);
}
static class DirsChangeListenerTest implements DirsChangeListener {
public int num = 0;
public DirsChangeListenerTest() {
}
@Override
public void onDirsChanged() {
num++;
}
}
}

View File

@ -1098,7 +1098,6 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
isA(Configuration.class));
spyService.init(conf);
spyService.start();
final FsPermission defaultPerm = new FsPermission((short)0755);
@ -1110,6 +1109,8 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
.mkdir(eq(publicCache),eq(defaultPerm), eq(true));
}
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
@ -1131,21 +1132,32 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
r.setSeed(seed);
// Queue up public resource localization
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
final LocalResource pubResource1 = getPublicMockedResource(r);
final LocalResourceRequest pubReq1 =
new LocalResourceRequest(pubResource1);
LocalResource pubResource2 = null;
do {
pubResource2 = getPublicMockedResource(r);
} while (pubResource2 == null || pubResource2.equals(pubResource1));
// above call to make sure we don't get identical resources.
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq1);
pubRsrcs.add(pubReq2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
dispatcher.await();
verify(spyService, times(1)).checkAndInitializeLocalDirs();
// verify directory creation
for (Path p : localDirs) {
p = new Path((new URI(p.toString())).getPath());