HDFS-12349. Improve log message when it could not alloc enough blocks for EC. (lei)

commit 3e6d0ca2b2
parent 3b3be355b3
Author: Lei Xu
Date:   2017-09-07 18:01:37 -07:00

5 changed files with 54 additions and 36 deletions

@@ -260,6 +260,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
   private final Coordinator coordinator;
   private final CellBuffers cellBuffers;
+  private final ErasureCodingPolicy ecPolicy;
   private final RawErasureEncoder encoder;
   private final List<StripedDataStreamer> streamers;
   private final DFSPacket[] currentPackets; // current Packet of each streamer
@@ -286,7 +287,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }
-    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
+    ecPolicy = stat.getErasureCodingPolicy();
     final int numParityBlocks = ecPolicy.getNumParityUnits();
     cellSize = ecPolicy.getCellSize();
     numDataBlocks = ecPolicy.getNumDataUnits();
@@ -478,11 +479,6 @@ public class DFSStripedOutputStream extends DFSOutputStream
     final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
         currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
     assert lb.isStriped();
-    if (lb.getLocations().length < numDataBlocks) {
-      throw new IOException("Failed to get " + numDataBlocks
-          + " nodes from namenode: blockGroupSize= " + numAllBlocks
-          + ", blocks.length= " + lb.getLocations().length);
-    }
     // assign the new block to the current block group
     currentBlockGroup = lb.getBlock();
     blockGroupIndex++;
@@ -494,11 +490,16 @@ public class DFSStripedOutputStream extends DFSOutputStream
       StripedDataStreamer si = getStripedDataStreamer(i);
       assert si.isHealthy();
       if (blocks[i] == null) {
+        // allocBlock() should guarantee that all data blocks are successfully
+        // allocated.
+        assert i >= numDataBlocks;
         // Set exception and close streamer as there is no block locations
         // found for the parity block.
-        LOG.warn("Failed to get block location for parity block, index=" + i);
+        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
+            "Not enough datanodes? Excluded nodes={}", i, ecPolicy.getName(),
+            excludedNodes);
         si.getLastException().set(
-            new IOException("Failed to get following block, i=" + i));
+            new IOException("Failed to get parity block, index=" + i));
         si.getErrorState().setInternalError();
         si.close(true);
       } else {

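Note on the streamer-side change above: because the namenode now rejects allocations that cannot cover all data blocks (see the BlockManager hunk below), the client only ever sees missing locations for parity blocks, which is what the new assert and the parameterized SLF4J warning express. The following stand-alone sketch (illustrative class and names, not the HDFS code itself) shows that invariant and the shape of the new log call.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParityAllocationCheckSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ParityAllocationCheckSketch.class);

  // blocks[i] == null means no location was returned for block i of the group.
  static void checkBlockGroup(Object[] blocks, int numDataBlocks,
      String ecPolicyName, String excludedNodes) {
    for (int i = 0; i < blocks.length; i++) {
      if (blocks[i] == null) {
        // The namenode-side check guarantees all data blocks are allocated,
        // so a missing slot can only be a parity block.
        assert i >= numDataBlocks;
        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
            "Not enough datanodes? Excluded nodes={}", i, ecPolicyName,
            excludedNodes);
      }
    }
  }

  public static void main(String[] args) {
    Object[] group = new Object[9];              // e.g. RS-6-3: 6 data + 3 parity
    for (int i = 0; i < 7; i++) {                // 6 data blocks + 1 parity allocated
      group[i] = new Object();
    }
    checkBlockGroup(group, 6, "RS-6-3-1024k", "[dn7, dn8]");  // warns for i = 7, 8
  }
}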

@@ -2057,6 +2057,7 @@ public class BlockManager implements BlockStatsMXBean {
       final List<String> favoredNodes,
       final byte storagePolicyID,
       final BlockType blockType,
+      final ErasureCodingPolicy ecPolicy,
       final EnumSet<AddBlockFlag> flags) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
@@ -2067,14 +2068,23 @@ public class BlockManager implements BlockStatsMXBean {
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
         favoredDatanodeDescriptors, storagePolicy, flags);
-    if (targets.length < minReplication) {
-      throw new IOException("File " + src + " could only be replicated to "
-          + targets.length + " nodes instead of minReplication (="
-          + minReplication + "). There are "
-          + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
-          + " datanode(s) running and "
-          + (excludedNodes == null? "no": excludedNodes.size())
-          + " node(s) are excluded in this operation.");
+    final String ERROR_MESSAGE = "File %s could only be written to %d of " +
+        "the %d %s. There are %d datanode(s) running and %s "
+        + "node(s) are excluded in this operation.";
+    if (blockType == BlockType.CONTIGUOUS && targets.length < minReplication) {
+      throw new IOException(String.format(ERROR_MESSAGE, src,
+          targets.length, minReplication, "minReplication",
+          getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+          (excludedNodes == null? "no": excludedNodes.size())));
+    } else if (blockType == BlockType.STRIPED &&
+        targets.length < ecPolicy.getNumDataUnits()) {
+      throw new IOException(
+          String.format(ERROR_MESSAGE, src, targets.length,
+              ecPolicy.getNumDataUnits(),
+              String.format("required nodes for %s", ecPolicy.getName()),
+              getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+              (excludedNodes == null ? "no" : excludedNodes.size())));
     }
     return targets;
   }

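To see what the shared template above produces for the two block types, here is a small stand-alone sketch. The file paths, counts, and the policy name are made up for illustration; at runtime BlockManager fills them in from the request and the cluster state.

public class ErrorMessageSketch {
  private static final String ERROR_MESSAGE = "File %s could only be written to %d of " +
      "the %d %s. There are %d datanode(s) running and %s "
      + "node(s) are excluded in this operation.";

  public static void main(String[] args) {
    // CONTIGUOUS: the number of chosen targets is compared against minReplication.
    System.out.println(String.format(ERROR_MESSAGE,
        "/user/foo/file", 0, 1, "minReplication", 3, "no"));
    // STRIPED: the number of chosen targets is compared against the EC policy's data units.
    System.out.println(String.format(ERROR_MESSAGE,
        "/user/foo/ecfile", 4, 6,
        String.format("required nodes for %s", "RS-6-3-1024k"), 4, "2"));
  }
}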

@@ -201,7 +201,7 @@ class FSDirWriteFileOp {
     }
     storagePolicyID = pendingFile.getStoragePolicyID();
     return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
-        clientMachine, blockType);
+        clientMachine, blockType, ecPolicy);
   }
 
   static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
@@ -286,7 +286,7 @@ class FSDirWriteFileOp {
     return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
         excludedNodesSet, r.blockSize,
         favoredNodesList, r.storagePolicyID,
-        r.blockType, flags);
+        r.blockType, r.ecPolicy, flags);
   }
 
   /**
@@ -836,15 +836,23 @@ class FSDirWriteFileOp {
     final byte storagePolicyID;
     final String clientMachine;
     final BlockType blockType;
+    final ErasureCodingPolicy ecPolicy;
 
     ValidateAddBlockResult(
         long blockSize, int numTargets, byte storagePolicyID,
-        String clientMachine, BlockType blockType) {
+        String clientMachine, BlockType blockType,
+        ErasureCodingPolicy ecPolicy) {
       this.blockSize = blockSize;
       this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
       this.blockType = blockType;
+      this.ecPolicy = ecPolicy;
+
+      if (blockType == BlockType.STRIPED) {
+        Preconditions.checkArgument(ecPolicy != null,
+            "ecPolicy is not specified for striped block");
+      }
     }
   }
 }

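The constructor change above makes the blockType/ecPolicy pairing explicit: a STRIPED result must carry an erasure coding policy, while CONTIGUOUS callers may pass null. Below is a stand-alone sketch of that check with simplified stand-ins for the HDFS types, using Guava's Preconditions as the patch does; the class and field names are illustrative only.

import com.google.common.base.Preconditions;

public class AddBlockResultSketch {
  enum BlockType { CONTIGUOUS, STRIPED }

  final BlockType blockType;
  final String ecPolicyName;   // stands in for ErasureCodingPolicy

  AddBlockResultSketch(BlockType blockType, String ecPolicyName) {
    this.blockType = blockType;
    this.ecPolicyName = ecPolicyName;
    // Striped results without a policy are rejected at construction time.
    if (blockType == BlockType.STRIPED) {
      Preconditions.checkArgument(ecPolicyName != null,
          "ecPolicy is not specified for striped block");
    }
  }

  public static void main(String[] args) {
    new AddBlockResultSketch(BlockType.CONTIGUOUS, null);        // fine, no policy needed
    new AddBlockResultSketch(BlockType.STRIPED, "RS-6-3-1024k"); // fine
    try {
      new AddBlockResultSketch(BlockType.STRIPED, null);         // rejected
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}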

@@ -42,6 +42,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -282,7 +283,7 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
-      throws IOException {
+      throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     try {
@@ -301,20 +302,18 @@ public class TestDFSStripedOutputStreamWithFailure {
           DatanodeReportType.LIVE);
       assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
       final Path dirFile = new Path(dir, "ecfile");
-      FSDataOutputStream out;
-      try {
-        out = dfs.create(dirFile, true);
-        out.write("something".getBytes());
-        out.flush();
-        out.close();
-        Assert.fail("Failed to validate available dns against blkGroupSize");
-      } catch (IOException ioe) {
-        // expected
-        GenericTestUtils.assertExceptionContains("Failed to get " +
-            dataBlocks + " nodes from namenode: blockGroupSize= " +
-            (dataBlocks + parityBlocks) + ", blocks.length= " +
-            numDatanodes, ioe);
-      }
+      LambdaTestUtils.intercept(
+          IOException.class,
+          "File " + dirFile + " could only be written to " +
+              numDatanodes + " of the " + dataBlocks + " required nodes for " +
+              getEcPolicy().getName(),
+          () -> {
+            try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+              out.write("something".getBytes());
+              out.flush();
+            }
+            return 0;
+          });
     } finally {
       tearDown();
    }

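The reworked test relies on org.apache.hadoop.test.LambdaTestUtils.intercept(clazz, expectedText, callable), which evaluates the callable and asserts that it throws the given exception type with a message containing the expected text. Below is a minimal usage sketch outside the mini-cluster context; the helper method and the message string are illustrative, not the actual test code.

import java.io.IOException;
import org.apache.hadoop.test.LambdaTestUtils;

public class InterceptSketch {
  // Stand-in for the write that the namenode rejects in the real test.
  static int writeThatFails() throws IOException {
    throw new IOException("File /dir/ecfile could only be written to 4 of the 6 "
        + "required nodes for RS-6-3-1024k.");
  }

  public static void main(String[] args) throws Exception {
    // Passes only if the callable throws an IOException containing the given text.
    LambdaTestUtils.intercept(
        IOException.class,
        "could only be written to",
        () -> {
          writeThatFails();
          return 0;
        });
    System.out.println("intercept matched the expected IOException");
  }
}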

@@ -175,8 +175,8 @@ public class TestDeadDatanode {
     // choose the targets, but local node should not get selected as this is not
     // part of the cluster anymore
     DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
-        clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7,
-        BlockType.CONTIGUOUS, null);
+        clientNode, new HashSet<>(), 256 * 1024 * 1024L, null, (byte) 7,
+        BlockType.CONTIGUOUS, null, null);
     for (DatanodeStorageInfo datanodeStorageInfo : results) {
       assertFalse("Dead node should not be choosen", datanodeStorageInfo
           .getDatanodeDescriptor().equals(clientNode));