diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java index 0e24bbe5330..80cbcca4e27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java @@ -72,11 +72,12 @@ class IndexCache { try { info.wait(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted waiting for construction", e); } } } - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + LOG.debug("IndexCache HIT: MapId {} found", mapId); } if (info.mapSpillRecord.size() == 0 || @@ -106,63 +107,91 @@ class IndexCache { try { info.wait(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted waiting for construction", e); } } } - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + LOG.debug("IndexCache HIT: MapId {} found", mapId); return info; } - LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; + LOG.debug("IndexCache MISS: MapId {} not found", mapId); SpillRecord tmp = null; + boolean success = false; try { tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner); - } catch (Throwable e) { + success = true; + } catch (Throwable e) { tmp = new SpillRecord(0); cache.remove(mapId); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException("Error Reading IndexFile", e); - } finally { - synchronized (newInd) { + } finally { + synchronized (newInd) { newInd.mapSpillRecord = tmp; + if (success) { + // Only add mapId to the queue for successful read and after added to + // the cache. Once in the queue, it is now eligible for removal once + // construction is finished. + queue.add(mapId); + if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) { + freeIndexInformation(); + } + } newInd.notifyAll(); } } - queue.add(mapId); - - if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) { - freeIndexInformation(); - } + return newInd; } /** - * This method removes the map from the cache if index information for this - * map is loaded(size>0), index information entry in cache will not be - * removed if it is in the loading phrase(size=0), this prevents corruption - * of totalMemoryUsed. It should be called when a map output on this tracker - * is discarded. + * This method removes the map from the cache if it is present in the queue. * @param mapId The taskID of this map. */ - public void removeMap(String mapId) { - IndexInformation info = cache.get(mapId); - if (info == null || isUnderConstruction(info)) { + public void removeMap(String mapId) throws IOException { + // Successfully removing the mapId from the queue enters into a contract + // that this thread will remove the corresponding mapId from the cache. + if (!queue.remove(mapId)) { + LOG.debug("Map ID {} not found in queue", mapId); return; } - info = cache.remove(mapId); - if (info != null) { - totalMemoryUsed.addAndGet(-info.getSize()); - if (!queue.remove(mapId)) { - LOG.warn("Map ID" + mapId + " not found in queue!!"); + removeMapInternal(mapId); + } + + /** This method should only be called upon successful removal of mapId from + * the queue. The mapId will be removed from the cache and totalUsedMemory + * will be decremented. + * @param mapId the cache item to be removed + * @throws IOException + */ + private void removeMapInternal(String mapId) throws IOException { + IndexInformation info = cache.remove(mapId); + if (info == null) { + // Inconsistent state as presence in queue implies presence in cache + LOG.warn("Map ID " + mapId + " not found in cache"); + return; + } + try { + synchronized(info) { + while (isUnderConstruction(info)) { + info.wait(); + } + totalMemoryUsed.getAndAdd(-info.getSize()); } - } else { - LOG.info("Map ID " + mapId + " not found in cache"); + } catch (InterruptedException e) { + totalMemoryUsed.getAndAdd(-info.getSize()); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted waiting for construction", e); } } /** - * This method checks if cache and totolMemoryUsed is consistent. + * This method checks if cache and totalMemoryUsed is consistent. * It is only used for unit test. - * @return True if cache and totolMemoryUsed is consistent + * @return True if cache and totalMemoryUsed is consistent */ boolean checkTotalMemoryUsed() { int totalSize = 0; @@ -175,13 +204,13 @@ class IndexCache { /** * Bring memory usage below totalMemoryAllowed. */ - private synchronized void freeIndexInformation() { + private synchronized void freeIndexInformation() throws IOException { while (totalMemoryUsed.get() > totalMemoryAllowed) { - String s = queue.remove(); - IndexInformation info = cache.remove(s); - if (info != null) { - totalMemoryUsed.addAndGet(-info.getSize()); + if(queue.isEmpty()) { + break; } + String mapId = queue.remove(); + removeMapInternal(mapId); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java index 0cc3c662207..e7b691517ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java @@ -21,6 +21,7 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.CRC32; import java.util.zip.CheckedOutputStream; @@ -216,23 +217,32 @@ public class TestIndexCache { final String user = UserGroupInformation.getCurrentUser().getShortUserName(); writeFile(fs, big, bytesPerFile, partsPerMap); - + + // Capture if any runtime exception occurred + AtomicBoolean failed = new AtomicBoolean(); + // run multiple times for (int i = 0; i < 20; ++i) { Thread getInfoThread = new Thread() { @Override public void run() { try { - cache.getIndexInformation("bigIndex", partsPerMap, big, user); + cache.getIndexInformation("bigIndex", 0, big, user); } catch (Exception e) { // should not be here + failed.set(true); } } }; Thread removeMapThread = new Thread() { @Override public void run() { - cache.removeMap("bigIndex"); + try { + cache.removeMap("bigIndex"); + } catch (Exception e) { + // should not be here + failed.set(true); + } } }; if (i%2==0) { @@ -245,6 +255,7 @@ public class TestIndexCache { getInfoThread.join(); removeMapThread.join(); assertEquals(true, cache.checkTotalMemoryUsed()); + assertFalse("An unexpected exception", failed.get()); } } @@ -261,6 +272,9 @@ public class TestIndexCache { UserGroupInformation.getCurrentUser().getShortUserName(); writeFile(fs, racy, bytesPerFile, partsPerMap); + // Capture if any runtime exception occurred + AtomicBoolean failed = new AtomicBoolean(); + // run multiple instances Thread[] getInfoThreads = new Thread[50]; for (int i = 0; i < 50; i++) { @@ -268,10 +282,15 @@ public class TestIndexCache { @Override public void run() { try { - cache.getIndexInformation("racyIndex", partsPerMap, racy, user); - cache.removeMap("racyIndex"); + while (!Thread.currentThread().isInterrupted()) { + cache.getIndexInformation("racyIndex", 0, racy, user); + cache.removeMap("racyIndex"); + } } catch (Exception e) { - // should not be here + if (!Thread.currentThread().isInterrupted()) { + // should not be here + failed.set(true); + } } } }; @@ -281,20 +300,12 @@ public class TestIndexCache { getInfoThreads[i].start(); } - final Thread mainTestThread = Thread.currentThread(); - - Thread timeoutThread = new Thread() { - @Override - public void run() { - try { - Thread.sleep(15000); - mainTestThread.interrupt(); - } catch (InterruptedException ie) { - // we are done; - } - } - }; + // The duration to keep the threads testing + Thread.sleep(5000); + for (int i = 0; i < 50; i++) { + getInfoThreads[i].interrupt(); + } for (int i = 0; i < 50; i++) { try { getInfoThreads[i].join(); @@ -303,10 +314,9 @@ public class TestIndexCache { fail("Unexpectedly long delay during concurrent cache entry creations"); } } - // stop the timeoutThread. If we get interrupted before stopping, there - // must be something wrong, although it wasn't a deadlock. No need to - // catch and swallow. - timeoutThread.interrupt(); + assertFalse("An unexpected exception", failed.get()); + assertTrue("Total memory used does not represent contents of the cache", + cache.checkTotalMemoryUsed()); } private static void checkRecord(IndexRecord rec, long fill) {