Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)."
This reverts commit 568f185bf8
.
This commit is contained in:
parent
dcc6d63828
commit
6eb7c28293
|
@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
class IndexCache {
|
class IndexCache {
|
||||||
|
|
||||||
private final JobConf conf;
|
private final JobConf conf;
|
||||||
private final int totalMemoryAllowed;
|
private final int totalMemoryAllowed;
|
||||||
private AtomicInteger totalMemoryUsed = new AtomicInteger();
|
private AtomicInteger totalMemoryUsed = new AtomicInteger();
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(IndexCache.class);
|
private static final Log LOG = LogFactory.getLog(IndexCache.class);
|
||||||
|
|
||||||
private final ConcurrentHashMap<String,IndexInformation> cache =
|
private final ConcurrentHashMap<String,IndexInformation> cache =
|
||||||
new ConcurrentHashMap<String,IndexInformation>();
|
new ConcurrentHashMap<String,IndexInformation>();
|
||||||
|
@ -72,12 +72,11 @@ class IndexCache {
|
||||||
try {
|
try {
|
||||||
info.wait();
|
info.wait();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Interrupted waiting for construction", e);
|
throw new IOException("Interrupted waiting for construction", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info.mapSpillRecord.size() == 0 ||
|
if (info.mapSpillRecord.size() == 0 ||
|
||||||
|
@ -107,91 +106,63 @@ class IndexCache {
|
||||||
try {
|
try {
|
||||||
info.wait();
|
info.wait();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Interrupted waiting for construction", e);
|
throw new IOException("Interrupted waiting for construction", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
LOG.debug("IndexCache MISS: MapId {} not found", mapId);
|
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
|
||||||
SpillRecord tmp = null;
|
SpillRecord tmp = null;
|
||||||
boolean success = false;
|
|
||||||
try {
|
try {
|
||||||
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
|
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
|
||||||
success = true;
|
} catch (Throwable e) {
|
||||||
} catch (Throwable e) {
|
|
||||||
tmp = new SpillRecord(0);
|
tmp = new SpillRecord(0);
|
||||||
cache.remove(mapId);
|
cache.remove(mapId);
|
||||||
if (e instanceof InterruptedException) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
throw new IOException("Error Reading IndexFile", e);
|
throw new IOException("Error Reading IndexFile", e);
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (newInd) {
|
synchronized (newInd) {
|
||||||
newInd.mapSpillRecord = tmp;
|
newInd.mapSpillRecord = tmp;
|
||||||
if (success) {
|
|
||||||
// Only add mapId to the queue for successful read and after added to
|
|
||||||
// the cache. Once in the queue, it is now eligible for removal once
|
|
||||||
// construction is finished.
|
|
||||||
queue.add(mapId);
|
|
||||||
if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
|
||||||
freeIndexInformation();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
newInd.notifyAll();
|
newInd.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
queue.add(mapId);
|
||||||
|
|
||||||
|
if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
||||||
|
freeIndexInformation();
|
||||||
|
}
|
||||||
return newInd;
|
return newInd;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method removes the map from the cache if it is present in the queue.
|
* This method removes the map from the cache if index information for this
|
||||||
|
* map is loaded(size>0), index information entry in cache will not be
|
||||||
|
* removed if it is in the loading phrase(size=0), this prevents corruption
|
||||||
|
* of totalMemoryUsed. It should be called when a map output on this tracker
|
||||||
|
* is discarded.
|
||||||
* @param mapId The taskID of this map.
|
* @param mapId The taskID of this map.
|
||||||
*/
|
*/
|
||||||
public void removeMap(String mapId) throws IOException {
|
public void removeMap(String mapId) {
|
||||||
// Successfully removing the mapId from the queue enters into a contract
|
IndexInformation info = cache.get(mapId);
|
||||||
// that this thread will remove the corresponding mapId from the cache.
|
if (info == null || isUnderConstruction(info)) {
|
||||||
if (!queue.remove(mapId)) {
|
|
||||||
LOG.debug("Map ID {} not found in queue", mapId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
removeMapInternal(mapId);
|
info = cache.remove(mapId);
|
||||||
}
|
if (info != null) {
|
||||||
|
totalMemoryUsed.addAndGet(-info.getSize());
|
||||||
/** This method should only be called upon successful removal of mapId from
|
if (!queue.remove(mapId)) {
|
||||||
* the queue. The mapId will be removed from the cache and totalUsedMemory
|
LOG.warn("Map ID" + mapId + " not found in queue!!");
|
||||||
* will be decremented.
|
|
||||||
* @param mapId the cache item to be removed
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void removeMapInternal(String mapId) throws IOException {
|
|
||||||
IndexInformation info = cache.remove(mapId);
|
|
||||||
if (info == null) {
|
|
||||||
// Inconsistent state as presence in queue implies presence in cache
|
|
||||||
LOG.warn("Map ID " + mapId + " not found in cache");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
synchronized(info) {
|
|
||||||
while (isUnderConstruction(info)) {
|
|
||||||
info.wait();
|
|
||||||
}
|
|
||||||
totalMemoryUsed.getAndAdd(-info.getSize());
|
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} else {
|
||||||
totalMemoryUsed.getAndAdd(-info.getSize());
|
LOG.info("Map ID " + mapId + " not found in cache");
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Interrupted waiting for construction", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method checks if cache and totalMemoryUsed is consistent.
|
* This method checks if cache and totolMemoryUsed is consistent.
|
||||||
* It is only used for unit test.
|
* It is only used for unit test.
|
||||||
* @return True if cache and totalMemoryUsed is consistent
|
* @return True if cache and totolMemoryUsed is consistent
|
||||||
*/
|
*/
|
||||||
boolean checkTotalMemoryUsed() {
|
boolean checkTotalMemoryUsed() {
|
||||||
int totalSize = 0;
|
int totalSize = 0;
|
||||||
|
@ -204,13 +175,13 @@ class IndexCache {
|
||||||
/**
|
/**
|
||||||
* Bring memory usage below totalMemoryAllowed.
|
* Bring memory usage below totalMemoryAllowed.
|
||||||
*/
|
*/
|
||||||
private synchronized void freeIndexInformation() throws IOException {
|
private synchronized void freeIndexInformation() {
|
||||||
while (totalMemoryUsed.get() > totalMemoryAllowed) {
|
while (totalMemoryUsed.get() > totalMemoryAllowed) {
|
||||||
if(queue.isEmpty()) {
|
String s = queue.remove();
|
||||||
break;
|
IndexInformation info = cache.remove(s);
|
||||||
|
if (info != null) {
|
||||||
|
totalMemoryUsed.addAndGet(-info.getSize());
|
||||||
}
|
}
|
||||||
String mapId = queue.remove();
|
|
||||||
removeMapInternal(mapId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import java.io.DataOutputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.zip.CRC32;
|
import java.util.zip.CRC32;
|
||||||
import java.util.zip.CheckedOutputStream;
|
import java.util.zip.CheckedOutputStream;
|
||||||
|
|
||||||
|
@ -211,32 +210,23 @@ public class TestIndexCache extends TestCase {
|
||||||
final String user =
|
final String user =
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
writeFile(fs, big, bytesPerFile, partsPerMap);
|
writeFile(fs, big, bytesPerFile, partsPerMap);
|
||||||
|
|
||||||
// Capture if any runtime exception occurred
|
|
||||||
final AtomicBoolean failed = new AtomicBoolean();
|
|
||||||
|
|
||||||
// run multiple times
|
// run multiple times
|
||||||
for (int i = 0; i < 20; ++i) {
|
for (int i = 0; i < 20; ++i) {
|
||||||
Thread getInfoThread = new Thread() {
|
Thread getInfoThread = new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
cache.getIndexInformation("bigIndex", 0, big, user);
|
cache.getIndexInformation("bigIndex", partsPerMap, big, user);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// should not be here
|
// should not be here
|
||||||
failed.set(true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Thread removeMapThread = new Thread() {
|
Thread removeMapThread = new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
cache.removeMap("bigIndex");
|
||||||
cache.removeMap("bigIndex");
|
|
||||||
} catch (Exception e) {
|
|
||||||
// should not be here
|
|
||||||
failed.set(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (i%2==0) {
|
if (i%2==0) {
|
||||||
|
@ -248,9 +238,8 @@ public class TestIndexCache extends TestCase {
|
||||||
}
|
}
|
||||||
getInfoThread.join();
|
getInfoThread.join();
|
||||||
removeMapThread.join();
|
removeMapThread.join();
|
||||||
assertFalse("An unexpected exception", failed.get());
|
assertEquals(true, cache.checkTotalMemoryUsed());
|
||||||
assertTrue(cache.checkTotalMemoryUsed());
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCreateRace() throws Exception {
|
public void testCreateRace() throws Exception {
|
||||||
|
@ -265,9 +254,6 @@ public class TestIndexCache extends TestCase {
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
writeFile(fs, racy, bytesPerFile, partsPerMap);
|
writeFile(fs, racy, bytesPerFile, partsPerMap);
|
||||||
|
|
||||||
// Capture if any runtime exception occurred
|
|
||||||
final AtomicBoolean failed = new AtomicBoolean();
|
|
||||||
|
|
||||||
// run multiple instances
|
// run multiple instances
|
||||||
Thread[] getInfoThreads = new Thread[50];
|
Thread[] getInfoThreads = new Thread[50];
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
|
@ -275,15 +261,10 @@ public class TestIndexCache extends TestCase {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while (!Thread.currentThread().isInterrupted()) {
|
cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
|
||||||
cache.getIndexInformation("racyIndex", 0, racy, user);
|
cache.removeMap("racyIndex");
|
||||||
cache.removeMap("racyIndex");
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (!Thread.currentThread().isInterrupted()) {
|
// should not be here
|
||||||
// should not be here
|
|
||||||
failed.set(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -293,12 +274,20 @@ public class TestIndexCache extends TestCase {
|
||||||
getInfoThreads[i].start();
|
getInfoThreads[i].start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// The duration to keep the threads testing
|
final Thread mainTestThread = Thread.currentThread();
|
||||||
Thread.sleep(5000);
|
|
||||||
|
Thread timeoutThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(15000);
|
||||||
|
mainTestThread.interrupt();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
// we are done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
for (int i = 0; i < 50; i++) {
|
|
||||||
getInfoThreads[i].interrupt();
|
|
||||||
}
|
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
try {
|
try {
|
||||||
getInfoThreads[i].join();
|
getInfoThreads[i].join();
|
||||||
|
@ -307,9 +296,10 @@ public class TestIndexCache extends TestCase {
|
||||||
fail("Unexpectedly long delay during concurrent cache entry creations");
|
fail("Unexpectedly long delay during concurrent cache entry creations");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertFalse("An unexpected exception", failed.get());
|
// stop the timeoutThread. If we get interrupted before stopping, there
|
||||||
assertTrue("Total memory used does not represent contents of the cache",
|
// must be something wrong, although it wasn't a deadlock. No need to
|
||||||
cache.checkTotalMemoryUsed());
|
// catch and swallow.
|
||||||
|
timeoutThread.interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void checkRecord(IndexRecord rec, long fill) {
|
private static void checkRecord(IndexRecord rec, long fill) {
|
||||||
|
|
Loading…
Reference in New Issue