From 7a3a749663c0a5eb98be718064307b98e9f97d8c Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Fri, 26 Mar 2021 15:39:59 +0000 Subject: [PATCH] MAPREDUCE-7332. Fix SpillCallBackPathsFinder to use JDK7 on branch-2.10. Contributed by Ahmed Hussein. --- .../security/SpillCallBackPathsFinder.java | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java index 68a6d7360da..3359dbc1a11 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java @@ -24,7 +24,6 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -60,13 +59,24 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector { /** * Index spill files. */ - private final Set indexSpillFiles = - Collections.newSetFromMap(new WeakHashMap()); + private final Set indexSpillFiles = getPosConcSet(); /** * Paths that were not found in the maps. */ - private final Set negativeCache = - Collections.newSetFromMap(new WeakHashMap()); + private final Set negativeCache = getPosConcSet(); + + private static Set getPosConcSet() { + return Collections.newSetFromMap(new ConcurrentHashMap()); + } + + private static Set getConcMapValue( + ConcurrentHashMap> currMap, Path path) { + if (!currMap.containsKey(path)) { + Set newPositions = getPosConcSet(); + currMap.putIfAbsent(path, newPositions); + } + return currMap.get(path); + } protected ConcurrentHashMap> getFilesMap( Configuration config) { @@ -82,15 +92,7 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector { long outPos = 0; try { outPos = out.getPos(); - Set positions = getFilesMap(conf).get(path); - if (positions == null) { - Set newPositions = - Collections.newSetFromMap(new WeakHashMap()); - positions = getFilesMap(conf).putIfAbsent(path, newPositions); - if (positions == null) { - positions = newPositions; - } - } + Set positions = getConcMapValue(getFilesMap(conf), path); positions.add(outPos); } catch (IOException e) { LOG.debug("writeSpillFileCB.. exception getting position of the stream." @@ -104,23 +106,16 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector { if (path == null) { return; } - Set pathEntries = getFilesMap(conf).get(path); - if (pathEntries != null) { + ConcurrentHashMap> currMap = getFilesMap(conf); + if (currMap.containsKey(path)) { try { long isPos = CryptoStreamUtils.getInputStreamOffset(is); + Set pathEntries = currMap.get(path); if (pathEntries.contains(isPos)) { LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos); return; } - Set positions = invalidAccessMap.get(path); - if (positions == null) { - Set newPositions = - Collections.newSetFromMap(new WeakHashMap()); - positions = invalidAccessMap.putIfAbsent(path, newPositions); - if (positions == null) { - positions = newPositions; - } - } + Set positions = getConcMapValue(invalidAccessMap, path); positions.add(isPos); LOG.debug("getSpillFileCB... access incorrect position.. " + "Path {}; Pos: {}", path, isPos); @@ -181,7 +176,7 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector { } public Set getEncryptedSpilledFiles() { - return Collections.unmodifiableSet(encryptedSpillFiles.keySet()); + return Collections.unmodifiableSet(((Map) encryptedSpillFiles).keySet()); } /**