Merge -r 1367351:1367352 from trunk to branch. FIXES: MAPREDUCE-4342
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b539d225d
commit
f86353ba09
|
@ -30,6 +30,9 @@ Branch-2 ( Unreleased changes )
|
|||
MAPREDUCE-4465. Update description of yarn.nodemanager.address property.
|
||||
(bowang via tucu)
|
||||
|
||||
MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files
|
||||
get deleted from tasktracker. (mayank_bansal via tucu)
|
||||
|
||||
Release 2.1.0-alpha - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
|
||||
|
||||
/**
|
||||
* A collection of {@link LocalizedResource}s all of same
|
||||
|
@ -67,6 +69,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
switch (event.getType()) {
|
||||
case REQUEST:
|
||||
case LOCALIZED:
|
||||
if (rsrc != null && (!isResourcePresent(rsrc))) {
|
||||
LOG.info("Resource " + rsrc.getLocalPath()
|
||||
+ " is missing, localizing it again");
|
||||
localrsrc.remove(req);
|
||||
rsrc = null;
|
||||
}
|
||||
if (null == rsrc) {
|
||||
rsrc = new LocalizedResource(req, dispatcher);
|
||||
localrsrc.put(req, rsrc);
|
||||
|
@ -82,6 +90,24 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
rsrc.handle(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* This module checks if the resource which was localized is already present
|
||||
* or not
|
||||
*
|
||||
* @param rsrc
|
||||
* @return true/false based on resource is present or not
|
||||
*/
|
||||
public boolean isResourcePresent(LocalizedResource rsrc) {
|
||||
boolean ret = true;
|
||||
if (rsrc.getState() == ResourceState.LOCALIZED) {
|
||||
File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
|
||||
if (!file.exists()) {
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(LocalResourceRequest resource) {
|
||||
return localrsrc.containsKey(resource);
|
||||
|
|
|
@ -5,6 +5,8 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
public class TestLocalResourcesTrackerImpl {
|
||||
|
||||
|
@ -131,6 +134,86 @@ public class TestLocalResourcesTrackerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConsistency() {
|
||||
String user = "testuser";
|
||||
DrainDispatcher dispatcher = null;
|
||||
try {
|
||||
dispatcher = createDispatcher(new Configuration());
|
||||
EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
|
||||
EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
|
||||
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
||||
dispatcher.register(ContainerEventType.class, containerEventHandler);
|
||||
|
||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
|
||||
LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
|
||||
LocalResourceVisibility.PUBLIC);
|
||||
LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
localrsrc.put(req1, lr1);
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
dispatcher, localrsrc);
|
||||
|
||||
ResourceEvent req11Event = new ResourceRequestEvent(req1,
|
||||
LocalResourceVisibility.PUBLIC, lc1);
|
||||
|
||||
ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
|
||||
|
||||
// Localize R1 for C1
|
||||
tracker.handle(req11Event);
|
||||
|
||||
dispatcher.await();
|
||||
|
||||
// Verify refCount for R1 is 1
|
||||
Assert.assertEquals(1, lr1.getRefCount());
|
||||
|
||||
dispatcher.await();
|
||||
verifyTrackedResourceCount(tracker, 1);
|
||||
|
||||
// Localize resource1
|
||||
ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
|
||||
"file:///tmp/r1"), 1);
|
||||
lr1.handle(rle);
|
||||
Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
|
||||
Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1")));
|
||||
LocalizedResource rsrcbefore = tracker.iterator().next();
|
||||
File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
|
||||
.toString());
|
||||
Assert.assertTrue(resFile.exists());
|
||||
Assert.assertTrue(resFile.delete());
|
||||
|
||||
// Localize R1 for C1
|
||||
tracker.handle(req11Event);
|
||||
|
||||
dispatcher.await();
|
||||
lr1.handle(rle);
|
||||
Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
|
||||
LocalizedResource rsrcafter = tracker.iterator().next();
|
||||
if (rsrcbefore == rsrcafter) {
|
||||
Assert.fail("Localized resource should not be equal");
|
||||
}
|
||||
// Release resource1
|
||||
tracker.handle(rel11Event);
|
||||
} finally {
|
||||
if (dispatcher != null) {
|
||||
dispatcher.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean createdummylocalizefile(Path path) {
|
||||
boolean ret = false;
|
||||
File file = new File(path.toUri().getRawPath().toString());
|
||||
try {
|
||||
ret = file.createNewFile();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
|
||||
int expected) {
|
||||
int count = 0;
|
||||
|
|
Loading…
Reference in New Issue