YARN-539. Addressed memory leak of LocalResource objects NM when a resource localization fails. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466756 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e3b56f6e9
commit
4234bc87b3
|
@ -220,6 +220,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-534. Change RM restart recovery to also account for AM max-attempts
|
||||
configuration after the restart. (Jian He via vinodkv)
|
||||
|
||||
YARN-539. Addressed memory leak of LocalResource objects NM when a resource
|
||||
localization fails. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
Release 2.0.4-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -270,4 +270,11 @@
|
|||
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
|
||||
</Match>
|
||||
|
||||
<!-- This type cast problem will never occur. -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
|
||||
<Method name="handle" />
|
||||
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||
</Match>
|
||||
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -40,8 +40,5 @@ interface LocalResourcesTracker
|
|||
|
||||
String getUser();
|
||||
|
||||
// TODO: Remove this in favour of EventHandler.handle
|
||||
void localizationCompleted(LocalResourceRequest req, boolean success);
|
||||
|
||||
long nextUniqueNumber();
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -96,13 +97,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
this.conf = conf;
|
||||
}
|
||||
|
||||
/*
|
||||
* Synchronizing this method for avoiding races due to multiple ResourceEvent's
|
||||
* coming to LocalResourcesTracker from Public/Private localizer and
|
||||
* Resource Localization Service.
|
||||
*/
|
||||
@Override
|
||||
public void handle(ResourceEvent event) {
|
||||
public synchronized void handle(ResourceEvent event) {
|
||||
LocalResourceRequest req = event.getLocalResourceRequest();
|
||||
LocalizedResource rsrc = localrsrc.get(req);
|
||||
switch (event.getType()) {
|
||||
case REQUEST:
|
||||
case LOCALIZED:
|
||||
if (useLocalCacheDirectoryManager) {
|
||||
inProgressLocalResourcesMap.remove(req);
|
||||
}
|
||||
break;
|
||||
case REQUEST:
|
||||
if (rsrc != null && (!isResourcePresent(rsrc))) {
|
||||
LOG.info("Resource " + rsrc.getLocalPath()
|
||||
+ " is missing, localizing it again");
|
||||
|
@ -117,10 +127,24 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
break;
|
||||
case RELEASE:
|
||||
if (null == rsrc) {
|
||||
LOG.info("Release unknown rsrc null (discard)");
|
||||
// The container sent a release event on a resource which
|
||||
// 1) Failed
|
||||
// 2) Removed for some reason (ex. disk is no longer accessible)
|
||||
ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
|
||||
LOG.info("Container " + relEvent.getContainer()
|
||||
+ " sent RELEASE event on a resource request " + req
|
||||
+ " not present in cache.");
|
||||
return;
|
||||
}
|
||||
break;
|
||||
case LOCALIZATION_FAILED:
|
||||
decrementFileCountForLocalCacheDirectory(req, null);
|
||||
/*
|
||||
* If resource localization fails then Localized resource will be
|
||||
* removed from local cache.
|
||||
*/
|
||||
localrsrc.remove(req);
|
||||
break;
|
||||
}
|
||||
rsrc.handle(event);
|
||||
}
|
||||
|
@ -279,18 +303,6 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void localizationCompleted(LocalResourceRequest req,
|
||||
boolean success) {
|
||||
if (useLocalCacheDirectoryManager) {
|
||||
if (!success) {
|
||||
decrementFileCountForLocalCacheDirectory(req, null);
|
||||
} else {
|
||||
inProgressLocalResourcesMap.remove(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nextUniqueNumber() {
|
||||
return uniqueNumberGenerator.incrementAndGet();
|
||||
|
|
|
@ -32,10 +32,12 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
||||
|
@ -89,6 +91,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
|
|||
.addTransition(ResourceState.DOWNLOADING,
|
||||
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
|
||||
ResourceEventType.RELEASE, new ReleasePendingTransition())
|
||||
.addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
|
||||
ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
|
||||
|
||||
// From LOCALIZED (ref >= 0, on disk)
|
||||
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
|
||||
|
@ -126,12 +130,14 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
|
|||
}
|
||||
|
||||
private void release(ContainerId container) {
|
||||
if (!ref.remove(container)) {
|
||||
LOG.info("Attempt to release claim on " + this +
|
||||
" from unregistered container " + container);
|
||||
assert false; // TODO: FIX
|
||||
}
|
||||
if (ref.remove(container)) {
|
||||
// updating the timestamp only in case of success.
|
||||
timestamp.set(currentTime());
|
||||
} else {
|
||||
LOG.info("Container " + container
|
||||
+ " doesn't exist in the container list of the Resource " + this
|
||||
+ " to which it sent RELEASE event");
|
||||
}
|
||||
}
|
||||
|
||||
private long currentTime() {
|
||||
|
@ -250,6 +256,25 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resource localization failed, notify waiting containers.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static class FetchFailedTransition extends ResourceTransition {
|
||||
@Override
|
||||
public void transition(LocalizedResource rsrc, ResourceEvent event) {
|
||||
ResourceFailedLocalizationEvent failedEvent =
|
||||
(ResourceFailedLocalizationEvent) event;
|
||||
Queue<ContainerId> containers = rsrc.ref;
|
||||
Throwable failureCause = failedEvent.getCause();
|
||||
for (ContainerId container : containers) {
|
||||
rsrc.dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceFailedEvent(container, failedEvent
|
||||
.getLocalResourceRequest(), failureCause));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resource already localized, notify immediately.
|
||||
*/
|
||||
|
|
|
@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResour
|
|||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
|
@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
||||
|
@ -683,7 +683,6 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
public void run() {
|
||||
try {
|
||||
// TODO shutdown, better error handling esp. DU
|
||||
|
@ -699,10 +698,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
return;
|
||||
}
|
||||
LocalResourceRequest key = assoc.getResource().getRequest();
|
||||
assoc.getResource().handle(
|
||||
new ResourceLocalizedEvent(key,
|
||||
local, FileUtil.getDU(new File(local.toUri()))));
|
||||
publicRsrc.localizationCompleted(key, true);
|
||||
publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
|
||||
.getDU(new File(local.toUri()))));
|
||||
synchronized (attempts) {
|
||||
attempts.remove(key);
|
||||
}
|
||||
|
@ -710,13 +707,10 @@ public class ResourceLocalizationService extends CompositeService
|
|||
LOG.info("Failed to download rsrc " + assoc.getResource(),
|
||||
e.getCause());
|
||||
LocalResourceRequest req = assoc.getResource().getRequest();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceFailedEvent(
|
||||
assoc.getContext().getContainerId(),
|
||||
req, e.getCause()));
|
||||
publicRsrc.localizationCompleted(req, false);
|
||||
List<LocalizerResourceRequestEvent> reqs;
|
||||
publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
|
||||
.getCause()));
|
||||
synchronized (attempts) {
|
||||
List<LocalizerResourceRequestEvent> reqs;
|
||||
reqs = attempts.get(req);
|
||||
if (null == reqs) {
|
||||
LOG.error("Missing pending list for " + req);
|
||||
|
@ -724,13 +718,6 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
attempts.remove(req);
|
||||
}
|
||||
// let the other containers know about the localization failure
|
||||
for (LocalizerResourceRequestEvent reqEvent : reqs) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceFailedEvent(
|
||||
reqEvent.getContext().getContainerId(),
|
||||
reqEvent.getResource().getRequest(), e.getCause()));
|
||||
}
|
||||
} catch (CancellationException e) {
|
||||
// ignore; shutting down
|
||||
}
|
||||
|
@ -810,13 +797,14 @@ public class ResourceLocalizationService extends CompositeService
|
|||
return null;
|
||||
}
|
||||
|
||||
// TODO this sucks. Fix it later
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
LocalizerHeartbeatResponse update(
|
||||
List<LocalResourceStatus> remoteResourceStatuses) {
|
||||
LocalizerHeartbeatResponse response =
|
||||
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
|
||||
|
||||
String user = context.getUser();
|
||||
ApplicationId applicationId =
|
||||
context.getContainerId().getApplicationAttemptId().getApplicationId();
|
||||
// The localizer has just spawned. Start giving it resources for
|
||||
// remote-fetching.
|
||||
if (remoteResourceStatuses.isEmpty()) {
|
||||
|
@ -847,6 +835,11 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
ArrayList<ResourceLocalizationSpec> rsrcs =
|
||||
new ArrayList<ResourceLocalizationSpec>();
|
||||
/*
|
||||
* TODO : It doesn't support multiple downloads per ContainerLocalizer
|
||||
* at the same time. We need to think whether we should support this.
|
||||
*/
|
||||
|
||||
for (LocalResourceStatus stat : remoteResourceStatuses) {
|
||||
LocalResource rsrc = stat.getResource();
|
||||
LocalResourceRequest req = null;
|
||||
|
@ -865,11 +858,10 @@ public class ResourceLocalizationService extends CompositeService
|
|||
case FETCH_SUCCESS:
|
||||
// notify resource
|
||||
try {
|
||||
assoc.getResource().handle(
|
||||
new ResourceLocalizedEvent(req,
|
||||
ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
|
||||
stat.getLocalSize()));
|
||||
localizationCompleted(stat);
|
||||
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
|
||||
.handle(
|
||||
new ResourceLocalizedEvent(req, ConverterUtils
|
||||
.getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
|
||||
} catch (URISyntaxException e) { }
|
||||
if (pending.isEmpty()) {
|
||||
// TODO: Synchronization
|
||||
|
@ -899,19 +891,16 @@ public class ResourceLocalizationService extends CompositeService
|
|||
LOG.info("DEBUG: FAILED " + req, stat.getException());
|
||||
assoc.getResource().unlock();
|
||||
response.setLocalizerAction(LocalizerAction.DIE);
|
||||
localizationCompleted(stat);
|
||||
// TODO: Why is this event going directly to the container. Why not
|
||||
// the resource itself? What happens to the resource? Is it removed?
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceFailedEvent(context.getContainerId(),
|
||||
req, stat.getException()));
|
||||
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
|
||||
.handle(
|
||||
new ResourceFailedLocalizationEvent(req, stat.getException()));
|
||||
break;
|
||||
default:
|
||||
LOG.info("Unknown status: " + stat.getStatus());
|
||||
response.setLocalizerAction(LocalizerAction.DIE);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceFailedEvent(context.getContainerId(),
|
||||
req, stat.getException()));
|
||||
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
|
||||
.handle(
|
||||
new ResourceFailedLocalizationEvent(req, stat.getException()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -919,27 +908,6 @@ public class ResourceLocalizationService extends CompositeService
|
|||
return response;
|
||||
}
|
||||
|
||||
private void localizationCompleted(LocalResourceStatus stat) {
|
||||
try {
|
||||
LocalResource rsrc = stat.getResource();
|
||||
LocalResourceRequest key = new LocalResourceRequest(rsrc);
|
||||
String user = context.getUser();
|
||||
ApplicationId appId =
|
||||
context.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
LocalResourceVisibility vis = rsrc.getVisibility();
|
||||
LocalResourcesTracker tracker =
|
||||
getLocalResourcesTracker(vis, user, appId);
|
||||
if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
|
||||
tracker.localizationCompleted(key, true);
|
||||
} else {
|
||||
tracker.localizationCompleted(key, false);
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Invalid resource URL specified", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Path getPathForLocalization(LocalResource rsrc) throws IOException,
|
||||
URISyntaxException {
|
||||
String user = context.getUser();
|
||||
|
|
|
@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|||
enum ResourceState {
|
||||
INIT,
|
||||
DOWNLOADING,
|
||||
LOCALIZED
|
||||
LOCALIZED,
|
||||
FAILED
|
||||
}
|
||||
|
|
|
@ -29,5 +29,7 @@ public enum ResourceEventType {
|
|||
/** See {@link ResourceLocalizedEvent} */
|
||||
LOCALIZED,
|
||||
/** See {@link ResourceReleaseEvent} */
|
||||
RELEASE
|
||||
RELEASE,
|
||||
/** See {@link ResourceFailedLocalizationEvent} */
|
||||
LOCALIZATION_FAILED
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||
|
||||
/**
|
||||
* This event is sent by the localizer in case resource localization fails for
|
||||
* the requested resource.
|
||||
*/
|
||||
public class ResourceFailedLocalizationEvent extends ResourceEvent {
|
||||
|
||||
private Throwable cause;
|
||||
|
||||
public ResourceFailedLocalizationEvent(LocalResourceRequest rsrc,
|
||||
Throwable cause) {
|
||||
super(rsrc, ResourceEventType.LOCALIZATION_FAILED);
|
||||
this.cause = cause;
|
||||
}
|
||||
|
||||
public Throwable getCause() {
|
||||
return cause;
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -41,11 +42,15 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
||||
|
@ -224,6 +229,142 @@ public class TestLocalResourcesTrackerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testLocalResourceCache() {
|
||||
String user = "testuser";
|
||||
DrainDispatcher dispatcher = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
dispatcher = createDispatcher(conf);
|
||||
|
||||
EventHandler<LocalizerEvent> localizerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
EventHandler<ContainerEvent> containerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
|
||||
// Registering event handlers.
|
||||
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
||||
dispatcher.register(ContainerEventType.class, containerEventHandler);
|
||||
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourcesTracker tracker =
|
||||
new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
|
||||
|
||||
LocalResourceRequest lr =
|
||||
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
|
||||
|
||||
// Creating 2 containers for same application which will be requesting
|
||||
// same local resource.
|
||||
// Container 1 requesting local resource.
|
||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
|
||||
ResourceEvent reqEvent1 =
|
||||
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc1);
|
||||
|
||||
// No resource request is initially present in local cache
|
||||
Assert.assertEquals(0, localrsrc.size());
|
||||
|
||||
// Container-1 requesting local resource.
|
||||
tracker.handle(reqEvent1);
|
||||
|
||||
// New localized Resource should have been added to local resource map
|
||||
// and the requesting container will be added to its waiting queue.
|
||||
Assert.assertEquals(1, localrsrc.size());
|
||||
Assert.assertTrue(localrsrc.containsKey(lr));
|
||||
Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
|
||||
Assert.assertTrue(localrsrc.get(lr).ref.contains(cId1));
|
||||
Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(lr)
|
||||
.getState());
|
||||
|
||||
// Container 2 requesting the resource
|
||||
ContainerId cId2 = BuilderUtils.newContainerId(1, 1, 1, 2);
|
||||
LocalizerContext lc2 = new LocalizerContext(user, cId2, null);
|
||||
ResourceEvent reqEvent2 =
|
||||
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
|
||||
tracker.handle(reqEvent2);
|
||||
|
||||
// Container 2 should have been added to the waiting queue of the local
|
||||
// resource
|
||||
Assert.assertEquals(2, localrsrc.get(lr).getRefCount());
|
||||
Assert.assertTrue(localrsrc.get(lr).ref.contains(cId2));
|
||||
|
||||
// Failing resource localization
|
||||
ResourceEvent resourceFailedEvent =
|
||||
new ResourceFailedLocalizationEvent(lr, new Exception("test"));
|
||||
|
||||
// Backing up the resource to track its state change as it will be
|
||||
// removed after the failed event.
|
||||
LocalizedResource localizedResource = localrsrc.get(lr);
|
||||
|
||||
tracker.handle(resourceFailedEvent);
|
||||
|
||||
// After receiving failed resource event; all waiting containers will be
|
||||
// notified with Container Resource Failed Event.
|
||||
Assert.assertEquals(0, localrsrc.size());
|
||||
verify(containerEventHandler, times(2)).handle(
|
||||
isA(ContainerResourceFailedEvent.class));
|
||||
Assert.assertEquals(ResourceState.FAILED, localizedResource.getState());
|
||||
|
||||
// Container 1 trying to release the resource (This resource is already
|
||||
// deleted from the cache. This call should return silently without
|
||||
// exception.
|
||||
ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
|
||||
tracker.handle(relEvent1);
|
||||
|
||||
// Container-3 now requests for the same resource. This request call
|
||||
// is coming prior to Container-2's release call.
|
||||
ContainerId cId3 = BuilderUtils.newContainerId(1, 1, 1, 3);
|
||||
LocalizerContext lc3 = new LocalizerContext(user, cId3, null);
|
||||
ResourceEvent reqEvent3 =
|
||||
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
|
||||
tracker.handle(reqEvent3);
|
||||
|
||||
// Local resource cache now should have the requested resource and the
|
||||
// number of waiting containers should be 1.
|
||||
Assert.assertEquals(1, localrsrc.size());
|
||||
Assert.assertTrue(localrsrc.containsKey(lr));
|
||||
Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
|
||||
Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
|
||||
|
||||
// Container-2 Releases the resource
|
||||
ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
|
||||
tracker.handle(relEvent2);
|
||||
|
||||
// Making sure that there is no change in the cache after the release.
|
||||
Assert.assertEquals(1, localrsrc.size());
|
||||
Assert.assertTrue(localrsrc.containsKey(lr));
|
||||
Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
|
||||
Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
|
||||
|
||||
// Sending ResourceLocalizedEvent to tracker. In turn resource should
|
||||
// send Container Resource Localized Event to waiting containers.
|
||||
Path localizedPath = new Path("/tmp/file1");
|
||||
ResourceLocalizedEvent localizedEvent =
|
||||
new ResourceLocalizedEvent(lr, localizedPath, 123L);
|
||||
tracker.handle(localizedEvent);
|
||||
|
||||
// Verifying ContainerResourceLocalizedEvent .
|
||||
verify(containerEventHandler, times(1)).handle(
|
||||
isA(ContainerResourceLocalizedEvent.class));
|
||||
Assert.assertEquals(ResourceState.LOCALIZED, localrsrc.get(lr)
|
||||
.getState());
|
||||
Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
|
||||
|
||||
// Container-3 releasing the resource.
|
||||
ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
|
||||
tracker.handle(relEvent3);
|
||||
|
||||
Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
|
||||
|
||||
} finally {
|
||||
if (dispatcher != null) {
|
||||
dispatcher.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 100000)
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testHierarchicalLocalCacheDirectories() {
|
||||
|
@ -266,19 +407,25 @@ public class TestLocalResourcesTrackerImpl {
|
|||
// Simulate the process of localization of lr1
|
||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
||||
// Simulate lr1 getting localized
|
||||
ResourceLocalizedEvent rle =
|
||||
ResourceLocalizedEvent rle1 =
|
||||
new ResourceLocalizedEvent(lr1,
|
||||
new Path(hierarchicalPath1.toUri().toString() +
|
||||
Path.SEPARATOR + "file1"), 120);
|
||||
tracker.handle(rle);
|
||||
tracker.handle(rle1);
|
||||
// Localization successful.
|
||||
tracker.localizationCompleted(lr1, true);
|
||||
|
||||
LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
|
||||
LocalResourceVisibility.PUBLIC);
|
||||
// Container 1 requests lr2 to be localized.
|
||||
ResourceEvent reqEvent2 =
|
||||
new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
|
||||
tracker.handle(reqEvent2);
|
||||
|
||||
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
|
||||
// localization failed.
|
||||
tracker.localizationCompleted(lr2, false);
|
||||
ResourceFailedLocalizationEvent rfe2 =
|
||||
new ResourceFailedLocalizationEvent(lr2, new Exception("Test"));
|
||||
tracker.handle(rfe2);
|
||||
|
||||
/*
|
||||
* The path returned for two localization should be different because we
|
||||
|
@ -292,7 +439,11 @@ public class TestLocalResourcesTrackerImpl {
|
|||
LocalResourceVisibility.PUBLIC, lc1);
|
||||
tracker.handle(reqEvent3);
|
||||
Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
|
||||
tracker.localizationCompleted(lr3, true);
|
||||
// localization successful
|
||||
ResourceLocalizedEvent rle3 =
|
||||
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
|
||||
.toString() + Path.SEPARATOR + "file3"), 120);
|
||||
tracker.handle(rle3);
|
||||
|
||||
// Verifying that path created is inside the subdirectory
|
||||
Assert.assertEquals(hierarchicalPath3.toUri().toString(),
|
||||
|
|
Loading…
Reference in New Issue