HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1534426 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-10-22 00:20:39 +00:00
parent 4380e480ce
commit df87ed34f2
7 changed files with 182 additions and 21 deletions

View File

@ -332,6 +332,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4511. Cover package org.apache.hadoop.hdfs.tools with unit test
(Andrey Klochkov via jeagles)
HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy.
(Junping Du via szetszwo)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

View File

@ -95,18 +95,17 @@ DatanodeDescriptor[] chooseTarget(String src,
}
/**
* Verify that the block is replicated on at least minRacks different racks
* if there is more than minRacks rack in the system.
* Verify if the block's placement meets requirement of placement policy,
* i.e. replicas are placed on no less than minRacks racks in the system.
*
* @param srcPath the full pathname of the file to be verified
* @param lBlk block with locations
* @param minRacks number of racks the block should be replicated to
* @return the difference between the required and the actual number of racks
* the block is replicated to.
* @param numOfReplicas replica number of file to be verified
* @return the result of verification
*/
abstract public int verifyBlockPlacement(String srcPath,
LocatedBlock lBlk,
int minRacks);
abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
LocatedBlock lBlk,
int numOfReplicas);
/**
* Decide whether deleting the specified replica of the block still makes
* the block conform to the configured block placement policy.

View File

@ -655,22 +655,22 @@ private DatanodeDescriptor[] getPipeline(Node writer,
}
@Override
public int verifyBlockPlacement(String srcPath,
LocatedBlock lBlk,
int minRacks) {
public BlockPlacementStatus verifyBlockPlacement(String srcPath,
LocatedBlock lBlk, int numberOfReplicas) {
DatanodeInfo[] locs = lBlk.getLocations();
if (locs == null)
locs = DatanodeDescriptor.EMPTY_ARRAY;
int numRacks = clusterMap.getNumOfRacks();
if(numRacks <= 1) // only one rack
return 0;
minRacks = Math.min(minRacks, numRacks);
return new BlockPlacementStatusDefault(
Math.min(numRacks, numberOfReplicas), numRacks);
int minRacks = Math.min(2, numberOfReplicas);
// 1. Check that all locations are different.
// 2. Count locations on different racks.
Set<String> racks = new TreeSet<String>();
for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation());
return minRacks - racks.size();
return new BlockPlacementStatusDefault(racks.size(), minRacks);
}
@Override

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface BlockPlacementStatus {
/**
* Boolean value to identify if replicas of this block satisfy requirement of
* placement policy
* @return if replicas satisfy placement policy's requirement
*/
public boolean isPlacementPolicySatisfied();
/**
* Get description info for log or printed in case replicas are failed to meet
* requirement of placement policy
* @return description in case replicas are failed to meet requirement of
* placement policy
*/
public String getErrorDescription();
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
public class BlockPlacementStatusDefault implements BlockPlacementStatus {
private int requiredRacks = 0;
private int currentRacks = 0;
public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){
this.requiredRacks = requiredRacks;
this.currentRacks = currentRacks;
}
@Override
public boolean isPlacementPolicySatisfied() {
return requiredRacks <= currentRacks;
}
@Override
public String getErrorDescription() {
if (isPlacementPolicySatisfied()) {
return null;
}
return "Block should be additionally replicated on " +
(requiredRacks - currentRacks) + " more rack(s).";
}
}

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
@ -374,9 +375,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
locs.length + " replica(s).");
}
// verify block placement policy
int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
if (missingRacks > 0) {
BlockPlacementStatus blockPlacementStatus =
BlockPlacementPolicy.getInstance(conf, null, networktopology).
verifyBlockPlacement(path, lBlk, targetFileReplication);
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
res.numMisReplicatedBlocks++;
misReplicatedPerFile++;
if (!showFiles) {
@ -385,9 +387,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.print(path + ": ");
}
out.println(" Replica placement policy is violated for " +
block +
". Block should be additionally replicated on " +
missingRacks + " more rack(s).");
block + ". " + blockPlacementStatus.getErrorDescription());
}
report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (locs.length == 0) {

View File

@ -83,7 +83,6 @@
import org.junit.Test;
import com.google.common.collect.Sets;
import org.mockito.Mockito;
import static org.mockito.Mockito.*;
/**
@ -892,6 +891,80 @@ public void testFsckMissingReplicas() throws IOException {
}
}
}
/**
* Tests that the # of misreplaced replicas is correct
* @throws IOException
*/
@Test
public void testFsckMisPlacedReplicas() throws IOException {
// Desired replication factor
final short REPL_FACTOR = 2;
// Number of replicas to actually start
short NUM_DN = 2;
// Number of blocks to write
final short NUM_BLOCKS = 3;
// Set a small-ish blocksize
final long blockSize = 512;
String [] racks = {"/rack1", "/rack1"};
String [] hosts = {"host1", "host2"};
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
try {
// Startup a minicluster
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
.racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = (DistributedFileSystem) cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
// Create a file that will be intentionally under-replicated
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
long fileLen = blockSize * NUM_BLOCKS;
DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1);
// Create an under-replicated file
NameNode namenode = cluster.getNameNode();
NetworkTopology nettop = cluster.getNamesystem().getBlockManager()
.getDatanodeManager().getNetworkTopology();
// Add a new node on different rack, so previous blocks' replicas
// are considered to be misplaced
nettop.add(DFSTestUtil.getDatanodeDescriptor("/rack2", "/host3"));
NUM_DN++;
Map<String,String[]> pmap = new HashMap<String, String[]>();
Writer result = new StringWriter();
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
NUM_DN, (short)REPL_FACTOR, remoteAddress);
// Run the fsck and check the Result
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result res = new Result(conf);
fsck.check(pathString, file, res);
// check misReplicatedBlock number.
assertEquals(res.numMisReplicatedBlocks, NUM_BLOCKS);
} finally {
if(dfs != null) {
dfs.close();
}
if(cluster != null) {
cluster.shutdown();
}
}
}
/** Test fsck with FileNotFound */
@Test