YARN-5767. Fix the order that resources are cleaned up from the local Public/Private caches. Contributed by Chris Trezzo
This commit is contained in:
parent
7146359bfd
commit
1b79c417dc
|
@ -0,0 +1,182 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
|
||||
/**
|
||||
* A class responsible for cleaning the PUBLIC and PRIVATE local caches on a
|
||||
* node manager.
|
||||
*/
|
||||
class LocalCacheCleaner {
|
||||
|
||||
private long currentSize;
|
||||
private final long targetSize;
|
||||
private final DeletionService delService;
|
||||
private final SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap;
|
||||
|
||||
LocalCacheCleaner(DeletionService delService, long targetSize) {
|
||||
this(delService, targetSize, new LRUComparator());
|
||||
}
|
||||
|
||||
LocalCacheCleaner(DeletionService delService, long targetSize,
|
||||
Comparator<? super LocalizedResource> cmp) {
|
||||
this(delService, targetSize,
|
||||
new TreeMap<LocalizedResource, LocalResourcesTracker>(cmp));
|
||||
}
|
||||
|
||||
LocalCacheCleaner(DeletionService delService, long targetSize,
|
||||
SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap) {
|
||||
this.resourceMap = resourceMap;
|
||||
this.delService = delService;
|
||||
this.targetSize = targetSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds resources from the passed LocalResourceTracker that are candidates for
|
||||
* deletion from the cache.
|
||||
*
|
||||
* @param newTracker add all resources being tracked by the passed
|
||||
* LocalResourcesTracker to the LocalCacheCleaner.
|
||||
*/
|
||||
public void addResources(LocalResourcesTracker newTracker) {
|
||||
for (LocalizedResource resource : newTracker) {
|
||||
currentSize += resource.getSize();
|
||||
if (resource.getRefCount() > 0) {
|
||||
// Do not delete resources that are still in use
|
||||
continue;
|
||||
}
|
||||
resourceMap.put(resource, newTracker);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete resources from the cache in the sorted order generated by the
|
||||
* Comparator used to construct this class.
|
||||
*
|
||||
* @return stats about what was cleaned up during this call of cleanCache
|
||||
*/
|
||||
public LocalCacheCleanerStats cleanCache() {
|
||||
LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize);
|
||||
for (Iterator<Map.Entry<LocalizedResource, LocalResourcesTracker>> i =
|
||||
resourceMap.entrySet().iterator();
|
||||
currentSize - stats.totalDelSize > targetSize && i.hasNext();) {
|
||||
Map.Entry<LocalizedResource, LocalResourcesTracker> rsrc = i.next();
|
||||
LocalizedResource resource = rsrc.getKey();
|
||||
LocalResourcesTracker tracker = rsrc.getValue();
|
||||
if (tracker.remove(resource, delService)) {
|
||||
stats.incDelSize(tracker.getUser(), resource.getSize());
|
||||
}
|
||||
}
|
||||
this.resourceMap.clear();
|
||||
return stats;
|
||||
}
|
||||
|
||||
static class LocalCacheCleanerStats {
|
||||
private final Map<String, Long> userDelSizes = new TreeMap<String, Long>();
|
||||
private final long cacheSizeBeforeClean;
|
||||
private long totalDelSize;
|
||||
private long publicDelSize;
|
||||
private long privateDelSize;
|
||||
|
||||
LocalCacheCleanerStats(long cacheSizeBeforeClean) {
|
||||
this.cacheSizeBeforeClean = cacheSizeBeforeClean;
|
||||
}
|
||||
|
||||
void incDelSize(String user, long delSize) {
|
||||
totalDelSize += delSize;
|
||||
if (user == null) {
|
||||
publicDelSize += delSize;
|
||||
} else {
|
||||
privateDelSize += delSize;
|
||||
Long userDel = userDelSizes.get(user);
|
||||
if (userDel != null) {
|
||||
userDel += delSize;
|
||||
userDelSizes.put(user, userDel);
|
||||
} else {
|
||||
userDelSizes.put(user, delSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Long> getUserDelSizes() {
|
||||
return Collections.unmodifiableMap(userDelSizes);
|
||||
}
|
||||
|
||||
long getCacheSizeBeforeClean() {
|
||||
return cacheSizeBeforeClean;
|
||||
}
|
||||
|
||||
long getTotalDelSize() {
|
||||
return totalDelSize;
|
||||
}
|
||||
|
||||
long getPublicDelSize() {
|
||||
return publicDelSize;
|
||||
}
|
||||
|
||||
long getPrivateDelSize() {
|
||||
return privateDelSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean)
|
||||
.append(", ");
|
||||
sb.append("Total Deleted: ").append(totalDelSize).append(", ");
|
||||
sb.append("Public Deleted: ").append(publicDelSize).append(", ");
|
||||
sb.append("Private Deleted: ").append(privateDelSize);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public String toStringDetailed() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(this.toString());
|
||||
sb.append(", Private Deleted Detail: {");
|
||||
for (Map.Entry<String, Long> e : userDelSizes.entrySet()) {
|
||||
sb.append(" ").append(e.getKey()).append(":").append(e.getValue());
|
||||
}
|
||||
sb.append(" }");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static class LRUComparator implements Comparator<LocalizedResource>,
|
||||
Serializable {
|
||||
|
||||
private static final long serialVersionUID = 7034380228434701685L;
|
||||
|
||||
public int compare(LocalizedResource r1, LocalizedResource r2) {
|
||||
long ret = r1.getTimestamp() - r2.getTimestamp();
|
||||
if (0 == ret) {
|
||||
return System.identityHashCode(r1) - System.identityHashCode(r2);
|
||||
}
|
||||
return ret > 0 ? 1 : -1;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -65,7 +65,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
private final String user;
|
||||
private final ApplicationId appId;
|
||||
private final Dispatcher dispatcher;
|
||||
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
|
||||
@VisibleForTesting
|
||||
final ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc;
|
||||
private Configuration conf;
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
/*
|
||||
|
|
|
@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
|
||||
|
@ -152,7 +153,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
|
||||
private Server server;
|
||||
private InetSocketAddress localizationServerAddress;
|
||||
private long cacheTargetSize;
|
||||
@VisibleForTesting
|
||||
long cacheTargetSize;
|
||||
private long cacheCleanupPeriod;
|
||||
|
||||
private final ContainerExecutor exec;
|
||||
|
@ -164,7 +166,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
private LocalizerTokenSecretManager secretManager;
|
||||
private NMStateStoreService stateStore;
|
||||
|
||||
private LocalResourcesTracker publicRsrc;
|
||||
@VisibleForTesting
|
||||
LocalResourcesTracker publicRsrc;
|
||||
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
private DirsChangeListener localDirsChangeListener;
|
||||
|
@ -176,7 +179,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
* Map of LocalResourceTrackers keyed by username, for private
|
||||
* resources.
|
||||
*/
|
||||
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
|
||||
@VisibleForTesting
|
||||
final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
|
||||
new ConcurrentHashMap<String,LocalResourcesTracker>();
|
||||
|
||||
/**
|
||||
|
@ -427,7 +431,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
|
||||
break;
|
||||
case CACHE_CLEANUP:
|
||||
handleCacheCleanup(event);
|
||||
handleCacheCleanup();
|
||||
break;
|
||||
case CLEANUP_CONTAINER_RESOURCES:
|
||||
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
|
||||
|
@ -512,20 +516,21 @@ public class ResourceLocalizationService extends CompositeService
|
|||
localizerTracker.endContainerLocalization(locId);
|
||||
}
|
||||
|
||||
private void handleCacheCleanup(LocalizationEvent event) {
|
||||
ResourceRetentionSet retain =
|
||||
new ResourceRetentionSet(delService, cacheTargetSize);
|
||||
retain.addResources(publicRsrc);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resource cleanup (public) " + retain);
|
||||
}
|
||||
@VisibleForTesting
|
||||
LocalCacheCleanerStats handleCacheCleanup() {
|
||||
LocalCacheCleaner cleaner =
|
||||
new LocalCacheCleaner(delService, cacheTargetSize);
|
||||
cleaner.addResources(publicRsrc);
|
||||
for (LocalResourcesTracker t : privateRsrc.values()) {
|
||||
retain.addResources(t);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
|
||||
}
|
||||
cleaner.addResources(t);
|
||||
}
|
||||
//TODO Check if appRsrcs should also be added to the retention set.
|
||||
LocalCacheCleaner.LocalCacheCleanerStats stats = cleaner.cleanCache();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(stats.toStringDetailed());
|
||||
} else if (LOG.isInfoEnabled()) {
|
||||
LOG.info(stats.toString());
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
|
||||
public class ResourceRetentionSet {
|
||||
|
||||
private long delSize;
|
||||
private long currentSize;
|
||||
private final long targetSize;
|
||||
private final DeletionService delService;
|
||||
private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
|
||||
|
||||
ResourceRetentionSet(DeletionService delService, long targetSize) {
|
||||
this(delService, targetSize, new LRUComparator());
|
||||
}
|
||||
|
||||
ResourceRetentionSet(DeletionService delService, long targetSize,
|
||||
Comparator<? super LocalizedResource> cmp) {
|
||||
this(delService, targetSize,
|
||||
new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
|
||||
}
|
||||
|
||||
ResourceRetentionSet(DeletionService delService, long targetSize,
|
||||
SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
|
||||
this.retain = retain;
|
||||
this.delService = delService;
|
||||
this.targetSize = targetSize;
|
||||
}
|
||||
|
||||
public void addResources(LocalResourcesTracker newTracker) {
|
||||
for (LocalizedResource resource : newTracker) {
|
||||
currentSize += resource.getSize();
|
||||
if (resource.getRefCount() > 0) {
|
||||
// always retain resources in use
|
||||
continue;
|
||||
}
|
||||
retain.put(resource, newTracker);
|
||||
}
|
||||
for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
|
||||
retain.entrySet().iterator();
|
||||
currentSize - delSize > targetSize && i.hasNext();) {
|
||||
Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
|
||||
LocalizedResource resource = rsrc.getKey();
|
||||
LocalResourcesTracker tracker = rsrc.getValue();
|
||||
if (tracker.remove(resource, delService)) {
|
||||
delSize += resource.getSize();
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Cache: ").append(currentSize).append(", ");
|
||||
sb.append("Deleted: ").append(delSize);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
static class LRUComparator implements Comparator<LocalizedResource> {
|
||||
public int compare(LocalizedResource r1, LocalizedResource r2) {
|
||||
long ret = r1.getTimestamp() - r2.getTimestamp();
|
||||
if (0 == ret) {
|
||||
return System.identityHashCode(r1) - System.identityHashCode(r2);
|
||||
}
|
||||
return ret > 0 ? 1 : -1;
|
||||
}
|
||||
public boolean equals(Object other) {
|
||||
return this == other;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,235 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class tests the clean up of local caches the node manager uses for the
|
||||
* purpose of resource localization.
|
||||
*/
|
||||
public class TestLocalCacheCleanup {
|
||||
|
||||
@Test
|
||||
public void testBasicCleanup() {
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0);
|
||||
addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0);
|
||||
addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0);
|
||||
|
||||
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
|
||||
new ConcurrentHashMap<String, LocalResourcesTracker>();
|
||||
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0);
|
||||
LocalResourcesTracker user1Tracker =
|
||||
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
|
||||
privateRsrc.put("user1", user1Tracker);
|
||||
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0);
|
||||
LocalResourcesTracker user2Tracker =
|
||||
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
|
||||
privateRsrc.put("user2", user2Tracker);
|
||||
|
||||
ResourceLocalizationService rls =
|
||||
createLocService(publicRsrc, privateRsrc, 0);
|
||||
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
|
||||
assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc)
|
||||
.getLocalRsrc().size());
|
||||
assertEquals(0,
|
||||
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
|
||||
.getLocalRsrc().size());
|
||||
assertEquals(0,
|
||||
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
|
||||
.getLocalRsrc().size());
|
||||
assertEquals(100, stats.getTotalDelSize());
|
||||
assertEquals(60, stats.getPublicDelSize());
|
||||
assertEquals(40, stats.getPrivateDelSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPositiveRefCount() {
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
// Oldest resource with a positive ref count the other with a ref count
|
||||
// equal to 0.
|
||||
LocalResourceRequest survivor =
|
||||
addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1);
|
||||
addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0);
|
||||
|
||||
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
|
||||
new ConcurrentHashMap<String, LocalResourcesTracker>();
|
||||
|
||||
ResourceLocalizationService rls =
|
||||
createLocService(publicRsrc, privateRsrc, 0);
|
||||
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
|
||||
StubbedLocalResourcesTrackerImpl resources =
|
||||
(StubbedLocalResourcesTrackerImpl) rls.publicRsrc;
|
||||
assertEquals(1, resources.getLocalRsrc().size());
|
||||
assertTrue(resources.getLocalRsrc().containsKey(survivor));
|
||||
assertEquals(20, stats.getTotalDelSize());
|
||||
assertEquals(20, stats.getPublicDelSize());
|
||||
assertEquals(0, stats.getPrivateDelSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLRUAcrossTrackers() {
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourceRequest pubSurviver1 =
|
||||
addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0);
|
||||
LocalResourceRequest pubSurviver2 =
|
||||
addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0);
|
||||
addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0);
|
||||
|
||||
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
|
||||
new ConcurrentHashMap<String, LocalResourcesTracker>();
|
||||
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourceRequest usr1Surviver1 =
|
||||
addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0);
|
||||
addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0);
|
||||
LocalResourcesTracker user1Tracker =
|
||||
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
|
||||
privateRsrc.put("user1", user1Tracker);
|
||||
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourceRequest usr2Surviver1 =
|
||||
addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0);
|
||||
addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0);
|
||||
addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0);
|
||||
LocalResourcesTracker user2Tracker =
|
||||
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
|
||||
privateRsrc.put("user2", user2Tracker);
|
||||
|
||||
ResourceLocalizationService rls =
|
||||
createLocService(publicRsrc, privateRsrc, 80);
|
||||
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
|
||||
|
||||
Map<LocalResourceRequest, LocalizedResource> pubLocalRsrc =
|
||||
((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc();
|
||||
assertEquals(2, pubLocalRsrc.size());
|
||||
assertTrue(pubLocalRsrc.containsKey(pubSurviver1));
|
||||
assertTrue(pubLocalRsrc.containsKey(pubSurviver2));
|
||||
|
||||
Map<LocalResourceRequest, LocalizedResource> usr1LocalRsrc =
|
||||
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
|
||||
.getLocalRsrc();
|
||||
assertEquals(1, usr1LocalRsrc.size());
|
||||
assertTrue(usr1LocalRsrc.containsKey(usr1Surviver1));
|
||||
|
||||
Map<LocalResourceRequest, LocalizedResource> usr2LocalRsrc =
|
||||
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
|
||||
.getLocalRsrc();
|
||||
assertEquals(1, usr2LocalRsrc.size());
|
||||
assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
|
||||
|
||||
assertEquals(80, stats.getTotalDelSize());
|
||||
assertEquals(20, stats.getPublicDelSize());
|
||||
assertEquals(60, stats.getPrivateDelSize());
|
||||
}
|
||||
|
||||
private ResourceLocalizationService createLocService(
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrcs,
|
||||
ConcurrentMap<String, LocalResourcesTracker> privateRsrcs,
|
||||
long targetCacheSize) {
|
||||
Context mockedContext = mock(Context.class);
|
||||
when(mockedContext.getNMStateStore()).thenReturn(null);
|
||||
ResourceLocalizationService rls =
|
||||
new ResourceLocalizationService(null, null, null, null, mockedContext);
|
||||
// We set the following members directly so we don't have to deal with
|
||||
// mocking out the service init method.
|
||||
rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);
|
||||
rls.cacheTargetSize = targetCacheSize;
|
||||
rls.privateRsrc.putAll(privateRsrcs);
|
||||
return rls;
|
||||
}
|
||||
|
||||
private LocalResourceRequest addResource(
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> resources,
|
||||
String path, long timestamp, long size, int refCount) {
|
||||
LocalResourceRequest request = createLocalResourceRequest(path, timestamp);
|
||||
LocalizedResource resource =
|
||||
createLocalizedResource(size, refCount, timestamp, request);
|
||||
resources.put(request, resource);
|
||||
return request;
|
||||
}
|
||||
|
||||
private LocalResourceRequest createLocalResourceRequest(String path,
|
||||
long timestamp) {
|
||||
return new LocalResourceRequest(new Path(path), timestamp,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
|
||||
}
|
||||
|
||||
private LocalizedResource createLocalizedResource(long size, int refCount,
|
||||
long timestamp, LocalResourceRequest req) {
|
||||
LocalizedResource lr = mock(LocalizedResource.class);
|
||||
when(lr.getSize()).thenReturn(size);
|
||||
when(lr.getRefCount()).thenReturn(refCount);
|
||||
when(lr.getTimestamp()).thenReturn(timestamp);
|
||||
when(lr.getState()).thenReturn(ResourceState.LOCALIZED);
|
||||
when(lr.getRequest()).thenReturn(req);
|
||||
return lr;
|
||||
}
|
||||
|
||||
class StubbedLocalResourcesTrackerImpl extends LocalResourcesTrackerImpl {
|
||||
StubbedLocalResourcesTrackerImpl(String user,
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> rsrcs) {
|
||||
super(user, null, null, rsrcs, false, new Configuration(), null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(LocalizedResource rem, DeletionService delService) {
|
||||
LocalizedResource r = localrsrc.remove(rem.getRequest());
|
||||
if (r != null) {
|
||||
LOG.info("Removed " + rem.getRequest().getPath()
|
||||
+ " from localized cache");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<LocalResourceRequest, LocalizedResource> getLocalRsrc() {
|
||||
return Collections.unmodifiableMap(localrsrc);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,106 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestResourceRetention {
|
||||
|
||||
@Test
|
||||
public void testRsrcUnused() {
|
||||
DeletionService delService = mock(DeletionService.class);
|
||||
long TARGET_MB = 10 << 20;
|
||||
ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB);
|
||||
// 3MB files @{10, 15}
|
||||
LocalResourcesTracker pubTracker =
|
||||
createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
|
||||
// 1MB files @{3, 6, 9, 12}
|
||||
LocalResourcesTracker trackerA =
|
||||
createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
|
||||
// 4MB file @{1}
|
||||
LocalResourcesTracker trackerB =
|
||||
createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
|
||||
// 2MB files @{7, 9, 11}
|
||||
LocalResourcesTracker trackerC =
|
||||
createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
|
||||
// Total cache: 20MB; verify removed at least 10MB
|
||||
rss.addResources(pubTracker);
|
||||
rss.addResources(trackerA);
|
||||
rss.addResources(trackerB);
|
||||
rss.addResources(trackerC);
|
||||
long deleted = 0L;
|
||||
ArgumentCaptor<LocalizedResource> captor =
|
||||
ArgumentCaptor.forClass(LocalizedResource.class);
|
||||
verify(pubTracker, atMost(2))
|
||||
.remove(captor.capture(), isA(DeletionService.class));
|
||||
verify(trackerA, atMost(4))
|
||||
.remove(captor.capture(), isA(DeletionService.class));
|
||||
verify(trackerB, atMost(1))
|
||||
.remove(captor.capture(), isA(DeletionService.class));
|
||||
verify(trackerC, atMost(3))
|
||||
.remove(captor.capture(), isA(DeletionService.class));
|
||||
for (LocalizedResource rem : captor.getAllValues()) {
|
||||
deleted += rem.getSize();
|
||||
}
|
||||
assertTrue(deleted >= 10 * 1024 * 1024);
|
||||
assertTrue(deleted < 15 * 1024 * 1024);
|
||||
}
|
||||
|
||||
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
|
||||
long nRsrcs, long timestamp, long tsstep) {
|
||||
Configuration conf = new Configuration();
|
||||
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
|
||||
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
|
||||
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
|
||||
null, trackerResources, false, conf, new NMNullStateStoreService(),null));
|
||||
for (int i = 0; i < nRsrcs; ++i) {
|
||||
final LocalResourceRequest req = new LocalResourceRequest(
|
||||
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
|
||||
final long ts = timestamp + i * tsstep;
|
||||
final Path p = new Path("file:///local/" + user + "/rsrc" + i);
|
||||
LocalizedResource rsrc = new LocalizedResource(req, null) {
|
||||
@Override public int getRefCount() { return 0; }
|
||||
@Override public long getSize() { return rsrcSize; }
|
||||
@Override public Path getLocalPath() { return p; }
|
||||
@Override public long getTimestamp() { return ts; }
|
||||
@Override
|
||||
public ResourceState getState() { return ResourceState.LOCALIZED; }
|
||||
};
|
||||
trackerResources.put(req, rsrc);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue