YARN-1338. Recover localized resource cache state upon nodemanager restart (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1598640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Junping Du 2014-05-30 15:37:27 +00:00
parent 75112e97b4
commit 66598697a6
32 changed files with 2397 additions and 128 deletions

View File

@ -29,6 +29,9 @@ Release 2.5.0 - UNRELEASED
YARN-1362. Distinguish between nodemanager shutdown for decommission vs shutdown
for restart. (Jason Lowe via junping_du)
YARN-1338. Recover localized resource cache state upon nodemanager restart
(Jason Lowe via junping_du)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -156,6 +156,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
<profiles>
@ -292,6 +296,7 @@
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>yarn_server_nodemanager_recovery.proto</include>
<include>yarn_server_nodemanager_service_protos.proto</include>
<include>LocalizationProtocol.proto</include>
</includes>

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -67,6 +68,8 @@ public interface Context {
ApplicationACLsManager getApplicationACLsManager();
NMStateStoreService getNMStateStore();
boolean getDecommissioned();
void setDecommissioned(boolean isDecommissioned);

View File

@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@ -78,6 +81,7 @@ public class NodeManager extends CompositeService
private ContainerManagerImpl containerManager;
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false);
@ -115,9 +119,10 @@ public class NodeManager extends CompositeService
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService stateStore) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
dirsHandler, aclsManager);
dirsHandler, aclsManager, stateStore);
}
protected void doSecureLogin() throws IOException {
@ -125,11 +130,8 @@ public class NodeManager extends CompositeService
YarnConfiguration.NM_PRINCIPAL);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
private void initAndStartRecoveryStore(Configuration conf)
throws IOException {
boolean recoveryEnabled = conf.getBoolean(
YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
@ -142,7 +144,36 @@ public class NodeManager extends CompositeService
}
Path recoveryRoot = new Path(recoveryDirName);
recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
nmStore = new NMLeveldbStateStoreService();
} else {
nmStore = new NMNullStateStoreService();
}
nmStore.init(conf);
nmStore.start();
}
/**
 * Stops the NM state store and, if this node has been decommissioned and the
 * store supports recovery, deletes the recovery directory configured under
 * {@link YarnConfiguration#NM_RECOVERY_DIR}.
 *
 * This may run after a partially failed initialization, in which case the
 * state store and/or the NM context may not have been created yet; the
 * corresponding steps are skipped instead of failing with a
 * NullPointerException.
 *
 * @throws IOException if the local filesystem cannot be obtained or the
 *         delete operation fails with an I/O error
 */
private void stopRecoveryStore() throws IOException {
  if (nmStore == null) {
    // the store was never created, so there is nothing to stop or remove
    return;
  }
  nmStore.stop();
  if (context == null) {
    // without a context the decommission status is unknown; keep the state
    return;
  }
  if (context.getDecommissioned() && nmStore.canRecover()) {
    LOG.info("Removing state store due to decommission");
    Configuration conf = getConfig();
    Path recoveryRoot = new Path(
        conf.get(YarnConfiguration.NM_RECOVERY_DIR));
    LOG.info("Removing state store at " + recoveryRoot
        + " due to decommission");
    FileSystem recoveryFs = FileSystem.getLocal(conf);
    if (!recoveryFs.delete(recoveryRoot, true)) {
      LOG.warn("Unable to delete " + recoveryRoot);
    }
  }
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager =
new NMContainerTokenSecretManager(conf);
@ -171,7 +202,7 @@ public class NodeManager extends CompositeService
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager);
nmTokenSecretManager, nmStore);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@ -220,6 +251,7 @@ public class NodeManager extends CompositeService
return;
}
super.serviceStop();
stopRecoveryStore();
DefaultMetricsSystem.shutdown();
}
@ -272,11 +304,13 @@ public class NodeManager extends CompositeService
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
private final NMStateStoreService stateStore;
private boolean isDecommissioned = false;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
NMStateStoreService stateStore) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@ -284,6 +318,7 @@ public class NodeManager extends CompositeService
this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
this.stateStore = stateStore;
}
/**
@ -351,6 +386,11 @@ public class NodeManager extends CompositeService
return aclsManager;
}
/**
 * Returns the state store that was handed to this context's constructor.
 */
@Override
public NMStateStoreService getNMStateStore() {
return stateStore;
}
@Override
public boolean getDecommissioned() {
return isDecommissioned;

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -116,6 +117,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -218,6 +220,15 @@ public class ContainerManagerImpl extends CompositeService implements
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf);
recover();
}
/**
 * Feeds the localization state loaded from the NM state store into the
 * resource localization service. Does nothing when the store reports that it
 * cannot recover.
 */
private void recover() throws IOException, URISyntaxException {
  NMStateStoreService store = context.getNMStateStore();
  if (!store.canRecover()) {
    return;
  }
  rsrcLocalizationSrvc.recoverLocalizedResources(
      store.loadLocalizationState());
}
protected LogHandler createLogHandler(Configuration conf, Context context,
@ -239,7 +250,7 @@ public class ContainerManagerImpl extends CompositeService implements
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext, dirsHandler);
deletionContext, dirsHandler, context.getNMStateStore());
}
protected ContainersLauncher createContainersLauncher(Context context,

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
/**
* {@link LocalCacheDirectoryManager} is used for managing hierarchical
* directories for local cache. It will allow to restrict the number of files in
@ -99,6 +101,57 @@ public class LocalCacheDirectoryManager {
}
}
/**
 * Count one more file against a relative directory within the cache,
 * registering the directory first when it is not yet known.
 *
 * @param relPath the relative path; null is treated as the empty path and
 *                surrounding whitespace is trimmed
 */
public synchronized void incrementFileCountForPath(String relPath) {
  String key = (relPath != null) ? relPath.trim() : "";
  Directory dir = knownDirectories.get(key);
  if (dir == null) {
    int dirNo = Directory.getDirectoryNumber(key);
    totalSubDirectories = Math.max(dirNo, totalSubDirectories);
    dir = new Directory(dirNo);
    nonFullDirectories.add(dir);
    knownDirectories.put(dir.getRelativePath(), dir);
  }
  int updatedCount = dir.incrementAndGetCount();
  if (updatedCount >= perDirectoryFileLimit) {
    // the per-directory limit is reached, so it no longer counts as non-full
    nonFullDirectories.remove(dir);
  }
}
/**
 * Walk upwards from a directory inside a local cache tree until reaching a
 * component that is not a single-character sub directory name, and return
 * that component's path as the cache directory root.
 *
 * @param path the directory within a cache directory
 * @return the local cache directory root or null if not found
 */
public static Path getCacheDirectoryRoot(Path path) {
  Path current = path;
  while (current != null) {
    String name = current.getName();
    if (name.length() != 1) {
      return current;
    }
    int dirnum;
    try {
      dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL);
    } catch (NumberFormatException ignored) {
      // not a sub directory digit: use an out-of-range value so the
      // check below treats this component as the root
      dirnum = DIRECTORIES_PER_LEVEL;
    }
    if (dirnum >= DIRECTORIES_PER_LEVEL) {
      return current;
    }
    current = current.getParent();
  }
  return null;
}
/**
 * Test hook: returns whatever knownDirectories holds for the given relative
 * path.
 */
@VisibleForTesting
synchronized Directory getDirectory(String relPath) {
return knownDirectories.get(relPath);
}
/*
* It limits the number of files and sub directories in the directory to the
* limit LocalCacheDirectoryManager#perDirectoryFileLimit.
@ -108,11 +161,9 @@ public class LocalCacheDirectoryManager {
private final String relativePath;
private int fileCount;
public Directory(int directoryNo) {
fileCount = 0;
if (directoryNo == 0) {
relativePath = "";
} else {
static String getRelativePath(int directoryNo) {
String relativePath = "";
if (directoryNo > 0) {
String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
StringBuffer sb = new StringBuffer();
if (tPath.length() == 1) {
@ -128,6 +179,27 @@ public class LocalCacheDirectoryManager {
}
relativePath = sb.toString();
}
return relativePath;
}
/**
 * Convert a relative directory path back into its directory number; meant
 * to reverse the encoding done by getRelativePath().
 *
 * @param relativePath the relative path, "" for directory number 0
 * @return the directory number for the path
 */
static int getDirectoryNumber(String relativePath) {
  if (relativePath.isEmpty()) {
    return 0;
  }
  String digits = relativePath.replace("/", "");
  if (digits.length() > 1) {
    // undo step from getRelativePath() to reuse 0th sub directory
    int leading = Integer.parseInt(digits.substring(0, 1),
        DIRECTORIES_PER_LEVEL);
    digits = Integer.toString(leading + 1, DIRECTORIES_PER_LEVEL)
        + digits.substring(1);
  }
  return Integer.parseInt(digits, DIRECTORIES_PER_LEVEL) + 1;
}
/**
 * Creates a directory entry with a file count of zero whose relative path is
 * computed from the directory number by getRelativePath().
 */
public Directory(int directoryNo) {
fileCount = 0;
relativePath = getRelativePath(directoryNo);
}
public int incrementAndGetCount() {

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import com.google.common.annotations.VisibleForTesting;
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
@ -34,18 +31,11 @@ import com.google.common.annotations.VisibleForTesting;
interface LocalResourcesTracker
extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
// TODO: Not used at all!!
boolean contains(LocalResourceRequest resource);
boolean remove(LocalizedResource req, DeletionService delService);
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
String getUser();
long nextUniqueNumber();
@VisibleForTesting
@Private
LocalizedResource getLocalizedResource(LocalResourceRequest request);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -27,14 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import com.google.common.annotations.VisibleForTesting;
@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
.compile(RANDOM_DIR_REGEX);
private final String user;
private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
private NMStateStoreService stateStore;
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
boolean useLocalCacheDirectoryManager, Configuration conf) {
this(user, dispatcher,
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
Configuration conf, NMStateStoreService stateStore) {
this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
useLocalCacheDirectoryManager, conf);
useLocalCacheDirectoryManager, conf, stateStore);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
boolean useLocalCacheDirectoryManager, Configuration conf) {
boolean useLocalCacheDirectoryManager, Configuration conf,
NMStateStoreService stateStore) {
this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
this.stateStore = stateStore;
}
/*
@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
decrementFileCountForLocalCacheDirectory(req, rsrc);
removeResource(req);
rsrc = null;
}
if (null == rsrc) {
@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
}
break;
case LOCALIZATION_FAILED:
decrementFileCountForLocalCacheDirectory(req, null);
/*
* If resource localization fails then Localized resource will be
* removed from local cache.
*/
localrsrc.remove(req);
removeResource(req);
break;
case RECOVERED:
if (rsrc != null) {
LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
return;
}
rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
localrsrc.put(req, rsrc);
break;
}
rsrc.handle(event);
if (event.getType() == ResourceEventType.LOCALIZED) {
if (rsrc.getLocalPath() != null) {
try {
stateStore.finishResourceLocalization(user, appId,
buildLocalizedResourceProto(rsrc));
} catch (IOException ioe) {
LOG.error("Error storing resource state for " + rsrc, ioe);
}
} else {
LOG.warn("Resource " + rsrc + " localized without a location");
}
}
}
/**
 * Prepares tracker bookkeeping for a recovered resource and creates its
 * LocalizedResource: raises the unique-number generator to at least the
 * recovered resource's number and counts the resource against its cache
 * directory.
 */
private LocalizedResource recoverResource(LocalResourceRequest req,
    ResourceRecoveredEvent event) {
  // the name of the resource's parent directory is its unique number
  Path rsrcDir = event.getLocalPath().getParent();
  long recoveredId = Long.parseLong(rsrcDir.getName());

  // bump the ID generator so new IDs do not collide with existing resources
  long observed;
  do {
    observed = uniqueNumberGenerator.get();
  } while (!uniqueNumberGenerator.compareAndSet(observed,
      Math.max(observed, recoveredId)));

  incrementFileCountForLocalCacheDirectory(rsrcDir.getParent());
  return new LocalizedResource(req, dispatcher);
}
/**
 * Builds the protobuf record for a localized resource from its request,
 * local path and size.
 */
private LocalizedResourceProto buildLocalizedResourceProto(
    LocalizedResource rsrc) {
  LocalizedResourceProto.Builder builder =
      LocalizedResourceProto.newBuilder();
  builder.setResource(buildLocalResourceProto(rsrc.getRequest()));
  builder.setLocalPath(rsrc.getLocalPath().toString());
  builder.setSize(rsrc.getSize());
  return builder.build();
}
/**
 * Returns the protobuf form of a LocalResource. A resource that is not
 * already a LocalResourcePBImpl is first copied through
 * LocalResource.newInstance.
 */
private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
  LocalResource pbBacked = lr;
  if (!(pbBacked instanceof LocalResourcePBImpl)) {
    pbBacked = LocalResource.newInstance(lr.getResource(), lr.getType(),
        lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
        lr.getPattern());
  }
  return ((LocalResourcePBImpl) pbBacked).getProto();
}
/**
 * Counts one more file against the cache directory manager that owns the
 * given directory, creating the manager if none is registered for the cache
 * root. No-op when directory managers are not in use or no cache root is
 * found.
 *
 * @param cacheDir a directory inside (or equal to) a local cache root
 */
public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
  if (!useLocalCacheDirectoryManager) {
    return;
  }
  Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
      cacheDir);
  if (cacheRoot == null) {
    return;
  }

  LocalCacheDirectoryManager manager = directoryManagers.get(cacheRoot);
  if (manager == null) {
    LocalCacheDirectoryManager created =
        new LocalCacheDirectoryManager(conf);
    LocalCacheDirectoryManager existing =
        directoryManagers.putIfAbsent(cacheRoot, created);
    // keep whichever manager actually ended up in the map
    manager = (existing != null) ? existing : created;
  }

  String relPath = "";
  if (!cacheDir.equals(cacheRoot)) {
    String dirStr = cacheDir.toUri().getRawPath();
    String rootStr = cacheRoot.toUri().getRawPath();
    // strip the root prefix and the separator that follows it
    relPath = dirStr.substring(rootStr.length() + 1);
  }
  manager.incrementFileCountForPath(relPath);
}
/*
@ -216,11 +317,6 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
return ret;
}
@Override
public boolean contains(LocalResourceRequest resource) {
return localrsrc.containsKey(resource);
}
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
+ " with non-zero refcount");
return false;
} else { // ResourceState is LOCALIZED or INIT
localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
/**
 * Drops a resource from the tracker map, updates the cache directory file
 * count, and removes the resource's record from the state store when the
 * resource had a local path. State store failures are logged, not thrown.
 */
private void removeResource(LocalResourceRequest req) {
  LocalizedResource removed = localrsrc.remove(req);
  decrementFileCountForLocalCacheDirectory(req, removed);
  if (removed == null) {
    return;
  }
  Path localPath = removed.getLocalPath();
  if (localPath == null) {
    return;
  }
  try {
    stateStore.removeLocalizedResource(user, appId, localPath);
  } catch (IOException e) {
    LOG.error("Unable to remove resource " + removed + " from state store",
        e);
  }
}
/**
* Returns the path up to the random directory component.
*/
@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
@Override
public Path
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
Path rPath = localDirPath;
rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
return rPath;
} else {
return localDirPath;
}
rPath = new Path(rPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
Path localPath = new Path(rPath, req.getPath().getName());
LocalizedResource rsrc = localrsrc.get(req);
rsrc.setLocalPath(localPath);
LocalResource lr = LocalResource.newInstance(req.getResource(),
req.getType(), req.getVisibility(), req.getSize(),
req.getTimestamp());
try {
stateStore.startResourceLocalization(user, appId,
((LocalResourcePBImpl) lr).getProto(), localPath);
} catch (IOException e) {
LOG.error("Unable to record localization start for " + rsrc, e);
}
return rPath;
}
@Override
public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet();
}
@VisibleForTesting
@Private
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
}
/**
 * Test hook: returns the directory manager registered for the given local
 * directory, or null when directory managers are not in use.
 */
@VisibleForTesting
LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
  return useLocalCacheDirectoryManager
      ? directoryManagers.get(localDirPath)
      : null;
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@ -54,8 +55,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
Path localPath;
long size = -1;
volatile Path localPath;
volatile long size = -1;
final LocalResourceRequest rsrc;
final Dispatcher dispatcher;
final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@ -76,6 +77,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
.addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
ResourceEventType.RECOVERED, new RecoveredTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@ -157,6 +160,10 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
return localPath;
}
/**
 * Sets the local path of this resource, stored after passing it through
 * Path.getPathWithoutSchemeAndAuthority.
 */
public void setLocalPath(Path localPath) {
this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath);
}
public long getTimestamp() {
return timestamp.get();
}
@ -234,7 +241,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
rsrc.localPath = locEvent.getLocation();
rsrc.localPath =
Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
rsrc.size = locEvent.getSize();
for (ContainerId container : rsrc.ref) {
rsrc.dispatcher.getEventHandler().handle(
@ -291,4 +299,13 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
rsrc.release(relEvent.getContainer());
}
}
/**
 * Transition for a RECOVERED event: copies the recovered local path and
 * size from the event onto the resource.
 */
private static class RecoveredTransition extends ResourceTransition {
  @Override
  public void transition(LocalizedResource rsrc, ResourceEvent event) {
    ResourceRecoveredEvent recovered = (ResourceRecoveredEvent) event;
    rsrc.localPath = recovered.getLocalPath();
    rsrc.size = recovered.getSize();
  }
}
}

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@ -109,10 +112,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -142,6 +150,7 @@ public class ResourceLocalizationService extends CompositeService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager;
private NMStateStoreService stateStore;
private LocalResourcesTracker publicRsrc;
@ -163,7 +172,7 @@ public class ResourceLocalizationService extends CompositeService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler) {
LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@ -175,6 +184,7 @@ public class ResourceLocalizationService extends CompositeService
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
this.stateStore = stateStore;
}
FileContext getLocalFileContext(Configuration conf) {
@ -203,15 +213,17 @@ public class ResourceLocalizationService extends CompositeService
@Override
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
this.publicRsrc =
new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
true, conf, stateStore);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
cleanUpLocalDir(lfs,delService);
if (!stateStore.canRecover()) {
cleanUpLocalDir(lfs,delService);
}
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
@ -249,6 +261,74 @@ public class ResourceLocalizationService extends CompositeService
super.serviceInit(conf);
}
/**
 * Recover localized resources after an NM restart.
 *
 * Replays the recovered state into the public tracker, then into each
 * user's private tracker and each application's tracker. Trackers that are
 * not registered yet are created; empty tracker states are skipped.
 *
 * @param state the localization state loaded from the state store
 */
public void recoverLocalizedResources(RecoveredLocalizationState state)
    throws URISyntaxException {
  recoverTrackerResources(publicRsrc, state.getPublicTrackerState());

  for (Map.Entry<String, RecoveredUserResources> userEntry :
      state.getUserResources().entrySet()) {
    String user = userEntry.getKey();
    RecoveredUserResources recovered = userEntry.getValue();

    LocalResourceTrackerState privateState =
        recovered.getPrivateTrackerState();
    if (!privateState.isEmpty()) {
      LocalResourcesTracker created = new LocalResourcesTrackerImpl(user,
          null, dispatcher, true, super.getConfig(), stateStore);
      LocalResourcesTracker existing = privateRsrc.putIfAbsent(user,
          created);
      recoverTrackerResources(existing != null ? existing : created,
          privateState);
    }

    for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
        recovered.getAppTrackerStates().entrySet()) {
      LocalResourceTrackerState appState = appEntry.getValue();
      if (appState.isEmpty()) {
        continue;
      }
      ApplicationId appId = appEntry.getKey();
      String appIdStr = ConverterUtils.toString(appId);
      LocalResourcesTracker created = new LocalResourcesTrackerImpl(user,
          appId, dispatcher, false, super.getConfig(), stateStore);
      LocalResourcesTracker existing = appRsrc.putIfAbsent(appIdStr,
          created);
      recoverTrackerResources(existing != null ? existing : created,
          appState);
    }
  }
}
/**
 * Replays the recovered state of a single resource tracker.
 * <p>
 * Completed localizations are handed to the tracker as recovered
 * resources. In-progress localizations are also handed to the tracker and
 * then immediately removed through it, since containers will request
 * those resources again.
 *
 * @param tracker the tracker that receives the recovered resources
 * @param state the recovered completed and in-progress resources
 * @throws URISyntaxException propagated from building the resource request
 *         (presumably when a stored resource URL is malformed)
 */
private void recoverTrackerResources(LocalResourcesTracker tracker,
    LocalResourceTrackerState state) throws URISyntaxException {
  for (LocalizedResourceProto completed : state.getLocalizedResources()) {
    LocalResourceRequest request = new LocalResourceRequest(
        new LocalResourcePBImpl(completed.getResource()));
    LOG.info("Recovering localized resource " + request + " at "
        + completed.getLocalPath());
    Path recoveredPath = new Path(completed.getLocalPath());
    tracker.handle(new ResourceRecoveredEvent(request, recoveredPath,
        completed.getSize()));
  }

  for (Map.Entry<LocalResourceProto, Path> started :
      state.getInProgressResources().entrySet()) {
    LocalResourceRequest request = new LocalResourceRequest(
        new LocalResourcePBImpl(started.getKey()));
    Path partialPath = started.getValue();
    // register the partial download so the tracker knows about it ...
    tracker.handle(new ResourceRecoveredEvent(request, partialPath, 0));

    // ... then drop it again: in-progress localizations are not resumed,
    // containers will request the resource again
    LOG.info("Deleting in-progress localization for " + request + " at "
        + partialPath);
    tracker.remove(tracker.getLocalizedResource(request), delService);
  }

  // TODO: remove untracked directories in local filesystem
}
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
return localizerTracker.processHeartbeat(status);
@ -337,17 +417,10 @@ public class ResourceLocalizationService extends CompositeService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
dispatcher, true, super.getConfig()));
if (null != appRsrc.putIfAbsent(
ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
.getConfig()))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
// should appear in logs, but it's an internal error
// that should have no effect on applications
}
null, dispatcher, true, super.getConfig(), stateStore));
String appIdStr = ConverterUtils.toString(app.getAppId());
appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
// 1) Signal container init
//
// This is handled by the ApplicationImpl state machine and allows
@ -446,18 +519,28 @@ public class ResourceLocalizationService extends CompositeService
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
String userName;
String appIDStr;
String userName = application.getUser();
ApplicationId appId = application.getAppId();
String appIDStr = application.toString();
LocalResourcesTracker appLocalRsrcsTracker =
appRsrc.remove(ConverterUtils.toString(application.getAppId()));
if (null == appLocalRsrcsTracker) {
appRsrc.remove(ConverterUtils.toString(appId));
if (appLocalRsrcsTracker != null) {
for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
Path localPath = rsrc.getLocalPath();
if (localPath != null) {
try {
stateStore.removeLocalizedResource(userName, appId, localPath);
} catch (IOException e) {
LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+ " from state store", e);
}
}
}
} else {
LOG.warn("Removing uninitialized application " + application);
}
// TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
@ -668,19 +751,15 @@ public class ResourceLocalizationService extends CompositeService
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicDirDestPath =
Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path hierarchicalPath =
publicRsrc.getPathForLocalization(key, publicDirDestPath);
if (!hierarchicalPath.equals(publicDirDestPath)) {
publicDirDestPath = hierarchicalPath;
Path publicDirDestPath =
publicRsrc.getPathForLocalization(key, publicRootPath);
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
publicDirDestPath =
new Path(publicDirDestPath, Long.toString(publicRsrc
.nextUniqueNumber()));
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@ -968,9 +1047,8 @@ public class ResourceLocalizationService extends CompositeService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath);
return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath);
}
@Override

View File

@ -31,5 +31,7 @@ public enum ResourceEventType {
/** See {@link ResourceReleaseEvent} */
RELEASE,
/** See {@link ResourceFailedLocalizationEvent} */
LOCALIZATION_FAILED
LOCALIZATION_FAILED,
/** See {@link ResourceRecoveredEvent} */
RECOVERED
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
/**
 * Resource event of type {@link ResourceEventType#RECOVERED}, carrying the
 * local path and size recorded for a resource that is being recovered.
 */
public class ResourceRecoveredEvent extends ResourceEvent {

  /** Local path associated with the recovered resource. */
  private final Path recoveredPath;

  /** Size recorded for the recovered resource. */
  private final long recoveredSize;

  /**
   * @param rsrc the request identifying the recovered resource
   * @param localPath the local path of the recovered resource
   * @param size the size recorded for the recovered resource
   */
  public ResourceRecoveredEvent(LocalResourceRequest rsrc, Path localPath,
      long size) {
    super(rsrc, ResourceEventType.RECOVERED);
    this.recoveredSize = size;
    this.recoveredPath = localPath;
  }

  /** @return the size recorded for the recovered resource */
  public long getSize() {
    return this.recoveredSize;
  }

  /** @return the local path of the recovered resource */
  public Path getLocalPath() {
    return this.recoveredPath;
  }
}

View File

@ -0,0 +1,377 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
/**
 * Nodemanager state store that persists recovery state in a leveldb
 * database located under {@link YarnConfiguration#NM_RECOVERY_DIR}.
 * <p>
 * Localized resource records are keyed by the local path of the resource:
 * <ul>
 *   <li>public resources:
 *   <code>Localization/public/{started,completed}/&lt;path&gt;</code></li>
 *   <li>per-user private resources:
 *   <code>Localization/private/&lt;user&gt;/filecache/{started,completed}/&lt;path&gt;</code></li>
 *   <li>per-application resources:
 *   <code>Localization/private/&lt;user&gt;/appcache/&lt;appId&gt;/{started,completed}/&lt;path&gt;</code></li>
 * </ul>
 * A "started" record holds the serialized resource request, a "completed"
 * record holds the serialized {@link LocalizedResourceProto}.
 */
public class NMLeveldbStateStoreService extends NMStateStoreService {

  public static final Log LOG =
      LogFactory.getLog(NMLeveldbStateStoreService.class);

  private static final String DB_NAME = "yarn-nm-state";
  private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
  private static final String DB_SCHEMA_VERSION = "1.0";

  private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
  private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
      LOCALIZATION_KEY_PREFIX + "public/";
  private static final String LOCALIZATION_PRIVATE_KEY_PREFIX =
      LOCALIZATION_KEY_PREFIX + "private/";
  private static final String LOCALIZATION_STARTED_SUFFIX = "started/";
  private static final String LOCALIZATION_COMPLETED_SUFFIX = "completed/";
  private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
  private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";

  /** Handle to the open database; null until {@link #initStorage} succeeds. */
  private DB db;

  public NMLeveldbStateStoreService() {
    super(NMLeveldbStateStoreService.class.getName());
  }

  /** Nothing to do: the database is opened in {@link #initStorage}. */
  @Override
  protected void startStorage() throws IOException {
  }

  /** Closes the database if it was opened. */
  @Override
  protected void closeStorage() throws IOException {
    if (db != null) {
      db.close();
    }
  }

  /**
   * Loads every localization record from the database.
   *
   * @return the recovered public, per-user and per-application state
   * @throws IOException if the database cannot be read or contains a key
   *         that does not fit the expected layout
   */
  @Override
  public RecoveredLocalizationState loadLocalizationState()
      throws IOException {
    RecoveredLocalizationState state = new RecoveredLocalizationState();

    LeveldbIterator iter = null;
    try {
      iter = new LeveldbIterator(db);
      iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
      state.publicTrackerState = loadResourceTrackerState(iter,
          LOCALIZATION_PUBLIC_KEY_PREFIX);

      iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
      while (iter.hasNext()) {
        // peek only: the helpers below consume the entries they recognize
        Entry<byte[],byte[]> entry = iter.peekNext();
        String key = asString(entry.getKey());
        if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
          break;
        }

        int userEndPos = key.indexOf('/',
            LOCALIZATION_PRIVATE_KEY_PREFIX.length());
        if (userEndPos < 0) {
          throw new IOException("Unable to determine user in resource key: "
              + key);
        }

        String user = key.substring(
            LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
        state.userResources.put(user, loadUserLocalizedResources(iter,
            key.substring(0, userEndPos+1)));
      }
    } catch (DBException e) {
      throw new IOException(e.getMessage(), e);
    } finally {
      // always release the iterator, including when loading fails
      if (iter != null) {
        iter.close();
      }
    }

    return state;
  }

  /**
   * Loads the completed and in-progress records of one resource tracker.
   * Consumes all entries starting with {@code keyPrefix}.
   *
   * @param iter iterator positioned at the first candidate entry
   * @param keyPrefix key prefix shared by all records of the tracker
   * @return the recovered tracker state
   * @throws IOException if a key under the prefix is neither a completed
   *         nor a started record
   */
  private LocalResourceTrackerState loadResourceTrackerState(
      LeveldbIterator iter, String keyPrefix) throws IOException {
    final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX;
    final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
    LocalResourceTrackerState state = new LocalResourceTrackerState();
    while (iter.hasNext()) {
      Entry<byte[],byte[]> entry = iter.peekNext();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }

      if (key.startsWith(completedPrefix)) {
        state.localizedResources = loadCompletedResources(iter,
            completedPrefix);
      } else if (key.startsWith(startedPrefix)) {
        state.inProgressResources = loadStartedResources(iter, startedPrefix);
      } else {
        throw new IOException("Unexpected key in resource tracker state: "
            + key);
      }
    }

    return state;
  }

  /**
   * Reads consecutive completed-resource records.
   *
   * @param iter iterator positioned at the first completed record
   * @param keyPrefix prefix identifying completed records of one tracker
   * @return the parsed records, in iteration order
   * @throws IOException if a record cannot be parsed
   */
  private List<LocalizedResourceProto> loadCompletedResources(
      LeveldbIterator iter, String keyPrefix) throws IOException {
    List<LocalizedResourceProto> rsrcs =
        new ArrayList<LocalizedResourceProto>();
    while (iter.hasNext()) {
      Entry<byte[],byte[]> entry = iter.peekNext();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Loading completed resource from " + key);
      }
      rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue()));
      iter.next();
    }

    return rsrcs;
  }

  /**
   * Reads consecutive started-resource records. The local path of each
   * resource is the part of the key that follows {@code keyPrefix}.
   *
   * @param iter iterator positioned at the first started record
   * @param keyPrefix prefix identifying started records of one tracker
   * @return map from resource request to the path it was being localized to
   * @throws IOException if a record cannot be parsed
   */
  private Map<LocalResourceProto, Path> loadStartedResources(
      LeveldbIterator iter, String keyPrefix) throws IOException {
    Map<LocalResourceProto, Path> rsrcs =
        new HashMap<LocalResourceProto, Path>();
    while (iter.hasNext()) {
      Entry<byte[],byte[]> entry = iter.peekNext();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }

      Path localPath = new Path(key.substring(keyPrefix.length()));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Loading in-progress resource at " + localPath);
      }
      rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath);
      iter.next();
    }

    return rsrcs;
  }

  /**
   * Loads the private (filecache) and per-application (appcache) tracker
   * states of a single user. Consumes all entries starting with
   * {@code keyPrefix}.
   *
   * @param iter iterator positioned at the user's first record
   * @param keyPrefix the user's key prefix, including the trailing slash
   * @return the recovered resources of the user
   * @throws IOException if a key under the prefix is neither a filecache
   *         nor an appcache record, or the application ID is unterminated
   */
  private RecoveredUserResources loadUserLocalizedResources(
      LeveldbIterator iter, String keyPrefix) throws IOException {
    RecoveredUserResources userResources = new RecoveredUserResources();
    while (iter.hasNext()) {
      Entry<byte[],byte[]> entry = iter.peekNext();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }

      if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) {
        userResources.privateTrackerState = loadResourceTrackerState(iter,
            keyPrefix + LOCALIZATION_FILECACHE_SUFFIX);
      } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX,
          keyPrefix.length())) {
        int appIdStartPos = keyPrefix.length() +
            LOCALIZATION_APPCACHE_SUFFIX.length();
        int appIdEndPos = key.indexOf('/', appIdStartPos);
        if (appIdEndPos < 0) {
          throw new IOException("Unable to determine appID in resource key: "
              + key);
        }
        ApplicationId appId = ConverterUtils.toApplicationId(
            key.substring(appIdStartPos, appIdEndPos));
        userResources.appTrackerStates.put(appId,
            loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1)));
      } else {
        throw new IOException("Unexpected user resource key " + key);
      }
    }
    return userResources;
  }

  /**
   * Stores the resource request under the "started" key for the path.
   *
   * @throws IOException if the database write fails
   */
  @Override
  public void startResourceLocalization(String user, ApplicationId appId,
      LocalResourceProto proto, Path localPath) throws IOException {
    String key = getResourceStartedKey(user, appId, localPath.toString());
    try {
      db.put(bytes(key), proto.toByteArray());
    } catch (DBException e) {
      throw new IOException(e.getMessage(), e);
    }
  }

  /**
   * Replaces the "started" record of the resource with a "completed" record
   * in a single write batch.
   *
   * @throws IOException if the database write fails
   */
  @Override
  public void finishResourceLocalization(String user, ApplicationId appId,
      LocalizedResourceProto proto) throws IOException {
    String localPath = proto.getLocalPath();
    String startedKey = getResourceStartedKey(user, appId, localPath);
    String completedKey = getResourceCompletedKey(user, appId, localPath);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing localized resource to " + completedKey);
    }
    try {
      WriteBatch batch = db.createWriteBatch();
      try {
        batch.delete(bytes(startedKey));
        batch.put(bytes(completedKey), proto.toByteArray());
        db.write(batch);
      } finally {
        batch.close();
      }
    } catch (DBException e) {
      throw new IOException(e.getMessage(), e);
    }
  }

  /**
   * Deletes both the "started" and the "completed" record of the resource
   * in a single write batch.
   *
   * @throws IOException if the database write fails
   */
  @Override
  public void removeLocalizedResource(String user, ApplicationId appId,
      Path localPath) throws IOException {
    String localPathStr = localPath.toString();
    String startedKey = getResourceStartedKey(user, appId, localPathStr);
    String completedKey = getResourceCompletedKey(user, appId, localPathStr);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Removing local resource at " + localPathStr);
    }
    try {
      WriteBatch batch = db.createWriteBatch();
      try {
        batch.delete(bytes(startedKey));
        batch.delete(bytes(completedKey));
        db.write(batch);
      } finally {
        batch.close();
      }
    } catch (DBException e) {
      throw new IOException(e.getMessage(), e);
    }
  }

  /** Builds the key of the "started" record for a resource path. */
  private String getResourceStartedKey(String user, ApplicationId appId,
      String localPath) {
    return getResourceTrackerKeyPrefix(user, appId)
        + LOCALIZATION_STARTED_SUFFIX + localPath;
  }

  /** Builds the key of the "completed" record for a resource path. */
  private String getResourceCompletedKey(String user, ApplicationId appId,
      String localPath) {
    return getResourceTrackerKeyPrefix(user, appId)
        + LOCALIZATION_COMPLETED_SUFFIX + localPath;
  }

  /**
   * Selects the key prefix of the tracker a resource belongs to.
   *
   * @param user the user, or null for the public tracker
   * @param appId the application, or null for the user's private tracker
   * @return the tracker key prefix, ending with a slash
   */
  private String getResourceTrackerKeyPrefix(String user,
      ApplicationId appId) {
    if (user == null) {
      return LOCALIZATION_PUBLIC_KEY_PREFIX;
    }
    if (appId == null) {
      return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/"
          + LOCALIZATION_FILECACHE_SUFFIX;
    }
    return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/"
        + LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
  }

  /**
   * Opens the state database, creating it and recording the schema version
   * when it does not exist yet, then verifies the stored schema version.
   *
   * @param conf configuration providing the recovery directory
   * @throws IOException if the database cannot be opened or created, or its
   *         schema version is missing or differs from the supported one
   */
  @Override
  protected void initStorage(Configuration conf)
      throws IOException {
    Path storeRoot = createStorageDir(conf);
    Options options = new Options();
    // first try to open an existing database only
    options.createIfMissing(false);
    options.logger(new LeveldbLogger());
    LOG.info("Using state database at " + storeRoot + " for recovery");
    File dbfile = new File(storeRoot.toString());
    byte[] schemaVersionData = null;
    try {
      db = JniDBFactory.factory.open(dbfile, options);
      try {
        schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
      } catch (DBException e) {
        throw new IOException(e.getMessage(), e);
      }
    } catch (NativeDB.DBException e) {
      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
        // no database yet: create one and stamp it with the current schema
        LOG.info("Creating state database at " + dbfile);
        options.createIfMissing(true);
        try {
          db = JniDBFactory.factory.open(dbfile, options);
          schemaVersionData = bytes(DB_SCHEMA_VERSION);
          db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData);
        } catch (DBException dbErr) {
          throw new IOException(dbErr.getMessage(), dbErr);
        }
      } else {
        throw e;
      }
    }
    if (schemaVersionData != null) {
      String schemaVersion = asString(schemaVersionData);
      // only support exact schema matches for now
      if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
        throw new IOException("Incompatible state database schema, found "
            + schemaVersion + " expected " + DB_SCHEMA_VERSION);
      }
    } else {
      throw new IOException("State database schema version not found");
    }
  }

  /**
   * Creates the directory that holds the state database.
   *
   * @param conf configuration providing the recovery directory
   * @return the path of the database directory
   * @throws IOException if no recovery directory is configured or the
   *         directory cannot be created
   */
  private Path createStorageDir(Configuration conf) throws IOException {
    final String storeUri = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
    if (storeUri == null) {
      throw new IOException("No store location directory configured in " +
          YarnConfiguration.NM_RECOVERY_DIR);
    }

    Path root = new Path(storeUri, DB_NAME);
    FileSystem fs = FileSystem.getLocal(conf);
    // report a failed creation here instead of as an obscure database error
    if (!fs.mkdirs(root, new FsPermission((short)0700))) {
      throw new IOException("Unable to create state store directory " + root);
    }
    return root;
  }

  /** Forwards leveldb log messages to the commons-logging log at INFO. */
  private static class LeveldbLogger implements Logger {
    private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);

    @Override
    public void log(String message) {
      LOG.info(message);
    }
  }
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
/**
 * The state store to use when state isn't being stored: every recording
 * operation is a no-op and recovery is reported as unavailable.
 */
public class NMNullStateStoreService extends NMStateStoreService {

  public NMNullStateStoreService() {
    super(NMNullStateStoreService.class.getName());
  }

  // ---- storage lifecycle: there is no backing storage ----

  @Override
  protected void initStorage(Configuration conf) throws IOException {
  }

  @Override
  protected void startStorage() throws IOException {
  }

  @Override
  protected void closeStorage() throws IOException {
  }

  // ---- recovery: never available ----

  /** @return always false, nothing is ever persisted by this store */
  @Override
  public boolean canRecover() {
    return false;
  }

  /**
   * Always fails, callers are expected to check {@link #canRecover()}.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public RecoveredLocalizationState loadLocalizationState()
      throws IOException {
    throw new UnsupportedOperationException(
        "Recovery not supported by this state store");
  }

  // ---- recording: silently ignored ----

  @Override
  public void startResourceLocalization(String user, ApplicationId appId,
      LocalResourceProto proto, Path localPath) throws IOException {
  }

  @Override
  public void finishResourceLocalization(String user, ApplicationId appId,
      LocalizedResourceProto proto) throws IOException {
  }

  @Override
  public void removeLocalizedResource(String user, ApplicationId appId,
      Path localPath) throws IOException {
  }
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
/**
 * Base class of the services that record nodemanager state so that it can
 * be reloaded later. Subclasses supply the storage through
 * {@link #initStorage}, {@link #startStorage} and {@link #closeStorage},
 * which are driven by the service lifecycle methods of this class.
 */
@Private
@Unstable
public abstract class NMStateStoreService extends AbstractService {

  public NMStateStoreService(String name) {
    super(name);
  }

  /** Initialize the state storage */
  @Override
  public void serviceInit(Configuration conf) throws IOException {
    initStorage(conf);
  }

  /** Start the state storage for use */
  @Override
  public void serviceStart() throws IOException {
    startStorage();
  }

  /** Shutdown the state storage. */
  @Override
  public void serviceStop() throws IOException {
    closeStorage();
  }

  /**
   * Whether state can be loaded back from this store.
   *
   * @return true unless overridden by a subclass
   */
  public boolean canRecover() {
    return true;
  }

  /**
   * Load the state of localized resources
   * @return recovered localized resource state
   * @throws IOException
   */
  public abstract RecoveredLocalizationState loadLocalizationState()
      throws IOException;

  /**
   * Record the start of localization for a resource
   * @param user the username or null if the resource is public
   * @param appId the application ID if the resource is app-specific or null
   * @param proto the resource request
   * @param localPath local filesystem path where the resource will be stored
   * @throws IOException
   */
  public abstract void startResourceLocalization(String user,
      ApplicationId appId, LocalResourceProto proto, Path localPath)
      throws IOException;

  /**
   * Record the completion of a resource localization
   * @param user the username or null if the resource is public
   * @param appId the application ID if the resource is app-specific or null
   * @param proto the serialized localized resource
   * @throws IOException
   */
  public abstract void finishResourceLocalization(String user,
      ApplicationId appId, LocalizedResourceProto proto) throws IOException;

  /**
   * Remove records related to a resource localization
   * @param user the username or null if the resource is public
   * @param appId the application ID if the resource is app-specific or null
   * @param localPath local filesystem path where the resource will be stored
   * @throws IOException
   */
  public abstract void removeLocalizedResource(String user,
      ApplicationId appId, Path localPath) throws IOException;

  /** Prepare the backing storage, invoked from {@link #serviceInit}. */
  protected abstract void initStorage(Configuration conf) throws IOException;

  /** Make the backing storage usable, invoked from {@link #serviceStart}. */
  protected abstract void startStorage() throws IOException;

  /** Release the backing storage, invoked from {@link #serviceStop}. */
  protected abstract void closeStorage() throws IOException;

  /** Recovered state of a single resource tracker. */
  public static class LocalResourceTrackerState {
    List<LocalizedResourceProto> localizedResources =
        new ArrayList<LocalizedResourceProto>();
    Map<LocalResourceProto, Path> inProgressResources =
        new HashMap<LocalResourceProto, Path>();

    public List<LocalizedResourceProto> getLocalizedResources() {
      return localizedResources;
    }

    public Map<LocalResourceProto, Path> getInProgressResources() {
      return inProgressResources;
    }

    /** @return true when there are neither localized nor in-progress resources */
    public boolean isEmpty() {
      if (!localizedResources.isEmpty()) {
        return false;
      }
      return inProgressResources.isEmpty();
    }
  }

  /** Recovered private and per-application tracker states of one user. */
  public static class RecoveredUserResources {
    LocalResourceTrackerState privateTrackerState =
        new LocalResourceTrackerState();
    Map<ApplicationId, LocalResourceTrackerState> appTrackerStates =
        new HashMap<ApplicationId, LocalResourceTrackerState>();

    public LocalResourceTrackerState getPrivateTrackerState() {
      return privateTrackerState;
    }

    public Map<ApplicationId, LocalResourceTrackerState>
    getAppTrackerStates() {
      return appTrackerStates;
    }
  }

  /** Recovered public tracker state plus the resources of every user. */
  public static class RecoveredLocalizationState {
    LocalResourceTrackerState publicTrackerState =
        new LocalResourceTrackerState();
    Map<String, RecoveredUserResources> userResources =
        new HashMap<String, RecoveredUserResources>();

    public LocalResourceTrackerState getPublicTrackerState() {
      return publicTrackerState;
    }

    public Map<String, RecoveredUserResources> getUserResources() {
      return userResources;
    }
  }
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnServerNodemanagerRecoveryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
// Record stored by the nodemanager state store once the localization of a
// resource has completed.
message LocalizedResourceProto {
// The resource that was localized.
optional LocalResourceProto resource = 1;
// Local filesystem path of the localized resource.
optional string localPath = 2;
// Size recorded for the localized resource (presumably bytes -- TODO confirm).
optional int64 size = 3;
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
public class DummyContainerManager extends ContainerManagerImpl {
@ -75,7 +76,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec,
deletionContext, super.dirsHandler) {
deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -79,7 +80,8 @@ public class TestEventFlow {
YarnConfiguration conf = new YarnConfiguration();
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null, null) {
new NMTokenSecretManagerInNM(), null, null,
new NMNullStateStoreService()) {
@Override
public int getHttpPort() {
return 1234;

View File

@ -108,6 +108,36 @@ public class TestNodeManagerShutdown {
localFS.delete(new Path(basedir.getPath()), true);
}
/**
 * Checks that the recovery directory is kept when the nodemanager stops
 * normally and is gone after it stops while marked as decommissioned.
 */
@Test
public void testStateStoreRemovalOnDecommission() throws IOException {
  final File stateDir = new File(basedir, "nm-recovery");
  nm = new TestNodeManager();
  YarnConfiguration nmConf = createNMConfig();
  nmConf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
  nmConf.set(YarnConfiguration.NM_RECOVERY_DIR, stateDir.getAbsolutePath());

  // phase 1: a plain stop must leave the state store in place
  nm.init(nmConf);
  nm.start();
  Assert.assertTrue(stateDir.exists());
  Assert.assertTrue(stateDir.isDirectory());
  nm.stop();
  nm = null;
  Assert.assertTrue(stateDir.exists());
  Assert.assertTrue(stateDir.isDirectory());

  // phase 2: stopping a decommissioned node must remove the state store
  nm = new TestNodeManager();
  nm.init(nmConf);
  nm.start();
  Assert.assertTrue(stateDir.exists());
  Assert.assertTrue(stateDir.isDirectory());
  nm.getNMContext().setDecommissioned(true);
  nm.stop();
  nm = null;
  Assert.assertFalse(stateDir.exists());
}
@Test
public void testKillContainersOnShutdown() throws IOException,
YarnException {

View File

@ -91,6 +91,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@SuppressWarnings("rawtypes")
public class TestNodeStatusUpdater {
@ -1159,7 +1161,8 @@ public class TestNodeStatusUpdater {
@Override
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService store) {
return new MyNMContext(containerTokenSecretManager,
nmTokenSecretManager);
}
@ -1268,7 +1271,8 @@ public class TestNodeStatusUpdater {
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
super(containerTokenSecretManager, nmTokenSecretManager, null, null);
super(containerTokenSecretManager, nmTokenSecretManager, null, null,
new NMNullStateStoreService());
}
@Override

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -103,7 +104,8 @@ public abstract class BaseContainerManagerTest {
protected static final int HTTP_PORT = 5412;
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) {
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
public int getHttpPort() {
return HTTP_PORT;
};

View File

@ -23,6 +23,7 @@ import org.junit.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
import org.junit.Test;
public class TestLocalCacheDirectoryManager {
@ -73,7 +74,7 @@ public class TestLocalCacheDirectoryManager {
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
Exception e = null;
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null);
new ResourceLocalizationService(null, null, null, null, null);
try {
service.init(conf);
} catch (Exception e1) {
@ -109,4 +110,49 @@ public class TestLocalCacheDirectoryManager {
// first sub directory
Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
}
/**
 * Round-trips every directory number in [0, 10000) through
 * {@code Directory.getRelativePath} and {@code Directory.getDirectoryNumber}
 * and expects to get the starting number back.
 */
@Test
public void testDirectoryConversion() {
  int dirNumber = 0;
  while (dirNumber < 10000) {
    final String relativePath = Directory.getRelativePath(dirNumber);
    Assert.assertEquals("Incorrect conversion for " + dirNumber, dirNumber,
        Directory.getDirectoryNumber(relativePath));
    ++dirNumber;
  }
}
/**
 * Drives {@code incrementFileCountForPath} and
 * {@code decrementFileCountForPath} by hand on a LocalCacheDirectoryManager
 * whose per-directory file limit is DIRECTORIES_PER_LEVEL + 2, and checks
 * after each step which relative path
 * {@code getRelativePathForLocalization} hands out next.
 *
 * The statement order matters: every allocation call changes the manager's
 * counts, so each assertion depends on all calls made before it.
 */
@Test
public void testIncrementFileCountForPath() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
LocalCacheDirectoryManager.DIRECTORIES_PER_LEVEL + 2);
LocalCacheDirectoryManager mgr = new LocalCacheDirectoryManager(conf);
// The root directory is addressed by the empty relative path.
final String rootPath = "";
// Account for one file in the root by hand; the next allocation is still
// expected to be the root, and the one after that must land elsewhere.
// NOTE(review): this implies the root holds two files under this limit,
// presumably because DIRECTORIES_PER_LEVEL slots go to subdirectories -
// confirm against LocalCacheDirectoryManager.
mgr.incrementFileCountForPath(rootPath);
Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
Assert.assertFalse("root dir should be full",
rootPath.equals(mgr.getRelativePathForLocalization()));
// finish filling the other directory
mgr.getRelativePathForLocalization();
// free up space in the root dir
mgr.decrementFileCountForPath(rootPath);
mgr.decrementFileCountForPath(rootPath);
// Two decrements are expected to make room for exactly two more root
// allocations; the third allocation must go to some other directory.
Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
String otherDir = mgr.getRelativePathForLocalization();
Assert.assertFalse("root dir should be full", otherDir.equals(rootPath));
final String deepDir0 = "d/e/e/p/0";
final String deepDir1 = "d/e/e/p/1";
final String deepDir2 = "d/e/e/p/2";
final String deepDir3 = "d/e/e/p/3";
// Count a file under a deep path by hand. The expected allocations that
// follow are otherDir once more, then deepDir0, then deepDir1.
mgr.incrementFileCountForPath(deepDir0);
Assert.assertEquals(otherDir, mgr.getRelativePathForLocalization());
Assert.assertEquals(deepDir0, mgr.getRelativePathForLocalization());
Assert.assertEquals("total dir count incorrect after increment",
deepDir1, mgr.getRelativePathForLocalization());
// Top up deepDir1 (one more file) and deepDir2 (two files) by hand, in an
// order that differs from the directory numbering; the next allocation is
// then expected to skip both and return deepDir3.
mgr.incrementFileCountForPath(deepDir2);
mgr.incrementFileCountForPath(deepDir1);
mgr.incrementFileCountForPath(deepDir2);
Assert.assertEquals(deepDir3, mgr.getRelativePathForLocalization());
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.mockito.Mockito.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -34,13 +35,17 @@ import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestLocalResourcesTrackerImpl {
@ -92,8 +101,8 @@ public class TestLocalResourcesTrackerImpl {
localrsrc.put(req1, lr1);
localrsrc.put(req2, lr2);
LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
conf);
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
false, conf, new NMNullStateStoreService());
ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@ -176,7 +185,8 @@ public class TestLocalResourcesTrackerImpl {
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
dispatcher, localrsrc, false, conf);
null, dispatcher, localrsrc, false, conf,
new NMNullStateStoreService());
ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1);
@ -246,7 +256,8 @@ public class TestLocalResourcesTrackerImpl {
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
true, conf, new NMNullStateStoreService());
LocalResourceRequest lr =
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@ -264,6 +275,7 @@ public class TestLocalResourcesTrackerImpl {
// Container-1 requesting local resource.
tracker.handle(reqEvent1);
dispatcher.await();
// New localized Resource should have been added to local resource map
// and the requesting container will be added to its waiting queue.
@ -280,6 +292,7 @@ public class TestLocalResourcesTrackerImpl {
ResourceEvent reqEvent2 =
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
tracker.handle(reqEvent2);
dispatcher.await();
// Container 2 should have been added to the waiting queue of the local
// resource
@ -295,6 +308,7 @@ public class TestLocalResourcesTrackerImpl {
LocalizedResource localizedResource = localrsrc.get(lr);
tracker.handle(resourceFailedEvent);
dispatcher.await();
// After receiving failed resource event; all waiting containers will be
// notified with Container Resource Failed Event.
@ -308,6 +322,7 @@ public class TestLocalResourcesTrackerImpl {
// exception.
ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
tracker.handle(relEvent1);
dispatcher.await();
// Container-3 now requests for the same resource. This request call
// is coming prior to Container-2's release call.
@ -316,6 +331,7 @@ public class TestLocalResourcesTrackerImpl {
ResourceEvent reqEvent3 =
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
tracker.handle(reqEvent3);
dispatcher.await();
// Local resource cache now should have the requested resource and the
// number of waiting containers should be 1.
@ -327,6 +343,7 @@ public class TestLocalResourcesTrackerImpl {
// Container-2 Releases the resource
ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
tracker.handle(relEvent2);
dispatcher.await();
// Making sure that there is no change in the cache after the release.
Assert.assertEquals(1, localrsrc.size());
@ -340,6 +357,7 @@ public class TestLocalResourcesTrackerImpl {
ResourceLocalizedEvent localizedEvent =
new ResourceLocalizedEvent(lr, localizedPath, 123L);
tracker.handle(localizedEvent);
dispatcher.await();
// Verifying ContainerResourceLocalizedEvent .
verify(containerEventHandler, times(1)).handle(
@ -351,6 +369,7 @@ public class TestLocalResourcesTrackerImpl {
// Container-3 releasing the resource.
ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
tracker.handle(relEvent3);
dispatcher.await();
Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
@ -384,7 +403,8 @@ public class TestLocalResourcesTrackerImpl {
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
dispatcher, localrsrc, true, conf);
null, dispatcher, localrsrc, true, conf,
new NMNullStateStoreService());
// This is a random path. NO File creation will take place at this place.
Path localDir = new Path("/tmp");
@ -401,7 +421,9 @@ public class TestLocalResourcesTrackerImpl {
tracker.handle(reqEvent1);
// Simulate the process of localization of lr1
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
// NOTE: Localization path from tracker has resource ID at end
Path hierarchicalPath1 =
tracker.getPathForLocalization(lr1, localDir).getParent();
// Simulate lr1 getting localized
ResourceLocalizedEvent rle1 =
new ResourceLocalizedEvent(lr1,
@ -417,7 +439,8 @@ public class TestLocalResourcesTrackerImpl {
new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent2);
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
Path hierarchicalPath2 =
tracker.getPathForLocalization(lr2, localDir).getParent();
// localization failed.
ResourceFailedLocalizationEvent rfe2 =
new ResourceFailedLocalizationEvent(
@ -435,7 +458,8 @@ public class TestLocalResourcesTrackerImpl {
ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent3);
Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
Path hierarchicalPath3 =
tracker.getPathForLocalization(lr3, localDir).getParent();
// localization successful
ResourceLocalizedEvent rle3 =
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@ -479,6 +503,284 @@ public class TestLocalResourcesTrackerImpl {
}
}
/**
 * Checks the state store calls made by the tracker across a successful
 * localization of an APPLICATION resource: startResourceLocalization is
 * verified after the resource has been requested and a localization path
 * obtained, finishResourceLocalization after the ResourceLocalizedEvent, and
 * removeLocalizedResource after the resource has been released and removed
 * from the tracker.
 *
 * @throws Exception if any tracker, dispatcher or mock interaction fails
 */
@Test
@SuppressWarnings("unchecked")
public void testStateStoreSuccessfulLocalization() throws Exception {
  final String user = "someuser";
  final ApplicationId appId = ApplicationId.newInstance(1, 1);
  // This is a random path. NO File creation will take place at this place.
  final Path localDir = new Path("/tmp");
  Configuration conf = new YarnConfiguration();
  final DrainDispatcher dispatcher = createDispatcher(conf);
  try {
    // Setup lives inside the try so the dispatcher is stopped even when
    // mock creation or handler registration fails.
    EventHandler<LocalizerEvent> localizerEventHandler =
        mock(EventHandler.class);
    // Typed for the events it is registered to receive below.
    EventHandler<ContainerEvent> containerEventHandler =
        mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
    dispatcher.register(ContainerEventType.class, containerEventHandler);
    DeletionService mockDelService = mock(DeletionService.class);
    NMStateStoreService stateStore = mock(NMStateStoreService.class);

    LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
        appId, dispatcher, false, conf, stateStore);
    // Container 1 needs lr1 resource
    ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
    LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
        LocalResourceVisibility.APPLICATION);
    LocalizerContext lc1 = new LocalizerContext(user, cId1, null);

    // Container 1 requests lr1 to be localized
    ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
        LocalResourceVisibility.APPLICATION, lc1);
    tracker.handle(reqEvent1);
    dispatcher.await();

    // Simulate the process of localization of lr1
    Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);

    // The store must have been told about the in-progress localization,
    // with the same request and a path directly under the returned one.
    ArgumentCaptor<LocalResourceProto> localResourceCaptor =
        ArgumentCaptor.forClass(LocalResourceProto.class);
    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
    verify(stateStore).startResourceLocalization(eq(user), eq(appId),
        localResourceCaptor.capture(), pathCaptor.capture());
    LocalResourceProto lrProto = localResourceCaptor.getValue();
    Path localizedPath1 = pathCaptor.getValue();
    Assert.assertEquals(lr1,
        new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
    Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());

    // Simulate lr1 getting localized
    ResourceLocalizedEvent rle1 =
        new ResourceLocalizedEvent(lr1, localizedPath1, 120);
    tracker.handle(rle1);
    dispatcher.await();

    // Completion must be recorded with the same request and local path.
    ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor =
        ArgumentCaptor.forClass(LocalizedResourceProto.class);
    verify(stateStore).finishResourceLocalization(eq(user), eq(appId),
        localizedProtoCaptor.capture());
    LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue();
    Assert.assertEquals(lr1, new LocalResourceRequest(
        new LocalResourcePBImpl(localizedProto.getResource())));
    Assert.assertEquals(localizedPath1.toString(),
        localizedProto.getLocalPath());
    LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1);
    Assert.assertNotNull(localizedRsrc1);

    // simulate release and retention processing
    tracker.handle(new ResourceReleaseEvent(lr1, cId1));
    dispatcher.await();
    boolean removeResult = tracker.remove(localizedRsrc1, mockDelService);

    Assert.assertTrue(removeResult);
    verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
        eq(localizedPath1));
  } finally {
    dispatcher.stop();
  }
}
/**
 * Checks the state store calls made by the tracker when localization of an
 * APPLICATION resource fails: startResourceLocalization is verified after
 * the resource has been requested and a localization path obtained, and
 * removeLocalizedResource for that same path after the
 * ResourceFailedLocalizationEvent.
 *
 * @throws Exception if any tracker, dispatcher or mock interaction fails
 */
@Test
@SuppressWarnings("unchecked")
public void testStateStoreFailedLocalization() throws Exception {
  final String user = "someuser";
  final ApplicationId appId = ApplicationId.newInstance(1, 1);
  // This is a random path. NO File creation will take place at this place.
  final Path localDir = new Path("/tmp");
  Configuration conf = new YarnConfiguration();
  final DrainDispatcher dispatcher = createDispatcher(conf);
  try {
    // Setup lives inside the try so the dispatcher is stopped even when
    // mock creation or handler registration fails.
    EventHandler<LocalizerEvent> localizerEventHandler =
        mock(EventHandler.class);
    // Typed for the events it is registered to receive below.
    EventHandler<ContainerEvent> containerEventHandler =
        mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
    dispatcher.register(ContainerEventType.class, containerEventHandler);
    NMStateStoreService stateStore = mock(NMStateStoreService.class);

    LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
        appId, dispatcher, false, conf, stateStore);
    // Container 1 needs lr1 resource
    ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
    LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
        LocalResourceVisibility.APPLICATION);
    LocalizerContext lc1 = new LocalizerContext(user, cId1, null);

    // Container 1 requests lr1 to be localized
    ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
        LocalResourceVisibility.APPLICATION, lc1);
    tracker.handle(reqEvent1);
    dispatcher.await();

    // Simulate the process of localization of lr1
    Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);

    // The store must have been told about the in-progress localization,
    // with the same request and a path directly under the returned one.
    ArgumentCaptor<LocalResourceProto> localResourceCaptor =
        ArgumentCaptor.forClass(LocalResourceProto.class);
    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
    verify(stateStore).startResourceLocalization(eq(user), eq(appId),
        localResourceCaptor.capture(), pathCaptor.capture());
    LocalResourceProto lrProto = localResourceCaptor.getValue();
    Path localizedPath1 = pathCaptor.getValue();
    Assert.assertEquals(lr1,
        new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
    Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());

    // Report the localization as failed; the record started above is
    // expected to be removed from the store.
    ResourceFailedLocalizationEvent rfe1 =
        new ResourceFailedLocalizationEvent(
            lr1, new Exception("Test").toString());
    tracker.handle(rfe1);
    dispatcher.await();
    verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
        eq(localizedPath1));
  } finally {
    dispatcher.stop();
  }
}
/**
 * Checks that a ResourceRecoveredEvent makes the resource known to a tracker
 * created without the directory cache manager, and that the next
 * localization path handed out is numbered one past the recovered
 * resource's directory id.
 *
 * @throws Exception if any tracker, dispatcher or mock interaction fails
 */
@Test
@SuppressWarnings("unchecked")
public void testRecoveredResource() throws Exception {
  final String user = "someuser";
  final ApplicationId appId = ApplicationId.newInstance(1, 1);
  // This is a random path. NO File creation will take place at this place.
  final Path localDir = new Path("/tmp/localdir");
  Configuration conf = new YarnConfiguration();
  final DrainDispatcher dispatcher = createDispatcher(conf);
  try {
    // Setup lives inside the try so the dispatcher is stopped even when
    // mock creation or handler registration fails.
    EventHandler<LocalizerEvent> localizerEventHandler =
        mock(EventHandler.class);
    // Typed for the events it is registered to receive below.
    EventHandler<ContainerEvent> containerEventHandler =
        mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
    dispatcher.register(ContainerEventType.class, containerEventHandler);
    NMStateStoreService stateStore = mock(NMStateStoreService.class);

    LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
        appId, dispatcher, false, conf, stateStore);
    // Container 1 needs lr1 resource
    ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
    LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
        LocalResourceVisibility.APPLICATION);
    Assert.assertNull(tracker.getLocalizedResource(lr1));

    // Recover lr1 as <localDir>/52/resource.jar without requesting it.
    final long localizedId1 = 52;
    Path hierarchicalPath1 = new Path(localDir,
        Long.toString(localizedId1));
    Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
    tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
    dispatcher.await();
    Assert.assertNotNull(tracker.getLocalizedResource(lr1));

    // verify new paths reflect recovery of previous resources
    LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
        LocalResourceVisibility.APPLICATION);
    LocalizerContext lc2 = new LocalizerContext(user, cId1, null);
    ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
        LocalResourceVisibility.APPLICATION, lc2);
    tracker.handle(reqEvent2);
    dispatcher.await();
    Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
    // The last path component is parsed as the numeric directory id.
    long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
    Assert.assertEquals(localizedId1 + 1, localizedId2);
  } finally {
    dispatcher.stop();
  }
}
/**
 * Checks that ResourceRecoveredEvents handled by a tracker created with the
 * directory cache manager enabled update the per-directory file counts.
 * Four PUBLIC resources are recovered under "4/2" (twice), "4/3" and "4"
 * below the local dir root; after each recovery the counts reported by the
 * root's LocalCacheDirectoryManager are compared with the number of
 * resources recovered into each directory so far, and the root directory
 * itself is expected to stay at zero.
 *
 * @throws Exception if any tracker, dispatcher or mock interaction fails
 */
@Test
@SuppressWarnings("unchecked")
public void testRecoveredResourceWithDirCacheMgr() throws Exception {
  final String user = "someuser";
  final ApplicationId appId = ApplicationId.newInstance(1, 1);
  // This is a random path. NO File creation will take place at this place.
  final Path localDirRoot = new Path("/tmp/localdir");
  Configuration conf = new YarnConfiguration();
  final DrainDispatcher dispatcher = createDispatcher(conf);
  try {
    // Setup lives inside the try so the dispatcher is stopped even when
    // mock creation or handler registration fails.
    EventHandler<LocalizerEvent> localizerEventHandler =
        mock(EventHandler.class);
    // Typed for the events it is registered to receive below.
    EventHandler<ContainerEvent> containerEventHandler =
        mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
    dispatcher.register(ContainerEventType.class, containerEventHandler);
    NMStateStoreService stateStore = mock(NMStateStoreService.class);

    LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
        appId, dispatcher, true, conf, stateStore);

    // First resource recovered into <root>/4/2/52.
    LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
        LocalResourceVisibility.PUBLIC);
    Assert.assertNull(tracker.getLocalizedResource(lr1));
    final long localizedId1 = 52;
    Path hierarchicalPath1 = new Path(localDirRoot + "/4/2",
        Long.toString(localizedId1));
    Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
    tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
    dispatcher.await();
    Assert.assertNotNull(tracker.getLocalizedResource(lr1));
    LocalCacheDirectoryManager dirMgrRoot =
        tracker.getDirectoryManager(localDirRoot);
    Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
    Assert.assertEquals(1, dirMgrRoot.getDirectory("4/2").getCount());

    // Second resource recovered into the same directory, <root>/4/2/53.
    LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
        LocalResourceVisibility.PUBLIC);
    Assert.assertNull(tracker.getLocalizedResource(lr2));
    final long localizedId2 = localizedId1 + 1;
    Path hierarchicalPath2 = new Path(localDirRoot + "/4/2",
        Long.toString(localizedId2));
    Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar");
    tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120));
    dispatcher.await();
    Assert.assertNotNull(tracker.getLocalizedResource(lr2));
    Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
    Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());

    // Third resource recovered into a sibling directory, <root>/4/3/128.
    LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3,
        LocalResourceVisibility.PUBLIC);
    Assert.assertNull(tracker.getLocalizedResource(lr3));
    final long localizedId3 = 128;
    Path hierarchicalPath3 = new Path(localDirRoot + "/4/3",
        Long.toString(localizedId3));
    Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar");
    tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120));
    dispatcher.await();
    Assert.assertNotNull(tracker.getLocalizedResource(lr3));
    Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
    Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
    Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());

    // Fourth resource recovered into the parent directory, <root>/4/256;
    // the counts of its subdirectories must be unaffected.
    LocalResourceRequest lr4 = createLocalResourceRequest(user, 4, 4,
        LocalResourceVisibility.PUBLIC);
    Assert.assertNull(tracker.getLocalizedResource(lr4));
    final long localizedId4 = 256;
    Path hierarchicalPath4 = new Path(localDirRoot + "/4",
        Long.toString(localizedId4));
    Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar");
    tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120));
    dispatcher.await();
    Assert.assertNotNull(tracker.getLocalizedResource(lr4));
    Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
    Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount());
    Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
    Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
  } finally {
    dispatcher.stop();
  }
}
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@ -188,7 +194,8 @@ public class TestResourceLocalizationService {
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler));
diskhandler,
new NMNullStateStoreService()));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@ -253,7 +260,8 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
dirsHandler,
new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@ -287,7 +295,7 @@ public class TestResourceLocalizationService {
user, appId);
// init container.
final Container c = getMockContainer(appId, 42);
final Container c = getMockContainer(appId, 42, user);
// init resources
Random r = new Random();
@ -402,6 +410,233 @@ public class TestResourceLocalizationService {
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testRecovery() throws Exception {
final String user1 = "user1";
final String user2 = "user2";
final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
final ApplicationId appId2 = ApplicationId.newInstance(1, 2);
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
//Ignore actual localization
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerBus);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
ResourceLocalizationService spyService =
createSpyService(dispatcher, dirsHandler, stateStore);
try {
spyService.init(conf);
spyService.start();
final Application app1 = mock(Application.class);
when(app1.getUser()).thenReturn(user1);
when(app1.getAppId()).thenReturn(appId1);
final Application app2 = mock(Application.class);
when(app2.getUser()).thenReturn(user2);
when(app2.getAppId()).thenReturn(appId2);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
dispatcher.await();
//Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
LocalResourcesTracker appTracker1 =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user1, appId1);
LocalResourcesTracker privTracker1 =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
user1, null);
LocalResourcesTracker appTracker2 =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user2, appId2);
LocalResourcesTracker pubTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
null, null);
// init containers
final Container c1 = getMockContainer(appId1, 1, user1);
final Container c2 = getMockContainer(appId2, 2, user2);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// Send localization requests of each type.
final LocalResource privResource1 = getPrivateMockedResource(r);
final LocalResourceRequest privReq1 =
new LocalResourceRequest(privResource1);
final LocalResource privResource2 = getPrivateMockedResource(r);
final LocalResourceRequest privReq2 =
new LocalResourceRequest(privResource2);
final LocalResource pubResource1 = getPublicMockedResource(r);
final LocalResourceRequest pubReq1 =
new LocalResourceRequest(pubResource1);
final LocalResource pubResource2 = getPublicMockedResource(r);
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);
final LocalResource appResource1 = getAppMockedResource(r);
final LocalResourceRequest appReq1 =
new LocalResourceRequest(appResource1);
final LocalResource appResource2 = getAppMockedResource(r);
final LocalResourceRequest appReq2 =
new LocalResourceRequest(appResource2);
final LocalResource appResource3 = getAppMockedResource(r);
final LocalResourceRequest appReq3 =
new LocalResourceRequest(appResource3);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req1.put(LocalResourceVisibility.PRIVATE,
Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
req1.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq1));
req1.put(LocalResourceVisibility.APPLICATION,
Collections.singletonList(appReq1));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req2.put(LocalResourceVisibility.APPLICATION,
Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
req2.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq2));
// Send Request event
spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
dispatcher.await();
// Simulate start of localization for all resources
privTracker1.getPathForLocalization(privReq1,
dirsHandler.getLocalPathForWrite(
ContainerLocalizer.USERCACHE + user1));
privTracker1.getPathForLocalization(privReq2,
dirsHandler.getLocalPathForWrite(
ContainerLocalizer.USERCACHE + user1));
LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
appTracker1.getPathForLocalization(appReq1,
dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId1));
LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
appTracker2.getPathForLocalization(appReq2,
dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId2));
LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
appTracker2.getPathForLocalization(appReq3,
dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId2));
LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
pubTracker.getPathForLocalization(pubReq1,
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
pubTracker.getPathForLocalization(pubReq2,
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
// Simulate completion of localization for most resources with
// possibly different sizes than in the request
assertNotNull("Localization not started", privLr1.getLocalPath());
privTracker1.handle(new ResourceLocalizedEvent(privReq1,
privLr1.getLocalPath(), privLr1.getSize() + 5));
assertNotNull("Localization not started", privLr2.getLocalPath());
privTracker1.handle(new ResourceLocalizedEvent(privReq2,
privLr2.getLocalPath(), privLr2.getSize() + 10));
assertNotNull("Localization not started", appLr1.getLocalPath());
appTracker1.handle(new ResourceLocalizedEvent(appReq1,
appLr1.getLocalPath(), appLr1.getSize()));
assertNotNull("Localization not started", appLr3.getLocalPath());
appTracker2.handle(new ResourceLocalizedEvent(appReq3,
appLr3.getLocalPath(), appLr3.getSize() + 7));
assertNotNull("Localization not started", pubLr1.getLocalPath());
pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
assertNotNull("Localization not started", pubLr2.getLocalPath());
pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
pubLr2.getLocalPath(), pubLr2.getSize() + 99999));
dispatcher.await();
assertEquals(ResourceState.LOCALIZED, privLr1.getState());
assertEquals(ResourceState.LOCALIZED, privLr2.getState());
assertEquals(ResourceState.LOCALIZED, appLr1.getState());
assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
assertEquals(ResourceState.LOCALIZED, appLr3.getState());
assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
assertEquals(ResourceState.LOCALIZED, pubLr2.getState());
// restart and recover
spyService = createSpyService(dispatcher, dirsHandler, stateStore);
spyService.init(conf);
spyService.recoverLocalizedResources(
stateStore.loadLocalizationState());
dispatcher.await();
appTracker1 = spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user1, appId1);
privTracker1 = spyService.getLocalResourcesTracker(
LocalResourceVisibility.PRIVATE, user1, null);
appTracker2 = spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user2, appId2);
pubTracker = spyService.getLocalResourcesTracker(
LocalResourceVisibility.PUBLIC, null, null);
LocalizedResource recoveredRsrc =
privTracker1.getLocalizedResource(privReq1);
assertEquals(privReq1, recoveredRsrc.getRequest());
assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
assertEquals(privReq2, recoveredRsrc.getRequest());
assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
assertEquals(appReq1, recoveredRsrc.getRequest());
assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
assertNull("in-progress resource should not be present", recoveredRsrc);
recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
assertEquals(appReq3, recoveredRsrc.getRequest());
assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
} finally {
dispatcher.stop();
stateStore.close();
}
}
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
@ -436,7 +671,8 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
dirsHandler,
new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -469,7 +705,7 @@ public class TestResourceLocalizationService {
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
final Container c = getMockContainer(appId, 42);
final Container c = getMockContainer(appId, 42, "user0");
FSDataOutputStream out =
new FSDataOutputStream(new DataOutputBuffer(), null);
doReturn(out).when(spylfs).createInternal(isA(Path.class),
@ -616,7 +852,8 @@ public class TestResourceLocalizationService {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
dirsHandler,
new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -637,7 +874,7 @@ public class TestResourceLocalizationService {
dispatcher.await();
// init container.
final Container c = getMockContainer(appId, 42);
final Container c = getMockContainer(appId, 42, user);
// init resources
Random r = new Random();
@ -725,7 +962,7 @@ public class TestResourceLocalizationService {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy);
dirsHandlerSpy, new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -758,7 +995,7 @@ public class TestResourceLocalizationService {
.put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
// init container.
final Container c = getMockContainer(appId, 42);
final Container c = getMockContainer(appId, 42, user);
// first test ioexception
Mockito
@ -838,7 +1075,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
localDirHandler, new NMNullStateStoreService());
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -991,7 +1228,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
localDirHandler, new NMNullStateStoreService());
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -1157,7 +1394,7 @@ public class TestResourceLocalizationService {
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler);
dirsHandler, new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
@ -1424,12 +1661,13 @@ public class TestResourceLocalizationService {
return getMockedResource(r, LocalResourceVisibility.PRIVATE);
}
private static Container getMockContainer(ApplicationId appId, int id) {
private static Container getMockContainer(ApplicationId appId, int id,
String user) {
Container c = mock(Container.class);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
when(c.getUser()).thenReturn("user0");
when(c.getUser()).thenReturn(user);
when(c.getContainerId()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
@ -1438,6 +1676,24 @@ public class TestResourceLocalizationService {
return c;
}
/**
 * Builds a Mockito spy around a new {@link ResourceLocalizationService} that
 * uses the supplied dispatcher, directory handler and state store. The
 * container executor, deletion service and localizer tracker are plain mocks,
 * and the spy's {@code createServer()}, {@code createLocalizerTracker(..)} and
 * {@code getLocalFileContext(..)} are stubbed to return test doubles.
 *
 * @param dispatcher dispatcher handed to the service constructor
 * @param dirsHandler local directory handler handed to the service constructor
 * @param stateStore state store handed to the service constructor
 * @return the spied service (not yet initialized)
 */
private ResourceLocalizationService createSpyService(
    DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
    NMStateStoreService stateStore) {
  // collaborators required by the constructor; mocked so they do nothing
  final ContainerExecutor executor = mock(ContainerExecutor.class);
  final DeletionService deletionService = mock(DeletionService.class);
  final LocalizerTracker localizerTracker = mock(LocalizerTracker.class);

  final ResourceLocalizationService spied = spy(
      new ResourceLocalizationService(dispatcher, executor, deletionService,
          dirsHandler, stateStore));

  // replace the factory methods with the shared test doubles
  doReturn(lfs).when(spied)
      .getLocalFileContext(isA(Configuration.class));
  doReturn(localizerTracker).when(spied).createLocalizerTracker(
      isA(Configuration.class));
  doReturn(mockServer).when(spied).createServer();
  return spied;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
static Token<? extends TokenIdentifier> getToken(int id) {
return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),

View File

@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentCaptor;
import static org.mockito.Mockito.*;
public class TestResourceRetention {
@ -81,7 +83,7 @@ public class TestResourceRetention {
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
trackerResources, false, conf));
null, trackerResources, false, conf, new NMNullStateStoreService()));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
/**
 * An {@link NMStateStoreService} that keeps localization state in memory
 * only. State is grouped per tracker, identified by a (user, appId) pair:
 * <ul>
 *   <li>public tracker: user == null</li>
 *   <li>private tracker: valid user, appId == null</li>
 *   <li>app-specific tracker: valid user and valid appId</li>
 * </ul>
 * The backing map is created in {@link #initStorage(Configuration)}, so the
 * store must be initialized before any state is recorded or loaded.
 */
public class NMMemoryStateStoreService extends NMStateStoreService {
  private Map<TrackerKey, TrackerState> trackerStates;

  public NMMemoryStateStoreService() {
    super(NMMemoryStateStoreService.class.getName());
  }

  /**
   * Copies one tracker's stored state into a fresh recovery record.
   * In-progress entries are stored as localPath -> resource but are reported
   * as resource -> localPath.
   */
  private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
    LocalResourceTrackerState result = new LocalResourceTrackerState();
    result.localizedResources.addAll(ts.localizedResources.values());
    for (Map.Entry<Path, LocalResourceProto> entry :
        ts.inProgressMap.entrySet()) {
      result.inProgressResources.put(entry.getValue(), entry.getKey());
    }
    return result;
  }

  /** Returns the state for the given tracker, creating it if absent. */
  private TrackerState getTrackerState(TrackerKey key) {
    TrackerState ts = trackerStates.get(key);
    if (ts == null) {
      ts = new TrackerState();
      trackerStates.put(key, ts);
    }
    return ts;
  }

  /**
   * Builds the recovered localization state from every tracker currently
   * held in memory.
   *
   * @return a new state object populated from the stored trackers
   */
  @Override
  public synchronized RecoveredLocalizationState loadLocalizationState() {
    RecoveredLocalizationState result = new RecoveredLocalizationState();
    for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
      TrackerKey tk = e.getKey();
      TrackerState ts = e.getValue();
      // check what kind of tracker state we have and recover appropriately
      // public trackers have user == null
      // private trackers have a valid user but appId == null
      // app-specific trackers have a valid user and valid appId
      if (tk.user == null) {
        result.publicTrackerState = loadTrackerState(ts);
      } else {
        RecoveredUserResources rur = result.userResources.get(tk.user);
        if (rur == null) {
          rur = new RecoveredUserResources();
          result.userResources.put(tk.user, rur);
        }
        if (tk.appId == null) {
          rur.privateTrackerState = loadTrackerState(ts);
        } else {
          rur.appTrackerStates.put(tk.appId, loadTrackerState(ts));
        }
      }
    }
    return result;
  }

  /**
   * Records that localization of a resource into {@code localPath} has
   * started for the tracker identified by (user, appId).
   */
  @Override
  public synchronized void startResourceLocalization(String user,
      ApplicationId appId, LocalResourceProto proto, Path localPath) {
    TrackerState ts = getTrackerState(new TrackerKey(user, appId));
    ts.inProgressMap.put(localPath, proto);
  }

  /**
   * Records that a resource finished localizing: its local path is moved
   * from the in-progress set to the localized set of the tracker.
   */
  @Override
  public synchronized void finishResourceLocalization(String user,
      ApplicationId appId, LocalizedResourceProto proto) {
    TrackerState ts = getTrackerState(new TrackerKey(user, appId));
    Path localPath = new Path(proto.getLocalPath());
    ts.inProgressMap.remove(localPath);
    ts.localizedResources.put(localPath, proto);
  }

  /**
   * Forgets a resource, whether in progress or fully localized. Unknown
   * trackers and unknown paths are ignored.
   */
  @Override
  public synchronized void removeLocalizedResource(String user,
      ApplicationId appId, Path localPath) {
    TrackerKey key = new TrackerKey(user, appId);
    TrackerState ts = trackerStates.get(key);
    if (ts == null) {
      return;
    }
    ts.inProgressMap.remove(localPath);
    ts.localizedResources.remove(localPath);
    // Drop trackers that no longer hold anything so that a later
    // loadLocalizationState() does not report users/apps with no resources;
    // otherwise a user whose resources were all removed would still be
    // listed in the recovered user resources.
    if (ts.inProgressMap.isEmpty() && ts.localizedResources.isEmpty()) {
      trackerStates.remove(key);
    }
  }

  /** Creates (or resets to) an empty in-memory store. */
  @Override
  protected void initStorage(Configuration conf) {
    trackerStates = new HashMap<TrackerKey, TrackerState>();
  }

  @Override
  protected void startStorage() {
    // nothing to start for an in-memory store
  }

  @Override
  protected void closeStorage() {
    // nothing to release for an in-memory store
  }

  /** Resources known to a single tracker, keyed by local path. */
  private static class TrackerState {
    final Map<Path, LocalResourceProto> inProgressMap =
        new HashMap<Path, LocalResourceProto>();
    final Map<Path, LocalizedResourceProto> localizedResources =
        new HashMap<Path, LocalizedResourceProto>();
  }

  /**
   * Identifies a tracker by user and application; either part may be null.
   * Fields are final because instances are used as hash map keys.
   */
  private static class TrackerKey {
    final String user;
    final ApplicationId appId;

    public TrackerKey(String user, ApplicationId appId) {
      this.user = user;
      this.appId = appId;
    }

    @Override
    public int hashCode() {
      final int prime = 31;
      int result = 1;
      result = prime * result + ((appId == null) ? 0 : appId.hashCode());
      result = prime * result + ((user == null) ? 0 : user.hashCode());
      return result;
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof TrackerKey)) {
        // also rejects null
        return false;
      }
      TrackerKey other = (TrackerKey) obj;
      boolean sameApp = (appId == null)
          ? other.appId == null : appId.equals(other.appId);
      boolean sameUser = (user == null)
          ? other.user == null : user.equals(other.user);
      return sameApp && sameUser;
    }
  }
}

View File

@ -0,0 +1,407 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests for {@link NMLeveldbStateStoreService}. Each test records
 * localization events (start / finish / remove), closes and re-opens the
 * store via {@link #restartStateStore()}, and then checks what
 * {@code loadLocalizationState()} returns from the re-opened store.
 */
public class TestNMLeveldbStateStoreService {
// Scratch directory used as the recovery dir: a folder named after this
// class under the "test.build.data" system property, or under
// "java.io.tmpdir" when that property is not set.
private static final File TMP_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestNMLeveldbStateStoreService.class.getName());
YarnConfiguration conf;
NMLeveldbStateStoreService stateStore;
/**
 * Deletes any leftover scratch directory, builds a configuration with NM
 * recovery enabled and pointed at TMP_DIR, and opens a store on it.
 */
@Before
public void setup() throws IOException {
FileUtil.fullyDelete(TMP_DIR);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.NM_RECOVERY_DIR, TMP_DIR.toString());
restartStateStore();
}
/** Closes the store if one was opened and deletes the scratch directory. */
@After
public void cleanup() throws IOException {
if (stateStore != null) {
stateStore.close();
}
FileUtil.fullyDelete(TMP_DIR);
}
/**
 * Closes the current store (if any) and creates, initializes and starts a
 * new store instance using the same configuration.
 */
private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock
if (stateStore != null) {
stateStore.close();
}
stateStore = new NMLeveldbStateStoreService();
stateStore.init(conf);
stateStore.start();
}
/**
 * Asserts that the loaded state is non-null, has a non-null public tracker
 * state with no localized and no in-progress resources, and has no user
 * resources.
 */
private void verifyEmptyState() throws IOException {
RecoveredLocalizationState state = stateStore.loadLocalizationState();
assertNotNull(state);
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertNotNull(pubts);
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
assertTrue(state.getUserResources().isEmpty());
}
/** A newly opened store reports that it can recover and loads as empty. */
@Test
public void testEmptyState() throws IOException {
assertTrue(stateStore.canRecover());
verifyEmptyState();
}
/**
 * Resources recorded with startResourceLocalization must, after a restart,
 * appear as in-progress (resource proto mapped to local path) in the matching
 * tracker, and nowhere as localized.
 */
@Test
public void testStartResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// start a local resource for an application
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
// restart and verify only app resource is marked in-progress
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
// the user's private tracker state must exist but be empty, since only an
// app-specific resource has been recorded so far
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty());
assertEquals(1, appts.getInProgressResources().size());
assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto));
// start some public and private resources
// (public resources are recorded with a null user and null appId)
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
// the private resource is recorded with the user and a null appId
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// restart and verify resources are marked in-progress
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertEquals(2, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath1,
pubts.getInProgressResources().get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertEquals(1, privts.getInProgressResources().size());
assertEquals(privRsrcLocalPath,
privts.getInProgressResources().get(privRsrcProto));
// the app resource recorded before the first restart must still be there
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty());
assertEquals(1, appts.getInProgressResources().size());
assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto));
}
/**
 * Resources recorded with finishResourceLocalization must, after a restart,
 * appear as localized (and no longer in-progress) in the matching tracker,
 * while resources that were only started remain in-progress.
 */
@Test
public void testFinishResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// start and finish a local resource for an application
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
// the finished record carries its own size, different from the request's
LocalizedResourceProto appLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto)
.setLocalPath(appRsrcLocalPath.toString())
.setSize(1234567L)
.build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
// restart and verify only app resource is completed
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty());
assertEquals(1, appts.getLocalizedResources().size());
assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next());
// start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// finish some of the resources
// (public resource 2 is deliberately left unfinished)
LocalizedResourceProto pubLocalizedProto1 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto1)
.setLocalPath(pubRsrcLocalPath1.toString())
.setSize(pubRsrcProto1.getSize())
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
LocalizedResourceProto privLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(privRsrcProto)
.setLocalPath(privRsrcLocalPath.toString())
.setSize(privRsrcProto.getSize())
.build();
stateStore.finishResourceLocalization(user, null, privLocalizedProto);
// restart and verify state
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
assertEquals(1, pubts.getLocalizedResources().size());
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
assertEquals(1, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertEquals(1, privts.getLocalizedResources().size());
assertEquals(privLocalizedProto,
privts.getLocalizedResources().iterator().next());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty());
assertEquals(1, appts.getLocalizedResources().size());
assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next());
}
/**
 * Resources removed with removeLocalizedResource must not be reported after a
 * restart, whether they were removed after finishing or while still
 * in-progress; resources that were not removed must still be reported.
 */
@Test
public void testRemoveLocalizedResource() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// go through the complete lifecycle for an application local resource
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
LocalizedResourceProto appLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto)
.setLocalPath(appRsrcLocalPath.toString())
.setSize(1234567L)
.build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath);
restartStateStore();
verifyEmptyState();
// remove an app resource that didn't finish
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath);
restartStateStore();
verifyEmptyState();
// add public and private resources and remove some
// public resource 1: started and finished, never removed
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
LocalizedResourceProto pubLocalizedProto1 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto1)
.setLocalPath(pubRsrcLocalPath1.toString())
.setSize(789L)
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
// public resource 2: started, finished, then removed
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
LocalizedResourceProto pubLocalizedProto2 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto2)
.setLocalPath(pubRsrcLocalPath2.toString())
.setSize(7654321L)
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto2);
stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2);
// private resource: started, then removed before finishing
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
stateStore.removeLocalizedResource(user, null, privRsrcLocalPath);
// restart and verify state
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getInProgressResources().isEmpty());
assertEquals(1, pubts.getLocalizedResources().size());
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
// every resource recorded for the user was removed, so no user entry
// should be recovered at all
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertTrue(userResources.isEmpty());
}
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -77,7 +78,8 @@ public class TestContainerLogsPage {
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf));
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
new ApplicationACLsManager(conf), new NMNullStateStoreService());
// Add an application and the corresponding containers
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
String user = "nobody";

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -77,7 +79,8 @@ public class TestNMWebServer {
}
private int startNMWebAppServer(String webAddr) {
Context nmContext = new NodeManager.NMContext(null, null, null, null);
Context nmContext = new NodeManager.NMContext(null, null, null, null,
null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -135,7 +138,8 @@ public class TestNMWebServer {
@Test
public void testNMWebApp() throws IOException, YarnException {
Context nmContext = new NodeManager.NMContext(null, null, null, null);
Context nmContext = new NodeManager.NMContext(null, null, null, null,
null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -185,6 +189,7 @@ public class TestNMWebServer {
ContainerId container2 =
BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
NMStateStoreService stateStore = new NMNullStateStoreService();
for (ContainerId containerId : new ContainerId[] { container1,
container2}) {
// TODO: Use builder utils

View File

@ -107,7 +107,8 @@ public class TestNMWebServices extends JerseyTest {
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {

View File

@ -99,7 +99,8 @@ public class TestNMWebServicesApps extends JerseyTest {
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {

View File

@ -122,7 +122,8 @@ public class TestNMWebServicesContainers extends JerseyTest {
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};