YARN-5797. Add metrics to the node manager for cleaning the PUBLIC and PRIVATE caches. (Chris Trezzo via mingma)

This commit is contained in:
Ming Ma 2017-04-06 17:08:59 -07:00
parent 0391c92022
commit db5b4c292b
8 changed files with 111 additions and 28 deletions

View File

@ -225,7 +225,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
this.metrics = metrics; this.metrics = metrics;
rsrcLocalizationSrvc = rsrcLocalizationSrvc =
createResourceLocalizationService(exec, deletionContext, context); createResourceLocalizationService(exec, deletionContext, context,
metrics);
addService(rsrcLocalizationSrvc); addService(rsrcLocalizationSrvc);
containersLauncher = createContainersLauncher(context, exec); containersLauncher = createContainersLauncher(context, exec);
@ -452,9 +453,10 @@ public ContainersMonitor getContainersMonitor() {
} }
protected ResourceLocalizationService createResourceLocalizationService( protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context) { ContainerExecutor exec, DeletionService deletionContext,
Context nmContext, NodeManagerMetrics nmMetrics) {
return new ResourceLocalizationService(this.dispatcher, exec, return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext, dirsHandler, context); deletionContext, dirsHandler, nmContext, nmMetrics);
} }
protected SharedCacheUploadService createSharedCacheUploaderService() { protected SharedCacheUploadService createSharedCacheUploaderService() {

View File

@ -131,6 +131,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@ -165,6 +166,8 @@ public class ResourceLocalizationService extends CompositeService
private final ScheduledExecutorService cacheCleanup; private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager; private LocalizerTokenSecretManager secretManager;
private NMStateStoreService stateStore; private NMStateStoreService stateStore;
@VisibleForTesting
final NodeManagerMetrics metrics;
@VisibleForTesting @VisibleForTesting
LocalResourcesTracker publicRsrc; LocalResourcesTracker publicRsrc;
@ -194,7 +197,8 @@ public class ResourceLocalizationService extends CompositeService
public ResourceLocalizationService(Dispatcher dispatcher, public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService, ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler, Context context) { LocalDirsHandlerService dirsHandler, Context context,
NodeManagerMetrics metrics) {
super(ResourceLocalizationService.class.getName()); super(ResourceLocalizationService.class.getName());
this.exec = exec; this.exec = exec;
@ -208,6 +212,7 @@ public ResourceLocalizationService(Dispatcher dispatcher,
.build()); .build());
this.stateStore = context.getNMStateStore(); this.stateStore = context.getNMStateStore();
this.nmContext = context; this.nmContext = context;
this.metrics = metrics;
} }
FileContext getLocalFileContext(Configuration conf) { FileContext getLocalFileContext(Configuration conf) {
@ -530,6 +535,12 @@ LocalCacheCleanerStats handleCacheCleanup() {
} else if (LOG.isInfoEnabled()) { } else if (LOG.isInfoEnabled()) {
LOG.info(stats.toString()); LOG.info(stats.toString());
} }
// Update metrics
metrics.setCacheSizeBeforeClean(stats.getCacheSizeBeforeClean());
metrics.setTotalBytesDeleted(stats.getTotalDelSize());
metrics.setPrivateBytesDeleted(stats.getPrivateDelSize());
metrics.setPublicBytesDeleted(stats.getPublicDelSize());
return stats; return stats;
} }

View File

@ -69,6 +69,15 @@ public class NodeManagerMetrics {
@Metric("# of running opportunistic containers") @Metric("# of running opportunistic containers")
MutableGaugeInt runningOpportunisticContainers; MutableGaugeInt runningOpportunisticContainers;
@Metric("Local cache size (public and private) before clean (Bytes)")
MutableGaugeLong cacheSizeBeforeClean;
@Metric("# of total bytes deleted from the public and private local cache")
MutableGaugeLong totalBytesDeleted;
@Metric("# of bytes deleted from the public local cache")
MutableGaugeLong publicBytesDeleted;
@Metric("# of bytes deleted from the private local cache")
MutableGaugeLong privateBytesDeleted;
// CHECKSTYLE:ON:VisibilityModifier // CHECKSTYLE:ON:VisibilityModifier
private JvmMetrics jvmMetrics = null; private JvmMetrics jvmMetrics = null;
@ -215,6 +224,22 @@ public void setGoodLogDirsDiskUtilizationPerc(
this.goodLogDirsDiskUtilizationPerc.set(goodLogDirsDiskUtilizationPerc); this.goodLogDirsDiskUtilizationPerc.set(goodLogDirsDiskUtilizationPerc);
} }
public void setCacheSizeBeforeClean(long cacheSizeBeforeClean) {
this.cacheSizeBeforeClean.set(cacheSizeBeforeClean);
}
public void setTotalBytesDeleted(long totalBytesDeleted) {
this.totalBytesDeleted.set(totalBytesDeleted);
}
public void setPublicBytesDeleted(long publicBytesDeleted) {
this.publicBytesDeleted.set(publicBytesDeleted);
}
public void setPrivateBytesDeleted(long privateBytesDeleted) {
this.privateBytesDeleted.set(privateBytesDeleted);
}
public int getRunningContainers() { public int getRunningContainers() {
return containersRunning.value(); return containersRunning.value();
} }
@ -275,4 +300,20 @@ public int getAllocatedOpportunisticVCores() {
public int getRunningOpportunisticContainers() { public int getRunningOpportunisticContainers() {
return runningOpportunisticContainers.value(); return runningOpportunisticContainers.value();
} }
public long getCacheSizeBeforeClean() {
return this.cacheSizeBeforeClean.value();
}
public long getTotalBytesDeleted() {
return this.totalBytesDeleted.value();
}
public long getPublicBytesDeleted() {
return this.publicBytesDeleted.value();
}
public long getPrivateBytesDeleted() {
return this.privateBytesDeleted.value();
}
} }

View File

@ -73,9 +73,10 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService( protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context) { ContainerExecutor exec, DeletionService deletionContext, Context context,
NodeManagerMetrics metrics) {
return new ResourceLocalizationService(super.dispatcher, exec, return new ResourceLocalizationService(super.dispatcher, exec,
deletionContext, super.dirsHandler, context) { deletionContext, super.dirsHandler, context, metrics) {
@Override @Override
public void handle(LocalizationEvent event) { public void handle(LocalizationEvent event) {
switch (event.getType()) { switch (event.getType()) {

View File

@ -100,6 +100,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -699,8 +700,10 @@ private void waitForAppState(Application app, ApplicationState state)
private ContainerManagerImpl createContainerManager(Context context) { private ContainerManagerImpl createContainerManager(Context context) {
final LogHandler logHandler = mock(LogHandler.class); final LogHandler logHandler = mock(LogHandler.class);
final NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
final ResourceLocalizationService rsrcSrv = final ResourceLocalizationService rsrcSrv =
new ResourceLocalizationService(null, null, null, null, context) { new ResourceLocalizationService(null, null, null, null, context,
metrics) {
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
} }
@ -739,8 +742,10 @@ protected LogHandler createLogHandler(Configuration conf,
} }
@Override @Override
protected ResourceLocalizationService createResourceLocalizationService( protected ResourceLocalizationService
ContainerExecutor exec, DeletionService deletionContext, Context context) { createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext,
Context context, NodeManagerMetrics metrics) {
return rsrcSrv; return rsrcSrv;
} }

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.junit.Test; import org.junit.Test;
/** /**
@ -80,8 +81,12 @@ public void testBasicCleanup() {
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2")) ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc().size()); .getLocalRsrc().size());
assertEquals(100, stats.getTotalDelSize()); assertEquals(100, stats.getTotalDelSize());
assertEquals(100, rls.metrics.getTotalBytesDeleted());
assertEquals(60, stats.getPublicDelSize()); assertEquals(60, stats.getPublicDelSize());
assertEquals(60, rls.metrics.getPublicBytesDeleted());
assertEquals(40, stats.getPrivateDelSize()); assertEquals(40, stats.getPrivateDelSize());
assertEquals(40, rls.metrics.getPrivateBytesDeleted());
assertEquals(100, rls.metrics.getCacheSizeBeforeClean());
} }
@Test @Test
@ -105,8 +110,12 @@ public void testPositiveRefCount() {
assertEquals(1, resources.getLocalRsrc().size()); assertEquals(1, resources.getLocalRsrc().size());
assertTrue(resources.getLocalRsrc().containsKey(survivor)); assertTrue(resources.getLocalRsrc().containsKey(survivor));
assertEquals(20, stats.getTotalDelSize()); assertEquals(20, stats.getTotalDelSize());
assertEquals(20, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize()); assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(0, stats.getPrivateDelSize()); assertEquals(0, stats.getPrivateDelSize());
assertEquals(0, rls.metrics.getPrivateBytesDeleted());
assertEquals(40, rls.metrics.getCacheSizeBeforeClean());
} }
@Test @Test
@ -164,8 +173,12 @@ public void testLRUAcrossTrackers() {
assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1)); assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
assertEquals(80, stats.getTotalDelSize()); assertEquals(80, stats.getTotalDelSize());
assertEquals(80, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize()); assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(60, stats.getPrivateDelSize()); assertEquals(60, stats.getPrivateDelSize());
assertEquals(60, rls.metrics.getPrivateBytesDeleted());
assertEquals(160, rls.metrics.getCacheSizeBeforeClean());
} }
private ResourceLocalizationService createLocService( private ResourceLocalizationService createLocService(
@ -174,8 +187,10 @@ private ResourceLocalizationService createLocService(
long targetCacheSize) { long targetCacheSize) {
Context mockedContext = mock(Context.class); Context mockedContext = mock(Context.class);
when(mockedContext.getNMStateStore()).thenReturn(null); when(mockedContext.getNMStateStore()).thenReturn(null);
NodeManagerMetrics metrics = NodeManagerMetrics.create();
ResourceLocalizationService rls = ResourceLocalizationService rls =
new ResourceLocalizationService(null, null, null, null, mockedContext); new ResourceLocalizationService(null, null, null, null, mockedContext,
metrics);
// We set the following members directly so we don't have to deal with // We set the following members directly so we don't have to deal with
// mocking out the service init method. // mocking out the service init method.
rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs); rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.junit.Assert; import static org.mockito.Mockito.mock;
import org.junit.Assert;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -83,8 +85,10 @@ public void testMinimumPerDirectoryFileLimit() {
new NMTokenSecretManagerInNM(), null, new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false, conf); false, conf);
NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
ResourceLocalizationService service = ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null, nmContext); new ResourceLocalizationService(null, null, null, null, nmContext,
metrics);
try { try {
service.init(conf); service.init(conf);
} catch (Exception e1) { } catch (Exception e1) {

View File

@ -70,7 +70,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.AbstractFileSystem;
@ -141,6 +140,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -171,6 +171,7 @@ public class TestResourceLocalizationService {
private AbstractFileSystem spylfs; private AbstractFileSystem spylfs;
private FileContext lfs; private FileContext lfs;
private NMContext nmContext; private NMContext nmContext;
private NodeManagerMetrics metrics;
@BeforeClass @BeforeClass
public static void setupClass() { public static void setupClass() {
mockServer = mock(Server.class); mockServer = mock(Server.class);
@ -189,6 +190,7 @@ public void setup() throws IOException {
conf), new NMTokenSecretManagerInNM(), null, conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false, conf); false, conf);
metrics = mock(NodeManagerMetrics.class);
} }
@After @After
@ -225,7 +227,7 @@ public void testLocalizationInit() throws Exception {
ResourceLocalizationService locService = ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService, spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler, nmContext)); diskhandler, nmContext, metrics));
doReturn(lfs) doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class)); .when(locService).getLocalFileContext(isA(Configuration.class));
try { try {
@ -286,7 +288,7 @@ public void testDirectoryCleanupOnNewlyCreatedStateStore()
ResourceLocalizationService locService = ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService, spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler,nmContext)); diskhandler, nmContext, metrics));
doReturn(lfs) doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class)); .when(locService).getLocalFileContext(isA(Configuration.class));
try { try {
@ -357,7 +359,7 @@ public void testResourceRelease() throws Exception {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext); dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@ -757,7 +759,7 @@ public void testLocalizerRunnerException() throws Exception {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, nmContext); dirsHandlerSpy, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
try { try {
@ -847,7 +849,7 @@ public void testLocalizationHeartbeat() throws Exception {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext); dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -1143,7 +1145,8 @@ public void testDownloadingResourcesOnContainerKill() throws Exception {
DrainDispatcher dispatcher = getDispatcher(conf); DrainDispatcher dispatcher = getDispatcher(conf);
ResourceLocalizationService rawService = new ResourceLocalizationService( ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, delService, dirsHandler, nmContext); dispatcher, exec, delService, dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -1452,7 +1455,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
try { try {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, spyContext); dirsHandler, spyContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext( doReturn(lfs).when(spyService).getLocalFileContext(
@ -1547,7 +1550,8 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
dirsHandler.init(conf); dirsHandler.init(conf);
// Start resource localization service. // Start resource localization service.
ResourceLocalizationService rawService = new ResourceLocalizationService( ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext); dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext,
metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService). doReturn(lfs).when(spyService).
@ -1666,7 +1670,7 @@ public void testFailedPublicResource() throws Exception {
try { try {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext); dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext( doReturn(lfs).when(spyService).getLocalFileContext(
@ -1775,7 +1779,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception {
try { try {
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, nmContext); dirsHandlerSpy, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext( doReturn(lfs).when(spyService).getLocalFileContext(
@ -1907,7 +1911,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
ResourceLocalizationService rls = ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService, new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, nmContext); localDirHandler, nmContext, metrics);
dispatcher1.register(LocalizationEventType.class, rls); dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf); rls.init(conf);
@ -2060,7 +2064,7 @@ public void testLocalResourcePath() throws Exception {
ResourceLocalizationService rls = ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService, new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, nmContext); localDirHandler, nmContext, metrics);
dispatcher1.register(LocalizationEventType.class, rls); dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf); rls.init(conf);
@ -2226,7 +2230,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception {
// it as otherwise it will remove requests from pending queue. // it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService, new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler, nmContext); dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService); dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf); spyService.init(conf);
@ -2532,7 +2536,7 @@ private ResourceLocalizationService createSpyService(
new ApplicationACLsManager(conf), stateStore, false, conf); new ApplicationACLsManager(conf), stateStore, false, conf);
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext); dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker( doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@ -2596,7 +2600,7 @@ public void testFailedDirsResourceRelease() throws Exception {
// setup mocks // setup mocks
ResourceLocalizationService rawService = ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService, new ResourceLocalizationService(dispatcher, exec, delService,
mockDirsHandler, nmContext); mockDirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService); ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer(); doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(