HDFS-11047. Remove deep copies of FinalizedReplica to alleviate heap consumption on DataNode. Contributed by Xiaobing Zhou

(cherry picked from commit 2848a27700)
This commit is contained in:
Mingliang Liu 2016-10-28 11:11:31 -07:00
parent a8a99f3423
commit 616c1194fe
3 changed files with 28 additions and 12 deletions

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -593,14 +594,13 @@ public class DirectoryScanner implements Runnable {
diffs.put(bpid, diffRecord);
statsRecord.totalBlocks = blockpoolReport.length;
List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
Arrays.sort(memReport); // Sort based on blockId
final List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < memReport.length && d < blockpoolReport.length) {
FinalizedReplica memBlock = memReport[m];
while (m < bl.size() && d < blockpoolReport.length) {
FinalizedReplica memBlock = bl.get(m);
ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@ -647,8 +647,8 @@ public class DirectoryScanner implements Runnable {
++m;
}
}
while (m < memReport.length) {
FinalizedReplica current = memReport[m++];
while (m < bl.size()) {
FinalizedReplica current = bl.get(m++);
addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume());
}

View File

@ -230,7 +230,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
VolumeFailureSummary getVolumeFailureSummary();
/** @return a list of finalized blocks for the given block pool. */
/**
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool.
*/
List<FinalizedReplica> getFinalizedBlocks(String bpid);
/** @return a list of finalized blocks for the given block pool. */

View File

@ -1840,16 +1840,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
* Get the list of finalized blocks from in-memory blockmap for a block pool.
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool.
*/
@Override
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ArrayList<FinalizedReplica> finalized =
final ArrayList<FinalizedReplica> finalized =
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if (b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica((FinalizedReplica) b));
finalized.add((FinalizedReplica)b);
}
}
return finalized;