HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1499659 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
33fe54a25f
commit
5428bfbf53
|
@ -426,6 +426,8 @@ Release 2.1.0-beta - 2013-07-02
|
|||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-4626. ClientProtocol#getLinkTarget should throw an exception for
|
||||
|
|
|
@ -325,4 +325,9 @@
|
|||
<Field name="modification" />
|
||||
<Bug pattern="VO_VOLATILE_INCREMENT" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.datanode.ReplicaInfo" />
|
||||
<Method name="setDirInternal" />
|
||||
<Bug pattern="DM_STRING_CTOR" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Date;
|
|||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -49,6 +48,9 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.hdfs.util.GSet;
|
||||
import org.apache.hadoop.hdfs.util.LightWeightGSet;
|
||||
import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
|
@ -82,8 +84,9 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private final SortedSet<BlockScanInfo> blockInfoSet
|
||||
= new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
|
||||
private final Map<Block, BlockScanInfo> blockMap
|
||||
= new HashMap<Block, BlockScanInfo>();
|
||||
private final GSet<Block, BlockScanInfo> blockMap
|
||||
= new LightWeightGSet<Block, BlockScanInfo>(
|
||||
LightWeightGSet.computeCapacity(0.5, "BlockMap"));
|
||||
|
||||
// processedBlocks keeps track of which blocks are scanned
|
||||
// since the last run.
|
||||
|
@ -108,7 +111,13 @@ class BlockPoolSliceScanner {
|
|||
NONE,
|
||||
}
|
||||
|
||||
static class BlockScanInfo {
|
||||
// Extend Block because in the DN process there's a 1-to-1 correspondence of
|
||||
// BlockScanInfo to Block instances, so by extending rather than containing
|
||||
// Block, we can save a bit of Object overhead (about 24 bytes per block
|
||||
// replica.)
|
||||
static class BlockScanInfo extends Block
|
||||
implements LightWeightGSet.LinkedElement {
|
||||
|
||||
/** Compare the info by the last scan time. */
|
||||
static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
|
||||
= new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {
|
||||
|
@ -121,18 +130,18 @@ class BlockPoolSliceScanner {
|
|||
}
|
||||
};
|
||||
|
||||
final Block block;
|
||||
long lastScanTime = 0;
|
||||
ScanType lastScanType = ScanType.NONE;
|
||||
boolean lastScanOk = true;
|
||||
private LinkedElement next;
|
||||
|
||||
BlockScanInfo(Block block) {
|
||||
this.block = block;
|
||||
super(block);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return block.hashCode();
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,12 +151,22 @@ class BlockPoolSliceScanner {
|
|||
} else if (that == null || !(that instanceof BlockScanInfo)) {
|
||||
return false;
|
||||
}
|
||||
return block.equals(((BlockScanInfo)that).block);
|
||||
return super.equals(that);
|
||||
}
|
||||
|
||||
long getLastScanTime() {
|
||||
return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNext(LinkedElement next) {
|
||||
this.next = next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LinkedElement getNext() {
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
||||
BlockPoolSliceScanner(String bpid, DataNode datanode,
|
||||
|
@ -203,19 +222,19 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private synchronized void addBlockInfo(BlockScanInfo info) {
|
||||
boolean added = blockInfoSet.add(info);
|
||||
blockMap.put(info.block, info);
|
||||
blockMap.put(info);
|
||||
|
||||
if (added) {
|
||||
updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
|
||||
updateBytesToScan(info.getNumBytes(), info.lastScanTime);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void delBlockInfo(BlockScanInfo info) {
|
||||
boolean exists = blockInfoSet.remove(info);
|
||||
blockMap.remove(info.block);
|
||||
blockMap.remove(info);
|
||||
|
||||
if (exists) {
|
||||
updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
|
||||
updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -464,7 +483,7 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private synchronized boolean isFirstBlockProcessed() {
|
||||
if (!blockInfoSet.isEmpty()) {
|
||||
long blockId = blockInfoSet.first().block.getBlockId();
|
||||
long blockId = blockInfoSet.first().getBlockId();
|
||||
if ((processedBlocks.get(blockId) != null)
|
||||
&& (processedBlocks.get(blockId) == 1)) {
|
||||
return true;
|
||||
|
@ -478,7 +497,7 @@ class BlockPoolSliceScanner {
|
|||
Block block = null;
|
||||
synchronized (this) {
|
||||
if (!blockInfoSet.isEmpty()) {
|
||||
block = blockInfoSet.first().block;
|
||||
block = blockInfoSet.first();
|
||||
}
|
||||
}
|
||||
if ( block != null ) {
|
||||
|
@ -520,7 +539,7 @@ class BlockPoolSliceScanner {
|
|||
entry.genStamp));
|
||||
if (info != null) {
|
||||
if (processedBlocks.get(entry.blockId) == null) {
|
||||
updateBytesLeft(-info.block.getNumBytes());
|
||||
updateBytesLeft(-info.getNumBytes());
|
||||
processedBlocks.put(entry.blockId, 1);
|
||||
}
|
||||
if (logIterator.isPrevious()) {
|
||||
|
@ -712,7 +731,7 @@ class BlockPoolSliceScanner {
|
|||
(info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none";
|
||||
buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
|
||||
" scan time : " +
|
||||
"%-15d %s%n", info.block,
|
||||
"%-15d %s%n", info,
|
||||
(info.lastScanOk ? "ok" : "failed"),
|
||||
scanType, scanTime,
|
||||
(scanTime <= 0) ? "not yet verified" :
|
||||
|
|
|
@ -21,6 +21,10 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
|
@ -29,16 +33,33 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This class is used by datanodes to maintain meta data of its replicas.
|
||||
* It provides a general interface for meta information of a replica.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract public class ReplicaInfo extends Block implements Replica {
|
||||
|
||||
/** volume where the replica belongs */
|
||||
private FsVolumeSpi volume;
|
||||
|
||||
/** directory where block & meta files belong */
|
||||
private File dir;
|
||||
|
||||
/**
|
||||
* Base directory containing numerically-identified sub directories and
|
||||
* possibly blocks.
|
||||
*/
|
||||
private File baseDir;
|
||||
|
||||
/**
|
||||
* Ints representing the sub directory path from base dir to the directory
|
||||
* containing this replica.
|
||||
*/
|
||||
private int[] subDirs;
|
||||
|
||||
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
|
||||
|
||||
/**
|
||||
* Constructor for a zero length replica
|
||||
|
@ -74,7 +95,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp);
|
||||
this.volume = vol;
|
||||
this.dir = dir;
|
||||
setDirInternal(dir);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -122,7 +143,18 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* @return the parent directory path where this replica is located
|
||||
*/
|
||||
File getDir() {
|
||||
return dir;
|
||||
if (subDirs == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i : subDirs) {
|
||||
sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
|
||||
sb.append(i);
|
||||
sb.append("/");
|
||||
}
|
||||
File ret = new File(baseDir, sb.toString());
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -130,7 +162,59 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* @param dir the parent directory where the replica is located
|
||||
*/
|
||||
public void setDir(File dir) {
|
||||
this.dir = dir;
|
||||
setDirInternal(dir);
|
||||
}
|
||||
|
||||
private void setDirInternal(File dir) {
|
||||
if (dir == null) {
|
||||
subDirs = null;
|
||||
baseDir = null;
|
||||
return;
|
||||
}
|
||||
|
||||
ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
|
||||
this.subDirs = replicaDirInfo.subDirs;
|
||||
|
||||
synchronized (internedBaseDirs) {
|
||||
if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
|
||||
// Create a new String path of this file and make a brand new File object
|
||||
// to guarantee we drop the reference to the underlying char[] storage.
|
||||
File baseDir = new File(new String(replicaDirInfo.baseDirPath));
|
||||
internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
|
||||
}
|
||||
this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static class ReplicaDirInfo {
|
||||
@VisibleForTesting
|
||||
public String baseDirPath;
|
||||
|
||||
@VisibleForTesting
|
||||
public int[] subDirs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static ReplicaDirInfo parseSubDirs(File dir) {
|
||||
ReplicaDirInfo ret = new ReplicaDirInfo();
|
||||
|
||||
File currentDir = dir;
|
||||
List<Integer> subDirList = new ArrayList<Integer>();
|
||||
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
|
||||
// Prepend the integer into the list.
|
||||
subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
|
||||
DataStorage.BLOCK_SUBDIR_PREFIX, "")));
|
||||
currentDir = currentDir.getParentFile();
|
||||
}
|
||||
ret.subDirs = new int[subDirList.size()];
|
||||
for (int i = 0; i < subDirList.size(); i++) {
|
||||
ret.subDirs[i] = subDirList.get(i);
|
||||
}
|
||||
|
||||
ret.baseDirPath = currentDir.getAbsolutePath();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -437,4 +439,23 @@ public class TestDatanodeBlockScanner {
|
|||
blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
|
||||
}
|
||||
}
|
||||
|
||||
private static final String BASE_PATH = "/data/current/finalized";
|
||||
|
||||
@Test
|
||||
public void testReplicaInfoParsing() throws Exception {
|
||||
testReplicaInfoParsingSingle(BASE_PATH, new int[0]);
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1", new int[]{1});
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir43", new int[]{43});
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3", new int[]{1, 2, 3});
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir43", new int[]{1, 2, 43});
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir23/subdir3", new int[]{1, 23, 3});
|
||||
testReplicaInfoParsingSingle(BASE_PATH + "/subdir13/subdir2/subdir3", new int[]{13, 2, 3});
|
||||
}
|
||||
|
||||
private static void testReplicaInfoParsingSingle(String subDirPath, int[] expectedSubDirs) {
|
||||
File testFile = new File(subDirPath);
|
||||
assertArrayEquals(expectedSubDirs, ReplicaInfo.parseSubDirs(testFile).subDirs);
|
||||
assertEquals(BASE_PATH, ReplicaInfo.parseSubDirs(testFile).baseDirPath);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue