HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1499660 13f79535-47bb-0310-9956-ffa450edef68
commit 0886f841c5
parent dbfa01ff96
CHANGES.txt

@@ -208,6 +208,8 @@ Release 2.1.0-beta - 2013-07-02
 
   OPTIMIZATIONS
 
+    HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
+
   BUG FIXES
 
     HDFS-4626. ClientProtocol#getLinkTarget should throw an exception for
findbugsExcludeFile.xml

@@ -325,4 +325,9 @@
     <Field name="modification" />
     <Bug pattern="VO_VOLATILE_INCREMENT" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.server.datanode.ReplicaInfo" />
+    <Method name="setDirInternal" />
+    <Bug pattern="DM_STRING_CTOR" />
+  </Match>
 </FindBugsFilter>
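The new exclusion covers the deliberate String copy in ReplicaInfo#setDirInternal (see the ReplicaInfo hunk later in this commit): FindBugs flags any new String(String) as DM_STRING_CTOR because the copy is normally useless, but here it guarantees the retained path does not keep a larger shared char[] alive (String#substring shared its parent's backing array on JVMs prior to Java 7u6). A minimal sketch of that interning pattern, with hypothetical names, not the code in this commit:

import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class InternedDirs {
  // Mirrors the internedBaseDirs idea in ReplicaInfo (illustrative only).
  private static final Map<String, File> interned = new HashMap<String, File>();

  static synchronized File intern(String path) {
    if (!interned.containsKey(path)) {
      // FindBugs reports DM_STRING_CTOR here; the copy is intentional so the
      // cached key/File path owns a right-sized char[] of its own.
      String detached = new String(path);
      interned.put(detached, new File(detached));
    }
    return interned.get(path);
  }

  public static void main(String[] args) {
    File a = intern("/data/current/finalized");
    File b = intern("/data/current/finalized");
    System.out.println(a == b); // true: one shared File per base directory
  }
}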
BlockPoolSliceScanner.java

@@ -29,7 +29,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -49,6 +48,9 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.hdfs.util.GSet;
+import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;

@@ -82,8 +84,9 @@ class BlockPoolSliceScanner {
 
   private final SortedSet<BlockScanInfo> blockInfoSet
       = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-  private final Map<Block, BlockScanInfo> blockMap
-      = new HashMap<Block, BlockScanInfo>();
+  private final GSet<Block, BlockScanInfo> blockMap
+      = new LightWeightGSet<Block, BlockScanInfo>(
+          LightWeightGSet.computeCapacity(0.5, "BlockMap"));
 
   // processedBlocks keeps track of which blocks are scanned
   // since the last run.

@@ -108,7 +111,13 @@ class BlockPoolSliceScanner {
     NONE,
   }
 
-  static class BlockScanInfo {
+  // Extend Block because in the DN process there's a 1-to-1 correspondence of
+  // BlockScanInfo to Block instances, so by extending rather than containing
+  // Block, we can save a bit of Object overhead (about 24 bytes per block
+  // replica.)
+  static class BlockScanInfo extends Block
+      implements LightWeightGSet.LinkedElement {
+
     /** Compare the info by the last scan time. */
     static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
         = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {

@@ -121,18 +130,18 @@ class BlockPoolSliceScanner {
       }
     };
 
-    final Block block;
     long lastScanTime = 0;
     ScanType lastScanType = ScanType.NONE;
     boolean lastScanOk = true;
+    private LinkedElement next;
 
     BlockScanInfo(Block block) {
-      this.block = block;
+      super(block);
     }
 
     @Override
     public int hashCode() {
-      return block.hashCode();
+      return super.hashCode();
     }
 
     @Override

@@ -142,12 +151,22 @@ class BlockPoolSliceScanner {
       } else if (that == null || !(that instanceof BlockScanInfo)) {
         return false;
       }
-      return block.equals(((BlockScanInfo)that).block);
+      return super.equals(that);
     }
 
     long getLastScanTime() {
       return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
     }
 
+    @Override
+    public void setNext(LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LinkedElement getNext() {
+      return next;
+    }
   }
 
   BlockPoolSliceScanner(String bpid, DataNode datanode,

@@ -203,19 +222,19 @@ class BlockPoolSliceScanner {
 
   private synchronized void addBlockInfo(BlockScanInfo info) {
     boolean added = blockInfoSet.add(info);
-    blockMap.put(info.block, info);
+    blockMap.put(info);
 
     if (added) {
-      updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
+      updateBytesToScan(info.getNumBytes(), info.lastScanTime);
     }
   }
 
   private synchronized void delBlockInfo(BlockScanInfo info) {
     boolean exists = blockInfoSet.remove(info);
-    blockMap.remove(info.block);
+    blockMap.remove(info);
 
     if (exists) {
-      updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
+      updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
     }
   }
 

@@ -464,7 +483,7 @@ class BlockPoolSliceScanner {
 
   private synchronized boolean isFirstBlockProcessed() {
     if (!blockInfoSet.isEmpty()) {
-      long blockId = blockInfoSet.first().block.getBlockId();
+      long blockId = blockInfoSet.first().getBlockId();
       if ((processedBlocks.get(blockId) != null)
           && (processedBlocks.get(blockId) == 1)) {
         return true;

@@ -478,7 +497,7 @@ class BlockPoolSliceScanner {
     Block block = null;
     synchronized (this) {
       if (!blockInfoSet.isEmpty()) {
-        block = blockInfoSet.first().block;
+        block = blockInfoSet.first();
       }
     }
     if ( block != null ) {

@@ -520,7 +539,7 @@ class BlockPoolSliceScanner {
             entry.genStamp));
         if (info != null) {
           if (processedBlocks.get(entry.blockId) == null) {
-            updateBytesLeft(-info.block.getNumBytes());
+            updateBytesLeft(-info.getNumBytes());
             processedBlocks.put(entry.blockId, 1);
           }
           if (logIterator.isPrevious()) {

@@ -712,7 +731,7 @@ class BlockPoolSliceScanner {
           (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none";
       buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
                                   " scan time : " +
-                                  "%-15d %s%n", info.block,
+                                  "%-15d %s%n", info,
                                   (info.lastScanOk ? "ok" : "failed"),
                                   scanType, scanTime,
                                   (scanTime <= 0) ? "not yet verified" :
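Note on the data-structure change above: the per-block HashMap.Entry objects and the separate Block field inside each BlockScanInfo are both eliminated, because LightWeightGSet chains its elements through their own LinkedElement pointers and BlockScanInfo now is a Block. What follows is a minimal, hypothetical sketch of that intrusive-set idea; the names and simplified semantics (no resize, no remove) are illustrative only, not the actual org.apache.hadoop.hdfs.util.GSet/LightWeightGSet API:

// Sketch only: an intrusive hash set where each element stores its own "next"
// pointer, so no per-entry wrapper object is allocated.
interface Linked {
  void setNext(Linked next);
  Linked getNext();
}

class IntrusiveSet<E extends Linked> {
  private final Linked[] buckets;

  IntrusiveSet(int capacity) {
    buckets = new Linked[capacity];
  }

  private int indexOf(Object key) {
    return (key.hashCode() & 0x7fffffff) % buckets.length;
  }

  // The element itself doubles as the hash-chain node (the per-replica saving
  // the comment in the hunk above refers to). Relies on the element's
  // equals()/hashCode() matching the lookup key, which is why BlockScanInfo
  // now extends Block.
  void put(E element) {
    int i = indexOf(element);
    element.setNext(buckets[i]);
    buckets[i] = element;
  }

  @SuppressWarnings("unchecked")
  E get(Object key) {
    for (Linked e = buckets[indexOf(key)]; e != null; e = e.getNext()) {
      if (e.equals(key)) {
        return (E) e;
      }
    }
    return null;
  }
}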
ReplicaInfo.java

@@ -21,6 +21,10 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;

@@ -29,16 +33,33 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.io.IOUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is used by datanodes to maintain meta data of its replicas.
  * It provides a general interface for meta information of a replica.
  */
 @InterfaceAudience.Private
 abstract public class ReplicaInfo extends Block implements Replica {
 
   /** volume where the replica belongs */
   private FsVolumeSpi volume;
 
   /** directory where block & meta files belong */
-  private File dir;
+
+  /**
+   * Base directory containing numerically-identified sub directories and
+   * possibly blocks.
+   */
+  private File baseDir;
+
+  /**
+   * Ints representing the sub directory path from base dir to the directory
+   * containing this replica.
+   */
+  private int[] subDirs;
+
+  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
   /**
    * Constructor for a zero length replica

@@ -74,7 +95,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
                       FsVolumeSpi vol, File dir) {
     super(blockId, len, genStamp);
     this.volume = vol;
-    this.dir = dir;
+    setDirInternal(dir);
   }
 
   /**

@@ -122,7 +143,18 @@ abstract public class ReplicaInfo extends Block implements Replica {
    * @return the parent directory path where this replica is located
    */
   File getDir() {
-    return dir;
+    if (subDirs == null) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    for (int i : subDirs) {
+      sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
+      sb.append(i);
+      sb.append("/");
+    }
+    File ret = new File(baseDir, sb.toString());
+    return ret;
   }
 
   /**

@@ -130,7 +162,59 @@ abstract public class ReplicaInfo extends Block implements Replica {
    * @param dir the parent directory where the replica is located
    */
   public void setDir(File dir) {
-    this.dir = dir;
+    setDirInternal(dir);
+  }
+
+  private void setDirInternal(File dir) {
+    if (dir == null) {
+      subDirs = null;
+      baseDir = null;
+      return;
+    }
+
+    ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
+    this.subDirs = replicaDirInfo.subDirs;
+
+    synchronized (internedBaseDirs) {
+      if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+        // Create a new String path of this file and make a brand new File object
+        // to guarantee we drop the reference to the underlying char[] storage.
+        File baseDir = new File(new String(replicaDirInfo.baseDirPath));
+        internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+      }
+      this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+    }
+  }
+
+  @VisibleForTesting
+  public static class ReplicaDirInfo {
+    @VisibleForTesting
+    public String baseDirPath;
+
+    @VisibleForTesting
+    public int[] subDirs;
+  }
+
+  @VisibleForTesting
+  public static ReplicaDirInfo parseSubDirs(File dir) {
+    ReplicaDirInfo ret = new ReplicaDirInfo();
+
+    File currentDir = dir;
+    List<Integer> subDirList = new ArrayList<Integer>();
+    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
+      // Prepend the integer into the list.
+      subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
+          DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+      currentDir = currentDir.getParentFile();
+    }
+    ret.subDirs = new int[subDirList.size()];
+    for (int i = 0; i < subDirList.size(); i++) {
+      ret.subDirs[i] = subDirList.get(i);
+    }
+
+    ret.baseDirPath = currentDir.getAbsolutePath();
+
+    return ret;
   }
 
   /**
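Usage sketch for the new path handling (example path assumed; DataStorage.BLOCK_SUBDIR_PREFIX is "subdir"): rather than every replica holding its own File for a path such as /data/current/finalized/subdir12/subdir34, replicas now share one interned File per base directory and keep only a small int[] of subdirectory indices, with getDir() rebuilding the full path on demand.

import java.io.File;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

public class ParseSubDirsExample {
  public static void main(String[] args) {
    // Hypothetical replica directory under a datanode's finalized tree.
    File dir = new File("/data/current/finalized/subdir12/subdir34");

    ReplicaInfo.ReplicaDirInfo info = ReplicaInfo.parseSubDirs(dir);
    System.out.println(info.baseDirPath);                         // /data/current/finalized
    System.out.println(info.subDirs[0] + "," + info.subDirs[1]);  // 12,34
  }
}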
TestDatanodeBlockScanner.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;

@@ -437,4 +439,23 @@ public class TestDatanodeBlockScanner {
       blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
     }
   }
+
+  private static final String BASE_PATH = "/data/current/finalized";
+
+  @Test
+  public void testReplicaInfoParsing() throws Exception {
+    testReplicaInfoParsingSingle(BASE_PATH, new int[0]);
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1", new int[]{1});
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir43", new int[]{43});
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3", new int[]{1, 2, 3});
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir43", new int[]{1, 2, 43});
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir23/subdir3", new int[]{1, 23, 3});
+    testReplicaInfoParsingSingle(BASE_PATH + "/subdir13/subdir2/subdir3", new int[]{13, 2, 3});
+  }
+
+  private static void testReplicaInfoParsingSingle(String subDirPath, int[] expectedSubDirs) {
+    File testFile = new File(subDirPath);
+    assertArrayEquals(expectedSubDirs, ReplicaInfo.parseSubDirs(testFile).subDirs);
+    assertEquals(BASE_PATH, ReplicaInfo.parseSubDirs(testFile).baseDirPath);
+  }
 }