YARN-5767. Fix the order that resources are cleaned up from the local Public/Private caches. Contributed by Chris Trezzo

(cherry picked from commit 1b79c417dc)
This commit is contained in:
Jason Lowe 2016-10-28 15:58:04 +00:00
parent 2ab80a148a
commit ea9a1be109
6 changed files with 440 additions and 219 deletions

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
/**
* A class responsible for cleaning the PUBLIC and PRIVATE local caches on a
* node manager.
*/
class LocalCacheCleaner {
private long currentSize;
private final long targetSize;
private final DeletionService delService;
private final SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap;
LocalCacheCleaner(DeletionService delService, long targetSize) {
this(delService, targetSize, new LRUComparator());
}
LocalCacheCleaner(DeletionService delService, long targetSize,
Comparator<? super LocalizedResource> cmp) {
this(delService, targetSize,
new TreeMap<LocalizedResource, LocalResourcesTracker>(cmp));
}
LocalCacheCleaner(DeletionService delService, long targetSize,
SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap) {
this.resourceMap = resourceMap;
this.delService = delService;
this.targetSize = targetSize;
}
/**
* Adds resources from the passed LocalResourceTracker that are candidates for
* deletion from the cache.
*
* @param newTracker add all resources being tracked by the passed
* LocalResourcesTracker to the LocalCacheCleaner.
*/
public void addResources(LocalResourcesTracker newTracker) {
for (LocalizedResource resource : newTracker) {
currentSize += resource.getSize();
if (resource.getRefCount() > 0) {
// Do not delete resources that are still in use
continue;
}
resourceMap.put(resource, newTracker);
}
}
/**
* Delete resources from the cache in the sorted order generated by the
* Comparator used to construct this class.
*
* @return stats about what was cleaned up during this call of cleanCache
*/
public LocalCacheCleanerStats cleanCache() {
LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize);
for (Iterator<Map.Entry<LocalizedResource, LocalResourcesTracker>> i =
resourceMap.entrySet().iterator();
currentSize - stats.totalDelSize > targetSize && i.hasNext();) {
Map.Entry<LocalizedResource, LocalResourcesTracker> rsrc = i.next();
LocalizedResource resource = rsrc.getKey();
LocalResourcesTracker tracker = rsrc.getValue();
if (tracker.remove(resource, delService)) {
stats.incDelSize(tracker.getUser(), resource.getSize());
}
}
this.resourceMap.clear();
return stats;
}
static class LocalCacheCleanerStats {
private final Map<String, Long> userDelSizes = new TreeMap<String, Long>();
private final long cacheSizeBeforeClean;
private long totalDelSize;
private long publicDelSize;
private long privateDelSize;
LocalCacheCleanerStats(long cacheSizeBeforeClean) {
this.cacheSizeBeforeClean = cacheSizeBeforeClean;
}
void incDelSize(String user, long delSize) {
totalDelSize += delSize;
if (user == null) {
publicDelSize += delSize;
} else {
privateDelSize += delSize;
Long userDel = userDelSizes.get(user);
if (userDel != null) {
userDel += delSize;
userDelSizes.put(user, userDel);
} else {
userDelSizes.put(user, delSize);
}
}
}
Map<String, Long> getUserDelSizes() {
return Collections.unmodifiableMap(userDelSizes);
}
long getCacheSizeBeforeClean() {
return cacheSizeBeforeClean;
}
long getTotalDelSize() {
return totalDelSize;
}
long getPublicDelSize() {
return publicDelSize;
}
long getPrivateDelSize() {
return privateDelSize;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean)
.append(", ");
sb.append("Total Deleted: ").append(totalDelSize).append(", ");
sb.append("Public Deleted: ").append(publicDelSize).append(", ");
sb.append("Private Deleted: ").append(privateDelSize);
return sb.toString();
}
public String toStringDetailed() {
StringBuilder sb = new StringBuilder();
sb.append(this.toString());
sb.append(", Private Deleted Detail: {");
for (Map.Entry<String, Long> e : userDelSizes.entrySet()) {
sb.append(" ").append(e.getKey()).append(":").append(e.getValue());
}
sb.append(" }");
return sb.toString();
}
}
private static class LRUComparator implements Comparator<LocalizedResource>,
Serializable {
private static final long serialVersionUID = 7034380228434701685L;
public int compare(LocalizedResource r1, LocalizedResource r2) {
long ret = r1.getTimestamp() - r2.getTimestamp();
if (0 == ret) {
return System.identityHashCode(r1) - System.identityHashCode(r2);
}
return ret > 0 ? 1 : -1;
}
}
}

View File

@ -65,7 +65,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
private final String user;
private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
@VisibleForTesting
final ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc;
private Configuration conf;
private LocalDirsHandlerService dirsHandler;
/*

View File

@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@ -152,7 +153,8 @@ public class ResourceLocalizationService extends CompositeService
private Server server;
private InetSocketAddress localizationServerAddress;
private long cacheTargetSize;
@VisibleForTesting
long cacheTargetSize;
private long cacheCleanupPeriod;
private final ContainerExecutor exec;
@ -164,7 +166,8 @@ public class ResourceLocalizationService extends CompositeService
private LocalizerTokenSecretManager secretManager;
private NMStateStoreService stateStore;
private LocalResourcesTracker publicRsrc;
@VisibleForTesting
LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
private DirsChangeListener localDirsChangeListener;
@ -176,7 +179,8 @@ public class ResourceLocalizationService extends CompositeService
* Map of LocalResourceTrackers keyed by username, for private
* resources.
*/
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
@VisibleForTesting
final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
/**
@ -427,7 +431,7 @@ public class ResourceLocalizationService extends CompositeService
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
break;
case CACHE_CLEANUP:
handleCacheCleanup(event);
handleCacheCleanup();
break;
case CLEANUP_CONTAINER_RESOURCES:
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
@ -512,20 +516,21 @@ public class ResourceLocalizationService extends CompositeService
localizerTracker.endContainerLocalization(locId);
}
private void handleCacheCleanup(LocalizationEvent event) {
ResourceRetentionSet retain =
new ResourceRetentionSet(delService, cacheTargetSize);
retain.addResources(publicRsrc);
if (LOG.isDebugEnabled()) {
LOG.debug("Resource cleanup (public) " + retain);
}
@VisibleForTesting
LocalCacheCleanerStats handleCacheCleanup() {
LocalCacheCleaner cleaner =
new LocalCacheCleaner(delService, cacheTargetSize);
cleaner.addResources(publicRsrc);
for (LocalResourcesTracker t : privateRsrc.values()) {
retain.addResources(t);
cleaner.addResources(t);
}
LocalCacheCleaner.LocalCacheCleanerStats stats = cleaner.cleanCache();
if (LOG.isDebugEnabled()) {
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
LOG.debug(stats.toStringDetailed());
} else if (LOG.isInfoEnabled()) {
LOG.info(stats.toString());
}
}
//TODO Check if appRsrcs should also be added to the retention set.
return stats;
}

View File

@ -1,96 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
public class ResourceRetentionSet {
private long delSize;
private long currentSize;
private final long targetSize;
private final DeletionService delService;
private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
ResourceRetentionSet(DeletionService delService, long targetSize) {
this(delService, targetSize, new LRUComparator());
}
ResourceRetentionSet(DeletionService delService, long targetSize,
Comparator<? super LocalizedResource> cmp) {
this(delService, targetSize,
new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
}
ResourceRetentionSet(DeletionService delService, long targetSize,
SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
this.retain = retain;
this.delService = delService;
this.targetSize = targetSize;
}
public void addResources(LocalResourcesTracker newTracker) {
for (LocalizedResource resource : newTracker) {
currentSize += resource.getSize();
if (resource.getRefCount() > 0) {
// always retain resources in use
continue;
}
retain.put(resource, newTracker);
}
for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
retain.entrySet().iterator();
currentSize - delSize > targetSize && i.hasNext();) {
Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
LocalizedResource resource = rsrc.getKey();
LocalResourcesTracker tracker = rsrc.getValue();
if (tracker.remove(resource, delService)) {
delSize += resource.getSize();
i.remove();
}
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Cache: ").append(currentSize).append(", ");
sb.append("Deleted: ").append(delSize);
return sb.toString();
}
static class LRUComparator implements Comparator<LocalizedResource> {
public int compare(LocalizedResource r1, LocalizedResource r2) {
long ret = r1.getTimestamp() - r2.getTimestamp();
if (0 == ret) {
return System.identityHashCode(r1) - System.identityHashCode(r2);
}
return ret > 0 ? 1 : -1;
}
public boolean equals(Object other) {
return this == other;
}
}
}

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.junit.Test;
/**
* This class tests the clean up of local caches the node manager uses for the
* purpose of resource localization.
*/
public class TestLocalCacheCleanup {
@Test
public void testBasicCleanup() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0);
addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0);
addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0);
LocalResourcesTracker user1Tracker =
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
privateRsrc.put("user1", user1Tracker);
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0);
LocalResourcesTracker user2Tracker =
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
privateRsrc.put("user2", user2Tracker);
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 0);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc)
.getLocalRsrc().size());
assertEquals(0,
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
.getLocalRsrc().size());
assertEquals(0,
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc().size());
assertEquals(100, stats.getTotalDelSize());
assertEquals(60, stats.getPublicDelSize());
assertEquals(40, stats.getPrivateDelSize());
}
@Test
public void testPositiveRefCount() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
// Oldest resource with a positive ref count the other with a ref count
// equal to 0.
LocalResourceRequest survivor =
addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1);
addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 0);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
StubbedLocalResourcesTrackerImpl resources =
(StubbedLocalResourcesTrackerImpl) rls.publicRsrc;
assertEquals(1, resources.getLocalRsrc().size());
assertTrue(resources.getLocalRsrc().containsKey(survivor));
assertEquals(20, stats.getTotalDelSize());
assertEquals(20, stats.getPublicDelSize());
assertEquals(0, stats.getPrivateDelSize());
}
@Test
public void testLRUAcrossTrackers() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest pubSurviver1 =
addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0);
LocalResourceRequest pubSurviver2 =
addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0);
addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest usr1Surviver1 =
addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0);
addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0);
LocalResourcesTracker user1Tracker =
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
privateRsrc.put("user1", user1Tracker);
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest usr2Surviver1 =
addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0);
addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0);
addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0);
LocalResourcesTracker user2Tracker =
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
privateRsrc.put("user2", user2Tracker);
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 80);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
Map<LocalResourceRequest, LocalizedResource> pubLocalRsrc =
((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc();
assertEquals(2, pubLocalRsrc.size());
assertTrue(pubLocalRsrc.containsKey(pubSurviver1));
assertTrue(pubLocalRsrc.containsKey(pubSurviver2));
Map<LocalResourceRequest, LocalizedResource> usr1LocalRsrc =
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
.getLocalRsrc();
assertEquals(1, usr1LocalRsrc.size());
assertTrue(usr1LocalRsrc.containsKey(usr1Surviver1));
Map<LocalResourceRequest, LocalizedResource> usr2LocalRsrc =
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc();
assertEquals(1, usr2LocalRsrc.size());
assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
assertEquals(80, stats.getTotalDelSize());
assertEquals(20, stats.getPublicDelSize());
assertEquals(60, stats.getPrivateDelSize());
}
private ResourceLocalizationService createLocService(
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrcs,
ConcurrentMap<String, LocalResourcesTracker> privateRsrcs,
long targetCacheSize) {
Context mockedContext = mock(Context.class);
when(mockedContext.getNMStateStore()).thenReturn(null);
ResourceLocalizationService rls =
new ResourceLocalizationService(null, null, null, null, mockedContext);
// We set the following members directly so we don't have to deal with
// mocking out the service init method.
rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);
rls.cacheTargetSize = targetCacheSize;
rls.privateRsrc.putAll(privateRsrcs);
return rls;
}
private LocalResourceRequest addResource(
ConcurrentMap<LocalResourceRequest, LocalizedResource> resources,
String path, long timestamp, long size, int refCount) {
LocalResourceRequest request = createLocalResourceRequest(path, timestamp);
LocalizedResource resource =
createLocalizedResource(size, refCount, timestamp, request);
resources.put(request, resource);
return request;
}
private LocalResourceRequest createLocalResourceRequest(String path,
long timestamp) {
return new LocalResourceRequest(new Path(path), timestamp,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
}
private LocalizedResource createLocalizedResource(long size, int refCount,
long timestamp, LocalResourceRequest req) {
LocalizedResource lr = mock(LocalizedResource.class);
when(lr.getSize()).thenReturn(size);
when(lr.getRefCount()).thenReturn(refCount);
when(lr.getTimestamp()).thenReturn(timestamp);
when(lr.getState()).thenReturn(ResourceState.LOCALIZED);
when(lr.getRequest()).thenReturn(req);
return lr;
}
class StubbedLocalResourcesTrackerImpl extends LocalResourcesTrackerImpl {
StubbedLocalResourcesTrackerImpl(String user,
ConcurrentMap<LocalResourceRequest, LocalizedResource> rsrcs) {
super(user, null, null, rsrcs, false, new Configuration(), null, null);
}
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
LocalizedResource r = localrsrc.remove(rem.getRequest());
if (r != null) {
LOG.info("Removed " + rem.getRequest().getPath()
+ " from localized cache");
return true;
}
return false;
}
Map<LocalResourceRequest, LocalizedResource> getLocalRsrc() {
return Collections.unmodifiableMap(localrsrc);
}
}
}

View File

@ -1,106 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentCaptor;
import static org.mockito.Mockito.*;
public class TestResourceRetention {
@Test
public void testRsrcUnused() {
DeletionService delService = mock(DeletionService.class);
long TARGET_MB = 10 << 20;
ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB);
// 3MB files @{10, 15}
LocalResourcesTracker pubTracker =
createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
// 1MB files @{3, 6, 9, 12}
LocalResourcesTracker trackerA =
createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
// 4MB file @{1}
LocalResourcesTracker trackerB =
createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
// 2MB files @{7, 9, 11}
LocalResourcesTracker trackerC =
createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
// Total cache: 20MB; verify removed at least 10MB
rss.addResources(pubTracker);
rss.addResources(trackerA);
rss.addResources(trackerB);
rss.addResources(trackerC);
long deleted = 0L;
ArgumentCaptor<LocalizedResource> captor =
ArgumentCaptor.forClass(LocalizedResource.class);
verify(pubTracker, atMost(2))
.remove(captor.capture(), isA(DeletionService.class));
verify(trackerA, atMost(4))
.remove(captor.capture(), isA(DeletionService.class));
verify(trackerB, atMost(1))
.remove(captor.capture(), isA(DeletionService.class));
verify(trackerC, atMost(3))
.remove(captor.capture(), isA(DeletionService.class));
for (LocalizedResource rem : captor.getAllValues()) {
deleted += rem.getSize();
}
assertTrue(deleted >= 10 * 1024 * 1024);
assertTrue(deleted < 15 * 1024 * 1024);
}
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
long nRsrcs, long timestamp, long tsstep) {
Configuration conf = new Configuration();
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
null, trackerResources, false, conf, new NMNullStateStoreService(),null));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
final long ts = timestamp + i * tsstep;
final Path p = new Path("file:///local/" + user + "/rsrc" + i);
LocalizedResource rsrc = new LocalizedResource(req, null) {
@Override public int getRefCount() { return 0; }
@Override public long getSize() { return rsrcSize; }
@Override public Path getLocalPath() { return p; }
@Override public long getTimestamp() { return ts; }
@Override
public ResourceState getState() { return ResourceState.LOCALIZED; }
};
trackerResources.put(req, rsrc);
}
return ret;
}
}