MAPREDUCE-2541. Fixed a race condition in IndexCache.removeMap. Contributed by Binglin Chang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157346 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
24676e8c2e
commit
2cd4c6ad6d
|
@ -396,6 +396,9 @@ Trunk (unreleased changes)
|
||||||
MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279
|
MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279
|
||||||
merge. (acmurthy)
|
merge. (acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-2541. Fixed a race condition in IndexCache.removeMap. (Binglin
|
||||||
|
Chang via acmurthy)
|
||||||
|
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
|
|
|
@ -130,12 +130,19 @@ class IndexCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method removes the map from the cache. It should be called when
|
* This method removes the map from the cache if index information for this
|
||||||
* a map output on this tracker is discarded.
|
* map is loaded(size>0), index information entry in cache will not be
|
||||||
|
* removed if it is in the loading phrase(size=0), this prevents corruption
|
||||||
|
* of totalMemoryUsed. It should be called when a map output on this tracker
|
||||||
|
* is discarded.
|
||||||
* @param mapId The taskID of this map.
|
* @param mapId The taskID of this map.
|
||||||
*/
|
*/
|
||||||
public void removeMap(String mapId) {
|
public void removeMap(String mapId) {
|
||||||
IndexInformation info = cache.remove(mapId);
|
IndexInformation info = cache.get(mapId);
|
||||||
|
if ((info != null) && (info.getSize() == 0)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
info = cache.remove(mapId);
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
totalMemoryUsed.addAndGet(-info.getSize());
|
totalMemoryUsed.addAndGet(-info.getSize());
|
||||||
if (!queue.remove(mapId)) {
|
if (!queue.remove(mapId)) {
|
||||||
|
@ -146,6 +153,19 @@ class IndexCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method checks if cache and totolMemoryUsed is consistent.
|
||||||
|
* It is only used for unit test.
|
||||||
|
* @return True if cache and totolMemoryUsed is consistent
|
||||||
|
*/
|
||||||
|
boolean checkTotalMemoryUsed() {
|
||||||
|
int totalSize = 0;
|
||||||
|
for (IndexInformation info : cache.values()) {
|
||||||
|
totalSize += info.getSize();
|
||||||
|
}
|
||||||
|
return totalSize == totalMemoryUsed.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bring memory usage below totalMemoryAllowed.
|
* Bring memory usage below totalMemoryAllowed.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -193,6 +193,60 @@ public class TestIndexCache extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRemoveMap() throws Exception {
|
||||||
|
// This test case use two thread to call getIndexInformation and
|
||||||
|
// removeMap concurrently, in order to construct race condition.
|
||||||
|
// This test case may not repeatable. But on my macbook this test
|
||||||
|
// fails with probability of 100% on code before MAPREDUCE-2541,
|
||||||
|
// so it is repeatable in practice.
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
||||||
|
Path p = new Path(System.getProperty("test.build.data", "/tmp"),
|
||||||
|
"cache").makeQualified(fs);
|
||||||
|
fs.delete(p, true);
|
||||||
|
conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
|
||||||
|
// Make a big file so removeMapThread almost surely runs faster than
|
||||||
|
// getInfoThread
|
||||||
|
final int partsPerMap = 100000;
|
||||||
|
final int bytesPerFile = partsPerMap * 24;
|
||||||
|
final IndexCache cache = new IndexCache(conf);
|
||||||
|
|
||||||
|
final Path big = new Path(p, "bigIndex");
|
||||||
|
final String user =
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
writeFile(fs, big, bytesPerFile, partsPerMap);
|
||||||
|
|
||||||
|
// run multiple times
|
||||||
|
for (int i = 0; i < 20; ++i) {
|
||||||
|
Thread getInfoThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
cache.getIndexInformation("bigIndex", partsPerMap, big, user);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// should not be here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Thread removeMapThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
cache.removeMap("bigIndex");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if (i%2==0) {
|
||||||
|
getInfoThread.start();
|
||||||
|
removeMapThread.start();
|
||||||
|
} else {
|
||||||
|
removeMapThread.start();
|
||||||
|
getInfoThread.start();
|
||||||
|
}
|
||||||
|
getInfoThread.join();
|
||||||
|
removeMapThread.join();
|
||||||
|
assertEquals(true, cache.checkTotalMemoryUsed());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void checkRecord(IndexRecord rec, long fill) {
|
private static void checkRecord(IndexRecord rec, long fill) {
|
||||||
assertEquals(fill, rec.startOffset);
|
assertEquals(fill, rec.startOffset);
|
||||||
assertEquals(fill, rec.rawLength);
|
assertEquals(fill, rec.rawLength);
|
||||||
|
|
Loading…
Reference in New Issue