MAPREDUCE-7332. Fix SpillCallBackPathsFinder to use JDK7 on branch-2.10. Contributed by Ahmed Hussein.

This commit is contained in:
Jim Brennan 2021-03-26 15:39:59 +00:00
parent 4829e1b7ed
commit 7a3a749663
1 changed files with 21 additions and 26 deletions

View File

@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@ -60,13 +59,24 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector {
/**
* Index spill files.
*/
private final Set<Path> indexSpillFiles =
Collections.newSetFromMap(new WeakHashMap<Path, Boolean>());
private final Set<Path> indexSpillFiles = getPosConcSet();
/**
* Paths that were not found in the maps.
*/
private final Set<Path> negativeCache =
Collections.newSetFromMap(new WeakHashMap<Path, Boolean>());
private final Set<Path> negativeCache = getPosConcSet();
private static <T> Set<T> getPosConcSet() {
return Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
}
private static Set<Long> getConcMapValue(
ConcurrentHashMap<Path, Set<Long>> currMap, Path path) {
if (!currMap.containsKey(path)) {
Set<Long> newPositions = getPosConcSet();
currMap.putIfAbsent(path, newPositions);
}
return currMap.get(path);
}
protected ConcurrentHashMap<Path, Set<Long>> getFilesMap(
Configuration config) {
@ -82,15 +92,7 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector {
long outPos = 0;
try {
outPos = out.getPos();
Set<Long> positions = getFilesMap(conf).get(path);
if (positions == null) {
Set<Long> newPositions =
Collections.newSetFromMap(new WeakHashMap<Long, Boolean>());
positions = getFilesMap(conf).putIfAbsent(path, newPositions);
if (positions == null) {
positions = newPositions;
}
}
Set<Long> positions = getConcMapValue(getFilesMap(conf), path);
positions.add(outPos);
} catch (IOException e) {
LOG.debug("writeSpillFileCB.. exception getting position of the stream."
@ -104,23 +106,16 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector {
if (path == null) {
return;
}
Set<Long> pathEntries = getFilesMap(conf).get(path);
if (pathEntries != null) {
ConcurrentHashMap<Path, Set<Long>> currMap = getFilesMap(conf);
if (currMap.containsKey(path)) {
try {
long isPos = CryptoStreamUtils.getInputStreamOffset(is);
Set<Long> pathEntries = currMap.get(path);
if (pathEntries.contains(isPos)) {
LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos);
return;
}
Set<Long> positions = invalidAccessMap.get(path);
if (positions == null) {
Set<Long> newPositions =
Collections.newSetFromMap(new WeakHashMap<Long, Boolean>());
positions = invalidAccessMap.putIfAbsent(path, newPositions);
if (positions == null) {
positions = newPositions;
}
}
Set<Long> positions = getConcMapValue(invalidAccessMap, path);
positions.add(isPos);
LOG.debug("getSpillFileCB... access incorrect position.. "
+ "Path {}; Pos: {}", path, isPos);
@ -181,7 +176,7 @@ public class SpillCallBackPathsFinder extends SpillCallBackInjector {
}
public Set<Path> getEncryptedSpilledFiles() {
return Collections.unmodifiableSet(encryptedSpillFiles.keySet());
return Collections.unmodifiableSet(((Map) encryptedSpillFiles).keySet());
}
/**