From 6eb7c282930c2d55c20b5aac84bfcabf3e98c000 Mon Sep 17 00:00:00 2001 From: Eric Badger Date: Mon, 8 Jun 2020 20:03:02 +0000 Subject: [PATCH] Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)." This reverts commit 568f185bf8c27887d3f4c2559b301275d4bfa486. --- .../org/apache/hadoop/mapred/IndexCache.java | 101 +++++++----------- .../apache/hadoop/mapred/TestIndexCache.java | 60 +++++------ 2 files changed, 61 insertions(+), 100 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java index 80cbcca4e27..c3db9514d31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java @@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class IndexCache { private final JobConf conf; private final int totalMemoryAllowed; private AtomicInteger totalMemoryUsed = new AtomicInteger(); - private static final Logger LOG = LoggerFactory.getLogger(IndexCache.class); + private static final Log LOG = LogFactory.getLog(IndexCache.class); private final ConcurrentHashMap cache = new ConcurrentHashMap(); @@ -72,12 +72,11 @@ class IndexCache { try { info.wait(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new IOException("Interrupted waiting for construction", e); } } } - LOG.debug("IndexCache HIT: MapId {} found", mapId); + LOG.debug("IndexCache HIT: MapId " + mapId + " found"); } if (info.mapSpillRecord.size() == 0 || @@ -107,91 +106,63 @@ class IndexCache { try { info.wait(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new IOException("Interrupted waiting for construction", e); } } } - LOG.debug("IndexCache HIT: MapId {} found", mapId); + LOG.debug("IndexCache HIT: MapId " + mapId + " found"); return info; } - LOG.debug("IndexCache MISS: MapId {} not found", mapId); + LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; SpillRecord tmp = null; - boolean success = false; try { tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner); - success = true; - } catch (Throwable e) { + } catch (Throwable e) { tmp = new SpillRecord(0); cache.remove(mapId); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } throw new IOException("Error Reading IndexFile", e); - } finally { - synchronized (newInd) { + } finally { + synchronized (newInd) { newInd.mapSpillRecord = tmp; - if (success) { - // Only add mapId to the queue for successful read and after added to - // the cache. Once in the queue, it is now eligible for removal once - // construction is finished. - queue.add(mapId); - if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) { - freeIndexInformation(); - } - } newInd.notifyAll(); } } - + queue.add(mapId); + + if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) { + freeIndexInformation(); + } return newInd; } /** - * This method removes the map from the cache if it is present in the queue. + * This method removes the map from the cache if index information for this + * map is loaded(size>0), index information entry in cache will not be + * removed if it is in the loading phrase(size=0), this prevents corruption + * of totalMemoryUsed. It should be called when a map output on this tracker + * is discarded. * @param mapId The taskID of this map. */ - public void removeMap(String mapId) throws IOException { - // Successfully removing the mapId from the queue enters into a contract - // that this thread will remove the corresponding mapId from the cache. - if (!queue.remove(mapId)) { - LOG.debug("Map ID {} not found in queue", mapId); + public void removeMap(String mapId) { + IndexInformation info = cache.get(mapId); + if (info == null || isUnderConstruction(info)) { return; } - removeMapInternal(mapId); - } - - /** This method should only be called upon successful removal of mapId from - * the queue. The mapId will be removed from the cache and totalUsedMemory - * will be decremented. - * @param mapId the cache item to be removed - * @throws IOException - */ - private void removeMapInternal(String mapId) throws IOException { - IndexInformation info = cache.remove(mapId); - if (info == null) { - // Inconsistent state as presence in queue implies presence in cache - LOG.warn("Map ID " + mapId + " not found in cache"); - return; - } - try { - synchronized(info) { - while (isUnderConstruction(info)) { - info.wait(); - } - totalMemoryUsed.getAndAdd(-info.getSize()); + info = cache.remove(mapId); + if (info != null) { + totalMemoryUsed.addAndGet(-info.getSize()); + if (!queue.remove(mapId)) { + LOG.warn("Map ID" + mapId + " not found in queue!!"); } - } catch (InterruptedException e) { - totalMemoryUsed.getAndAdd(-info.getSize()); - Thread.currentThread().interrupt(); - throw new IOException("Interrupted waiting for construction", e); + } else { + LOG.info("Map ID " + mapId + " not found in cache"); } } /** - * This method checks if cache and totalMemoryUsed is consistent. + * This method checks if cache and totolMemoryUsed is consistent. * It is only used for unit test. - * @return True if cache and totalMemoryUsed is consistent + * @return True if cache and totolMemoryUsed is consistent */ boolean checkTotalMemoryUsed() { int totalSize = 0; @@ -204,13 +175,13 @@ class IndexCache { /** * Bring memory usage below totalMemoryAllowed. */ - private synchronized void freeIndexInformation() throws IOException { + private synchronized void freeIndexInformation() { while (totalMemoryUsed.get() > totalMemoryAllowed) { - if(queue.isEmpty()) { - break; + String s = queue.remove(); + IndexInformation info = cache.remove(s); + if (info != null) { + totalMemoryUsed.addAndGet(-info.getSize()); } - String mapId = queue.remove(); - removeMapInternal(mapId); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java index ae97d97c6a2..b6a2df08833 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java @@ -21,7 +21,6 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.CRC32; import java.util.zip.CheckedOutputStream; @@ -211,32 +210,23 @@ public class TestIndexCache extends TestCase { final String user = UserGroupInformation.getCurrentUser().getShortUserName(); writeFile(fs, big, bytesPerFile, partsPerMap); - - // Capture if any runtime exception occurred - final AtomicBoolean failed = new AtomicBoolean(); - + // run multiple times for (int i = 0; i < 20; ++i) { Thread getInfoThread = new Thread() { @Override public void run() { try { - cache.getIndexInformation("bigIndex", 0, big, user); + cache.getIndexInformation("bigIndex", partsPerMap, big, user); } catch (Exception e) { // should not be here - failed.set(true); } } }; Thread removeMapThread = new Thread() { @Override public void run() { - try { - cache.removeMap("bigIndex"); - } catch (Exception e) { - // should not be here - failed.set(true); - } + cache.removeMap("bigIndex"); } }; if (i%2==0) { @@ -248,9 +238,8 @@ public class TestIndexCache extends TestCase { } getInfoThread.join(); removeMapThread.join(); - assertFalse("An unexpected exception", failed.get()); - assertTrue(cache.checkTotalMemoryUsed()); - } + assertEquals(true, cache.checkTotalMemoryUsed()); + } } public void testCreateRace() throws Exception { @@ -265,9 +254,6 @@ public class TestIndexCache extends TestCase { UserGroupInformation.getCurrentUser().getShortUserName(); writeFile(fs, racy, bytesPerFile, partsPerMap); - // Capture if any runtime exception occurred - final AtomicBoolean failed = new AtomicBoolean(); - // run multiple instances Thread[] getInfoThreads = new Thread[50]; for (int i = 0; i < 50; i++) { @@ -275,15 +261,10 @@ public class TestIndexCache extends TestCase { @Override public void run() { try { - while (!Thread.currentThread().isInterrupted()) { - cache.getIndexInformation("racyIndex", 0, racy, user); - cache.removeMap("racyIndex"); - } + cache.getIndexInformation("racyIndex", partsPerMap, racy, user); + cache.removeMap("racyIndex"); } catch (Exception e) { - if (!Thread.currentThread().isInterrupted()) { - // should not be here - failed.set(true); - } + // should not be here } } }; @@ -293,12 +274,20 @@ public class TestIndexCache extends TestCase { getInfoThreads[i].start(); } - // The duration to keep the threads testing - Thread.sleep(5000); + final Thread mainTestThread = Thread.currentThread(); + + Thread timeoutThread = new Thread() { + @Override + public void run() { + try { + Thread.sleep(15000); + mainTestThread.interrupt(); + } catch (InterruptedException ie) { + // we are done; + } + } + }; - for (int i = 0; i < 50; i++) { - getInfoThreads[i].interrupt(); - } for (int i = 0; i < 50; i++) { try { getInfoThreads[i].join(); @@ -307,9 +296,10 @@ public class TestIndexCache extends TestCase { fail("Unexpectedly long delay during concurrent cache entry creations"); } } - assertFalse("An unexpected exception", failed.get()); - assertTrue("Total memory used does not represent contents of the cache", - cache.checkTotalMemoryUsed()); + // stop the timeoutThread. If we get interrupted before stopping, there + // must be something wrong, although it wasn't a deadlock. No need to + // catch and swallow. + timeoutThread.interrupt(); } private static void checkRecord(IndexRecord rec, long fill) {