diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java new file mode 100644 index 00000000000..731191963c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; + +/** + * A class responsible for cleaning the PUBLIC and PRIVATE local caches on a + * node manager. + */ +class LocalCacheCleaner { + + private long currentSize; + private final long targetSize; + private final DeletionService delService; + private final SortedMap resourceMap; + + LocalCacheCleaner(DeletionService delService, long targetSize) { + this(delService, targetSize, new LRUComparator()); + } + + LocalCacheCleaner(DeletionService delService, long targetSize, + Comparator cmp) { + this(delService, targetSize, + new TreeMap(cmp)); + } + + LocalCacheCleaner(DeletionService delService, long targetSize, + SortedMap resourceMap) { + this.resourceMap = resourceMap; + this.delService = delService; + this.targetSize = targetSize; + } + + /** + * Adds resources from the passed LocalResourceTracker that are candidates for + * deletion from the cache. + * + * @param newTracker add all resources being tracked by the passed + * LocalResourcesTracker to the LocalCacheCleaner. + */ + public void addResources(LocalResourcesTracker newTracker) { + for (LocalizedResource resource : newTracker) { + currentSize += resource.getSize(); + if (resource.getRefCount() > 0) { + // Do not delete resources that are still in use + continue; + } + resourceMap.put(resource, newTracker); + } + } + + /** + * Delete resources from the cache in the sorted order generated by the + * Comparator used to construct this class. + * + * @return stats about what was cleaned up during this call of cleanCache + */ + public LocalCacheCleanerStats cleanCache() { + LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize); + for (Iterator> i = + resourceMap.entrySet().iterator(); + currentSize - stats.totalDelSize > targetSize && i.hasNext();) { + Map.Entry rsrc = i.next(); + LocalizedResource resource = rsrc.getKey(); + LocalResourcesTracker tracker = rsrc.getValue(); + if (tracker.remove(resource, delService)) { + stats.incDelSize(tracker.getUser(), resource.getSize()); + } + } + this.resourceMap.clear(); + return stats; + } + + static class LocalCacheCleanerStats { + private final Map userDelSizes = new TreeMap(); + private final long cacheSizeBeforeClean; + private long totalDelSize; + private long publicDelSize; + private long privateDelSize; + + LocalCacheCleanerStats(long cacheSizeBeforeClean) { + this.cacheSizeBeforeClean = cacheSizeBeforeClean; + } + + void incDelSize(String user, long delSize) { + totalDelSize += delSize; + if (user == null) { + publicDelSize += delSize; + } else { + privateDelSize += delSize; + Long userDel = userDelSizes.get(user); + if (userDel != null) { + userDel += delSize; + userDelSizes.put(user, userDel); + } else { + userDelSizes.put(user, delSize); + } + } + } + + Map getUserDelSizes() { + return Collections.unmodifiableMap(userDelSizes); + } + + long getCacheSizeBeforeClean() { + return cacheSizeBeforeClean; + } + + long getTotalDelSize() { + return totalDelSize; + } + + long getPublicDelSize() { + return publicDelSize; + } + + long getPrivateDelSize() { + return privateDelSize; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean) + .append(", "); + sb.append("Total Deleted: ").append(totalDelSize).append(", "); + sb.append("Public Deleted: ").append(publicDelSize).append(", "); + sb.append("Private Deleted: ").append(privateDelSize); + return sb.toString(); + } + + public String toStringDetailed() { + StringBuilder sb = new StringBuilder(); + sb.append(this.toString()); + sb.append(", Private Deleted Detail: {"); + for (Map.Entry e : userDelSizes.entrySet()) { + sb.append(" ").append(e.getKey()).append(":").append(e.getValue()); + } + sb.append(" }"); + return sb.toString(); + } + } + + private static class LRUComparator implements Comparator, + Serializable { + + private static final long serialVersionUID = 7034380228434701685L; + + public int compare(LocalizedResource r1, LocalizedResource r2) { + long ret = r1.getTimestamp() - r2.getTimestamp(); + if (0 == ret) { + return System.identityHashCode(r1) - System.identityHashCode(r2); + } + return ret > 0 ? 1 : -1; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 38fffe6c6cf..940c5992042 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -65,7 +65,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { private final String user; private final ApplicationId appId; private final Dispatcher dispatcher; - private final ConcurrentMap localrsrc; + @VisibleForTesting + final ConcurrentMap localrsrc; private Configuration conf; private LocalDirsHandlerService dirsHandler; /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4bd004b83a7..4cd1acc5cc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; @@ -152,7 +153,8 @@ public class ResourceLocalizationService extends CompositeService private Server server; private InetSocketAddress localizationServerAddress; - private long cacheTargetSize; + @VisibleForTesting + long cacheTargetSize; private long cacheCleanupPeriod; private final ContainerExecutor exec; @@ -164,7 +166,8 @@ public class ResourceLocalizationService extends CompositeService private LocalizerTokenSecretManager secretManager; private NMStateStoreService stateStore; - private LocalResourcesTracker publicRsrc; + @VisibleForTesting + LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; private DirsChangeListener localDirsChangeListener; @@ -176,7 +179,8 @@ public class ResourceLocalizationService extends CompositeService * Map of LocalResourceTrackers keyed by username, for private * resources. */ - private final ConcurrentMap privateRsrc = + @VisibleForTesting + final ConcurrentMap privateRsrc = new ConcurrentHashMap(); /** @@ -427,7 +431,7 @@ public void handle(LocalizationEvent event) { handleContainerResourcesLocalized((ContainerLocalizationEvent) event); break; case CACHE_CLEANUP: - handleCacheCleanup(event); + handleCacheCleanup(); break; case CLEANUP_CONTAINER_RESOURCES: handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event); @@ -512,20 +516,21 @@ private void handleContainerResourcesLocalized( localizerTracker.endContainerLocalization(locId); } - private void handleCacheCleanup(LocalizationEvent event) { - ResourceRetentionSet retain = - new ResourceRetentionSet(delService, cacheTargetSize); - retain.addResources(publicRsrc); - if (LOG.isDebugEnabled()) { - LOG.debug("Resource cleanup (public) " + retain); - } + @VisibleForTesting + LocalCacheCleanerStats handleCacheCleanup() { + LocalCacheCleaner cleaner = + new LocalCacheCleaner(delService, cacheTargetSize); + cleaner.addResources(publicRsrc); for (LocalResourcesTracker t : privateRsrc.values()) { - retain.addResources(t); - if (LOG.isDebugEnabled()) { - LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); - } + cleaner.addResources(t); } - //TODO Check if appRsrcs should also be added to the retention set. + LocalCacheCleaner.LocalCacheCleanerStats stats = cleaner.cleanCache(); + if (LOG.isDebugEnabled()) { + LOG.debug(stats.toStringDetailed()); + } else if (LOG.isInfoEnabled()) { + LOG.info(stats.toString()); + } + return stats; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java deleted file mode 100644 index 447a7924a2f..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; - -import java.util.Comparator; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; - -public class ResourceRetentionSet { - - private long delSize; - private long currentSize; - private final long targetSize; - private final DeletionService delService; - private final SortedMap retain; - - ResourceRetentionSet(DeletionService delService, long targetSize) { - this(delService, targetSize, new LRUComparator()); - } - - ResourceRetentionSet(DeletionService delService, long targetSize, - Comparator cmp) { - this(delService, targetSize, - new TreeMap(cmp)); - } - - ResourceRetentionSet(DeletionService delService, long targetSize, - SortedMap retain) { - this.retain = retain; - this.delService = delService; - this.targetSize = targetSize; - } - - public void addResources(LocalResourcesTracker newTracker) { - for (LocalizedResource resource : newTracker) { - currentSize += resource.getSize(); - if (resource.getRefCount() > 0) { - // always retain resources in use - continue; - } - retain.put(resource, newTracker); - } - for (Iterator> i = - retain.entrySet().iterator(); - currentSize - delSize > targetSize && i.hasNext();) { - Map.Entry rsrc = i.next(); - LocalizedResource resource = rsrc.getKey(); - LocalResourcesTracker tracker = rsrc.getValue(); - if (tracker.remove(resource, delService)) { - delSize += resource.getSize(); - i.remove(); - } - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Cache: ").append(currentSize).append(", "); - sb.append("Deleted: ").append(delSize); - return sb.toString(); - } - - static class LRUComparator implements Comparator { - public int compare(LocalizedResource r1, LocalizedResource r2) { - long ret = r1.getTimestamp() - r2.getTimestamp(); - if (0 == ret) { - return System.identityHashCode(r1) - System.identityHashCode(r2); - } - return ret > 0 ? 1 : -1; - } - public boolean equals(Object other) { - return this == other; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java new file mode 100644 index 00000000000..d6db67a9db1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java @@ -0,0 +1,235 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats; +import org.junit.Test; + +/** + * This class tests the clean up of local caches the node manager uses for the + * purpose of resource localization. + */ +public class TestLocalCacheCleanup { + + @Test + public void testBasicCleanup() { + ConcurrentMap publicRsrc = + new ConcurrentHashMap(); + addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0); + addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0); + addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0); + + ConcurrentMap privateRsrc = + new ConcurrentHashMap(); + + ConcurrentMap user1rsrcs = + new ConcurrentHashMap(); + addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0); + LocalResourcesTracker user1Tracker = + new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs); + privateRsrc.put("user1", user1Tracker); + + ConcurrentMap user2rsrcs = + new ConcurrentHashMap(); + addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0); + LocalResourcesTracker user2Tracker = + new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs); + privateRsrc.put("user2", user2Tracker); + + ResourceLocalizationService rls = + createLocService(publicRsrc, privateRsrc, 0); + LocalCacheCleanerStats stats = rls.handleCacheCleanup(); + assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc) + .getLocalRsrc().size()); + assertEquals(0, + ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1")) + .getLocalRsrc().size()); + assertEquals(0, + ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2")) + .getLocalRsrc().size()); + assertEquals(100, stats.getTotalDelSize()); + assertEquals(60, stats.getPublicDelSize()); + assertEquals(40, stats.getPrivateDelSize()); + } + + @Test + public void testPositiveRefCount() { + ConcurrentMap publicRsrc = + new ConcurrentHashMap(); + // Oldest resource with a positive ref count the other with a ref count + // equal to 0. + LocalResourceRequest survivor = + addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1); + addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0); + + ConcurrentMap privateRsrc = + new ConcurrentHashMap(); + + ResourceLocalizationService rls = + createLocService(publicRsrc, privateRsrc, 0); + LocalCacheCleanerStats stats = rls.handleCacheCleanup(); + StubbedLocalResourcesTrackerImpl resources = + (StubbedLocalResourcesTrackerImpl) rls.publicRsrc; + assertEquals(1, resources.getLocalRsrc().size()); + assertTrue(resources.getLocalRsrc().containsKey(survivor)); + assertEquals(20, stats.getTotalDelSize()); + assertEquals(20, stats.getPublicDelSize()); + assertEquals(0, stats.getPrivateDelSize()); + } + + @Test + public void testLRUAcrossTrackers() { + ConcurrentMap publicRsrc = + new ConcurrentHashMap(); + LocalResourceRequest pubSurviver1 = + addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0); + LocalResourceRequest pubSurviver2 = + addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0); + addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0); + + ConcurrentMap privateRsrc = + new ConcurrentHashMap(); + + ConcurrentMap user1rsrcs = + new ConcurrentHashMap(); + LocalResourceRequest usr1Surviver1 = + addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0); + addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0); + LocalResourcesTracker user1Tracker = + new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs); + privateRsrc.put("user1", user1Tracker); + + ConcurrentMap user2rsrcs = + new ConcurrentHashMap(); + LocalResourceRequest usr2Surviver1 = + addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0); + addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0); + addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0); + LocalResourcesTracker user2Tracker = + new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs); + privateRsrc.put("user2", user2Tracker); + + ResourceLocalizationService rls = + createLocService(publicRsrc, privateRsrc, 80); + LocalCacheCleanerStats stats = rls.handleCacheCleanup(); + + Map pubLocalRsrc = + ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc(); + assertEquals(2, pubLocalRsrc.size()); + assertTrue(pubLocalRsrc.containsKey(pubSurviver1)); + assertTrue(pubLocalRsrc.containsKey(pubSurviver2)); + + Map usr1LocalRsrc = + ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1")) + .getLocalRsrc(); + assertEquals(1, usr1LocalRsrc.size()); + assertTrue(usr1LocalRsrc.containsKey(usr1Surviver1)); + + Map usr2LocalRsrc = + ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2")) + .getLocalRsrc(); + assertEquals(1, usr2LocalRsrc.size()); + assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1)); + + assertEquals(80, stats.getTotalDelSize()); + assertEquals(20, stats.getPublicDelSize()); + assertEquals(60, stats.getPrivateDelSize()); + } + + private ResourceLocalizationService createLocService( + ConcurrentMap publicRsrcs, + ConcurrentMap privateRsrcs, + long targetCacheSize) { + Context mockedContext = mock(Context.class); + when(mockedContext.getNMStateStore()).thenReturn(null); + ResourceLocalizationService rls = + new ResourceLocalizationService(null, null, null, null, mockedContext); + // We set the following members directly so we don't have to deal with + // mocking out the service init method. + rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs); + rls.cacheTargetSize = targetCacheSize; + rls.privateRsrc.putAll(privateRsrcs); + return rls; + } + + private LocalResourceRequest addResource( + ConcurrentMap resources, + String path, long timestamp, long size, int refCount) { + LocalResourceRequest request = createLocalResourceRequest(path, timestamp); + LocalizedResource resource = + createLocalizedResource(size, refCount, timestamp, request); + resources.put(request, resource); + return request; + } + + private LocalResourceRequest createLocalResourceRequest(String path, + long timestamp) { + return new LocalResourceRequest(new Path(path), timestamp, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null); + } + + private LocalizedResource createLocalizedResource(long size, int refCount, + long timestamp, LocalResourceRequest req) { + LocalizedResource lr = mock(LocalizedResource.class); + when(lr.getSize()).thenReturn(size); + when(lr.getRefCount()).thenReturn(refCount); + when(lr.getTimestamp()).thenReturn(timestamp); + when(lr.getState()).thenReturn(ResourceState.LOCALIZED); + when(lr.getRequest()).thenReturn(req); + return lr; + } + + class StubbedLocalResourcesTrackerImpl extends LocalResourcesTrackerImpl { + StubbedLocalResourcesTrackerImpl(String user, + ConcurrentMap rsrcs) { + super(user, null, null, rsrcs, false, new Configuration(), null, null); + } + + @Override + public boolean remove(LocalizedResource rem, DeletionService delService) { + LocalizedResource r = localrsrc.remove(rem.getRequest()); + if (r != null) { + LOG.info("Removed " + rem.getRequest().getPath() + + " from localized cache"); + return true; + } + return false; + } + + Map getLocalRsrc() { + return Collections.unmodifiableMap(localrsrc); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java deleted file mode 100644 index 81e69e210f7..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; -import org.junit.Test; - -import static org.junit.Assert.*; - -import org.mockito.ArgumentCaptor; - -import static org.mockito.Mockito.*; - -public class TestResourceRetention { - - @Test - public void testRsrcUnused() { - DeletionService delService = mock(DeletionService.class); - long TARGET_MB = 10 << 20; - ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB); - // 3MB files @{10, 15} - LocalResourcesTracker pubTracker = - createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5); - // 1MB files @{3, 6, 9, 12} - LocalResourcesTracker trackerA = - createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3); - // 4MB file @{1} - LocalResourcesTracker trackerB = - createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5); - // 2MB files @{7, 9, 11} - LocalResourcesTracker trackerC = - createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2); - // Total cache: 20MB; verify removed at least 10MB - rss.addResources(pubTracker); - rss.addResources(trackerA); - rss.addResources(trackerB); - rss.addResources(trackerC); - long deleted = 0L; - ArgumentCaptor captor = - ArgumentCaptor.forClass(LocalizedResource.class); - verify(pubTracker, atMost(2)) - .remove(captor.capture(), isA(DeletionService.class)); - verify(trackerA, atMost(4)) - .remove(captor.capture(), isA(DeletionService.class)); - verify(trackerB, atMost(1)) - .remove(captor.capture(), isA(DeletionService.class)); - verify(trackerC, atMost(3)) - .remove(captor.capture(), isA(DeletionService.class)); - for (LocalizedResource rem : captor.getAllValues()) { - deleted += rem.getSize(); - } - assertTrue(deleted >= 10 * 1024 * 1024); - assertTrue(deleted < 15 * 1024 * 1024); - } - - LocalResourcesTracker createMockTracker(String user, final long rsrcSize, - long nRsrcs, long timestamp, long tsstep) { - Configuration conf = new Configuration(); - ConcurrentMap trackerResources = - new ConcurrentHashMap(); - LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null, - null, trackerResources, false, conf, new NMNullStateStoreService(),null)); - for (int i = 0; i < nRsrcs; ++i) { - final LocalResourceRequest req = new LocalResourceRequest( - new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep, - LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null); - final long ts = timestamp + i * tsstep; - final Path p = new Path("file:///local/" + user + "/rsrc" + i); - LocalizedResource rsrc = new LocalizedResource(req, null) { - @Override public int getRefCount() { return 0; } - @Override public long getSize() { return rsrcSize; } - @Override public Path getLocalPath() { return p; } - @Override public long getTimestamp() { return ts; } - @Override - public ResourceState getState() { return ResourceState.LOCALIZED; } - }; - trackerResources.put(req, rsrc); - } - return ret; - } - -}