HDFS-8567. Erasure Coding: SafeMode handles file smaller than a full stripe. Contributed by Walter Su.

This commit is contained in:
Jing Zhao 2015-06-23 22:29:28 -07:00
parent 8c423a8163
commit d920cdab91
6 changed files with 181 additions and 33 deletions

View File

@ -317,3 +317,6 @@
HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have
corrupt blocks. (jing9 and Kai Sasaki) corrupt blocks. (jing9 and Kai Sasaki)
HDFS-8567. Erasure Coding: SafeMode handles file smaller than a full stripe.
(Walter Su via jing9)

View File

@ -73,6 +73,24 @@ public class BlockInfoStriped extends BlockInfo {
return (short) this.schema.getNumParityUnits(); return (short) this.schema.getNumParityUnits();
} }
/**
* If the block is committed/completed and its length is less than a full
* stripe, it returns the the number of actual data blocks.
* Otherwise it returns the number of data units specified by schema.
*/
public short getRealDataBlockNum() {
if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
return (short) Math.min(getDataBlockNum(),
(getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
} else {
return getDataBlockNum();
}
}
public short getRealTotalBlockNum() {
return (short) (getRealDataBlockNum() + getParityBlockNum());
}
public ECSchema getSchema() { public ECSchema getSchema() {
return schema; return schema;
} }

View File

@ -592,13 +592,16 @@ public class BlockManager {
} }
public int getDefaultStorageNum(BlockInfo block) { public int getDefaultStorageNum(BlockInfo block) {
return block.isStriped() ? if (block.isStriped()) {
((BlockInfoStriped) block).getTotalBlockNum() : defaultReplication; return ((BlockInfoStriped) block).getRealTotalBlockNum();
} else {
return defaultReplication;
}
} }
public short getMinStorageNum(BlockInfo block) { public short getMinStorageNum(BlockInfo block) {
if (block.isStriped()) { if (block.isStriped()) {
return getStripedDataBlockNum(block); return ((BlockInfoStriped) block).getRealDataBlockNum();
} else { } else {
return minReplication; return minReplication;
} }
@ -707,7 +710,7 @@ public class BlockManager {
// OP_CLOSE edit on the standby). // OP_CLOSE edit on the standby).
namesystem.adjustSafeModeBlockTotals(0, 1); namesystem.adjustSafeModeBlockTotals(0, 1);
final int minStorage = curBlock.isStriped() ? final int minStorage = curBlock.isStriped() ?
((BlockInfoStriped) curBlock).getDataBlockNum() : minReplication; ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
namesystem.incrementSafeBlockCount( namesystem.incrementSafeBlockCount(
Math.min(numNodes, minStorage), curBlock); Math.min(numNodes, minStorage), curBlock);
@ -3854,27 +3857,12 @@ public class BlockManager {
public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) { public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
if (block.isStriped()) { if (block.isStriped()) {
return (short) (getStripedDataBlockNum(block) + return ((BlockInfoStriped) block).getRealTotalBlockNum();
((BlockInfoStriped) block).getParityBlockNum());
} else { } else {
return bc.getPreferredBlockReplication(); return bc.getPreferredBlockReplication();
} }
} }
short getStripedDataBlockNum(BlockInfo block) {
assert block.isStriped();
final BlockInfoStriped sblock = (BlockInfoStriped) block;
short dataBlockNum = sblock.getDataBlockNum();
if (sblock.isComplete() ||
sblock.getBlockUCState() == BlockUCState.COMMITTED) {
// if the sblock is committed/completed and its length is less than a
// full stripe, the minimum storage number needs to be adjusted
dataBlockNum = (short) Math.min(dataBlockNum,
(sblock.getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
}
return dataBlockNum;
}
public long getMissingBlocksCount() { public long getMissingBlocksCount() {
// not locking // not locking
return this.neededReplications.getCorruptBlockSize(); return this.neededReplications.getCorruptBlockSize();

View File

@ -4878,7 +4878,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private synchronized void incrementSafeBlockCount(short storageNum, private synchronized void incrementSafeBlockCount(short storageNum,
BlockInfo storedBlock) { BlockInfo storedBlock) {
final int safe = storedBlock.isStriped() ? final int safe = storedBlock.isStriped() ?
((BlockInfoStriped) storedBlock).getDataBlockNum() : safeReplication; ((BlockInfoStriped) storedBlock).getRealDataBlockNum() : safeReplication;
if (storageNum == safe) { if (storageNum == safe) {
this.blockSafe++; this.blockSafe++;

View File

@ -553,17 +553,6 @@ public class TestSafeMode {
} }
} }
//TODO : test should be added to check safeMode with stripedBloks after stripedBlock related functions have been added in class MiniDFSCluster
@Test
public void testSafeModeWithCorruptSripedBlock() throws IOException {
try {
} finally {
if(fs != null) fs.close();
if(cluster!= null) cluster.shutdown();
}
}
void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOException { void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOException {
FileStatus stat = fs.getFileStatus(fileName); FileStatus stat = fs.getFileStatus(fileName);
try { try {

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestSafeModeWithStripedFile {
static final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
static final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
static final int numDNs = DATA_BLK_NUM + PARITY_BLK_NUM;
static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
static final int blockSize = cellSize * 2;
static MiniDFSCluster cluster;
static Configuration conf;
@Before
public void setup() throws IOException {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/",
null, cellSize);
cluster.waitActive();
}
@After
public void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testStripedFile0() throws IOException {
doTest(cellSize, 1);
}
@Test
public void testStripedFile1() throws IOException {
doTest(cellSize * 5, 5);
}
/**
* This util writes a small block group whose size is given by caller.
* Then write another 2 full stripe blocks.
* Then shutdown all DNs and start again one by one. and verify the safemode
* status accordingly.
*
* @param smallSize file size of the small block group
* @param minStorages minimum replicas needed by the block so it can be safe
*/
private void doTest(int smallSize, int minStorages) throws IOException {
FileSystem fs = cluster.getFileSystem();
// add 1 block
byte[] data = StripedFileTestUtil.generateBytes(smallSize);
Path smallFilePath = new Path("/testStripedFile_" + smallSize);
DFSTestUtil.writeFile(fs, smallFilePath, data);
// If we only have 1 block, NN won't enter safemode in the first place
// because the threshold is 0 blocks.
// So we need to add another 2 blocks.
int bigSize = blockSize * DATA_BLK_NUM * 2;
Path bigFilePath = new Path("/testStripedFile_" + bigSize);
data = StripedFileTestUtil.generateBytes(bigSize);
DFSTestUtil.writeFile(fs, bigFilePath, data);
// now we have 3 blocks. NN needs 2 blocks to reach the threshold 0.9 of
// total blocks 3.
// stopping all DNs
List<MiniDFSCluster.DataNodeProperties> dnprops = Lists.newArrayList();
LocatedBlocks lbs = cluster.getNameNodeRpc()
.getBlockLocations(smallFilePath.toString(), 0, smallSize);
DatanodeInfo[] locations = lbs.get(0).getLocations();
for (DatanodeInfo loc : locations) {
// keep the DNs that have smallFile in the head of dnprops
dnprops.add(cluster.stopDataNode(loc.getName()));
}
for (int i = 0; i < numDNs - locations.length; i++) {
dnprops.add(cluster.stopDataNode(0));
}
cluster.restartNameNode(0);
NameNode nn = cluster.getNameNode();
assertTrue(cluster.getNameNode().isInSafeMode());
assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn));
// the block of smallFile doesn't reach minStorages,
// so the safe blocks count doesn't increment.
for (int i = 0; i < minStorages - 1; i++) {
cluster.restartDataNode(dnprops.remove(0));
cluster.triggerBlockReports();
assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn));
}
// the block of smallFile reaches minStorages,
// so the safe blocks count increment.
cluster.restartDataNode(dnprops.remove(0));
cluster.triggerBlockReports();
assertEquals(1, NameNodeAdapter.getSafeModeSafeBlocks(nn));
// the 2 blocks of bigFile need DATA_BLK_NUM storages to be safe
for (int i = minStorages; i < DATA_BLK_NUM - 1; i++) {
cluster.restartDataNode(dnprops.remove(0));
cluster.triggerBlockReports();
assertTrue(nn.isInSafeMode());
}
cluster.restartDataNode(dnprops.remove(0));
cluster.triggerBlockReports();
assertFalse(nn.isInSafeMode());
}
}