HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.

Change-Id: I51703a61c9d8454f883028f3f6acb5729fde1b15
Zhe Zhang 2015-12-18 15:57:48 -08:00
parent 85c2466048
commit 61ab0440f7
13 changed files with 634 additions and 68 deletions

View File

@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
@ -970,7 +971,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
private void enqueueAllCurrentPackets() throws IOException {
@VisibleForTesting
void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = setCurrentStreamer(i);

View File

@ -873,6 +873,9 @@ Trunk (Unreleased)
HDFS-9373. Erasure coding: friendly log information for write operations
with some failed streamers. (Li Bo via zhz)
HDFS-9173. Erasure Coding: Lease recovery for striped file. (Walter Su and
Jing Zhao via zhz)
HDFS-9451. Clean up deprecated umasks and related unit tests.
(Wei-Chiu Chuang via wheat9)

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
@ -359,6 +360,12 @@ public class PBHelper {
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
builder.setTruncateBlock(PBHelperClient.convert(b.getNewBlock()));
if (b instanceof RecoveringStripedBlock) {
RecoveringStripedBlock sb = (RecoveringStripedBlock) b;
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
sb.getErasureCodingPolicy()));
builder.addAllBlockIndices(asList(sb.getBlockIndices()));
}
return builder.build();
}
@ -372,6 +379,16 @@ public class PBHelper {
rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
b.getNewGenStamp());
}
if (b.hasEcPolicy()) {
List<Integer> blockIndicesList = b.getBlockIndicesList();
int[] indices = new int[blockIndicesList.size()];
for (int i = 0; i < blockIndicesList.size(); i++) {
indices[i] = blockIndicesList.get(i).intValue();
}
rBlock = new RecoveringStripedBlock(rBlock, indices,
PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy()));
}
return rBlock;
}
@ -823,12 +840,20 @@ public class PBHelper {
build();
}
private static List<Integer> convertIntArray(short[] liveBlockIndices) {
List<Integer> liveBlockIndicesList = new ArrayList<>();
for (short s : liveBlockIndices) {
liveBlockIndicesList.add((int) s);
private static List<Integer> asList(int[] arr) {
List<Integer> list = new ArrayList<>(arr.length);
for (int s : arr) {
list.add(s);
}
return liveBlockIndicesList;
return list;
}
private static List<Integer> asList(short[] arr) {
List<Integer> list = new ArrayList<>(arr.length);
for (int s : arr) {
list.add(s);
}
return list;
}
private static StorageTypesProto convertStorageTypesProto(
@ -925,7 +950,7 @@ public class PBHelper {
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
builder.addAllLiveBlockIndices(asList(liveBlockIndices));
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfo.getErasureCodingPolicy()));

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@ -504,6 +505,7 @@ public class DatanodeManager {
public DatanodeStorageInfo[] getDatanodeStorageInfos(
DatanodeID[] datanodeID, String[] storageIDs,
String format, Object... args) throws UnregisteredNodeException {
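// Treat a null storageIDs array as empty (commitBlockSynchronization may
// be called with null storage IDs).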
storageIDs = storageIDs == null ? new String[0] : storageIDs;
if (datanodeID.length != storageIDs.length) {
final String err = (storageIDs.length == 0?
"Missing storageIDs: It is likely that the HDFS client,"
@ -524,9 +526,11 @@ public class DatanodeManager {
continue;
}
final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
storages[i] = dd.getStorageInfo(storageIDs[i]);
if (dd != null) {
storages[i] = dd.getStorageInfo(storageIDs[i]);
}
}
return storages;
return storages;
}
/** Prints information about all datanodes. */
@ -1366,6 +1370,10 @@ public class DatanodeManager {
} else {
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
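// For a striped block group, also pass the internal block indices and
// the erasure coding policy to the primary DataNode.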
if (b.isStriped()) {
rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
((BlockInfoStriped) b).getErasureCodingPolicy());
}
}
brCommand.add(rBlock);
}

View File

@ -17,16 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.ipc.RemoteException;
@ -37,7 +42,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
/**
* This class handles the block recovery work commands.
@ -78,6 +88,10 @@ public class BlockRecoveryWorker {
newLength);
}
public ReplicaRecoveryInfo getReplicaRecoveryInfo(){
return rInfo;
}
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
@ -294,12 +308,8 @@ public class BlockRecoveryWorker {
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if (!failedList.isEmpty()) {
StringBuilder b = new StringBuilder();
for(DatanodeID id : failedList) {
b.append("\n " + id);
}
throw new IOException("Cannot recover " + block + ", the following "
+ failedList.size() + " data-nodes failed {" + b + "\n}");
throw new IOException("Cannot recover " + block
+ ", the following datanodes failed: " + failedList);
}
// Notify the name-node about successfully recovered replicas.
@ -323,6 +333,215 @@ public class BlockRecoveryWorker {
}
}
/**
* blk_0 blk_1 blk_2 blk_3 blk_4 blk_5 blk_6 blk_7 blk_8
* 64k 64k 64k 64k 64k 64k 64k 64k 64k <-- stripe_0
* 64k 64k 64k 64k 64k 64k 64k 64k 64k
* 64k 64k 64k 64k 64k 64k 64k 61k <-- startStripeIdx
* 64k 64k 64k 64k 64k 64k 64k
* 64k 64k 64k 64k 64k 64k 59k
* 64k 64k 64k 64k 64k 64k
* 64k 64k 64k 64k 64k 64k <-- last full stripe
* 64k 64k 13k 64k 55k 3k <-- target last stripe
* 64k 64k 64k 1k
* 64k 64k 58k
* 64k 64k
* 64k 19k
* 64k <-- total visible stripe
*
* Due to the different speeds of the streamers, the internal blocks in a
* block group may have different lengths when the block group is not ended
* normally. The purpose of this class is to recover the UnderConstruction
* block group so that all internal blocks end at the same stripe.
*
* The steps:
* 1. Get the lengths of all internal blocks from the DataNodes.
* 2. Calculate the safe length, which ends at the target last stripe.
* 3. Decode and feed blk_6~8 so that they end at the last full stripe (the
* last full stripe is the last decodable stripe).
* 4. Encode the target last stripe from the remaining sequential data (in
* this case 64k+64k+13k) and feed blk_6~8 the parity cells, overwriting
* existing parity cells where necessary.
* 5. Truncate the stripes between the target last stripe and the total
* visible stripe.
* TODO: implement steps 3 and 4
*/
public class RecoveryTaskStriped {
private final RecoveringBlock rBlock;
private final ExtendedBlock block;
private final String bpid;
private final DatanodeInfo[] locs;
private final long recoveryId;
private final int[] blockIndices;
private final ErasureCodingPolicy ecPolicy;
RecoveryTaskStriped(RecoveringStripedBlock rBlock) {
this.rBlock = rBlock;
// TODO: support truncate
Preconditions.checkArgument(rBlock.getNewBlock() == null);
block = rBlock.getBlock();
bpid = block.getBlockPoolId();
locs = rBlock.getLocations();
recoveryId = rBlock.getNewGenerationStamp();
blockIndices = rBlock.getBlockIndices();
ecPolicy = rBlock.getErasureCodingPolicy();
}
protected void recover() throws IOException {
checkLocations(locs.length);
Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
final int dataBlkNum = ecPolicy.getNumDataUnits();
final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
//check generation stamps
for (int i = 0; i < locs.length; i++) {
DatanodeID id = locs[i];
try {
DatanodeID bpReg = new DatanodeID(
datanode.getBPOfferService(bpid).bpRegistration);
InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ExtendedBlock internalBlk = new ExtendedBlock(block);
final long blockId = block.getBlockId() + blockIndices[i];
internalBlk.setBlockId(blockId);
ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
new RecoveringBlock(internalBlk, null, recoveryId));
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
info.getNumBytes() > 0) {
final BlockRecord existing = syncBlocks.get(blockId);
if (existing == null ||
info.getNumBytes() > existing.rInfo.getNumBytes()) {
// if we have >1 replicas for the same internal block, we
// simply choose the one with larger length.
// TODO: better usage of redundant replicas
syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
}
}
} catch (RecoveryInProgressException ripE) {
InterDatanodeProtocol.LOG.warn(
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
return;
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn(
"Failed to obtain replica info for block (=" + block
+ ") from datanode (=" + id + ")", e);
}
}
checkLocations(syncBlocks.size());
final long safeLength = getSafeLength(syncBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering block " + block
+ ", length=" + block.getNumBytes() + ", safeLength=" + safeLength
+ ", syncList=" + syncBlocks);
}
// If some internal blocks reach the safe length, convert them to RUR
List<BlockRecord> rurList = new ArrayList<>(locs.length);
for (BlockRecord r : syncBlocks.values()) {
int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
long newSize = getInternalBlockLength(safeLength, ecPolicy.getCellSize(),
dataBlkNum, blockIndex);
if (r.rInfo.getNumBytes() >= newSize) {
rurList.add(r);
}
}
assert rurList.size() >= dataBlkNum : "incorrect safe length";
// Recover the striped block by truncating internal blocks to the safe
// length. Abort if there is any failure in this step.
truncatePartialBlock(rurList, safeLength);
// notify the NameNode of the new size and locations
final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
final String[] newStorages = new String[totalBlkNum];
for (int i = 0; i < totalBlkNum; i++) {
newLocs[blockIndices[i]] = DatanodeID.EMPTY_DATANODE_ID;
newStorages[blockIndices[i]] = "";
}
for (BlockRecord r : rurList) {
int index = (int) (r.rInfo.getBlockId() &
HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
newLocs[index] = r.id;
newStorages[index] = r.storageID;
}
ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
safeLength, recoveryId);
DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
newBlock.getNumBytes(), true, false, newLocs, newStorages);
}
private void truncatePartialBlock(List<BlockRecord> rurList,
long safeLength) throws IOException {
int cellSize = ecPolicy.getCellSize();
int dataBlkNum = ecPolicy.getNumDataUnits();
List<DatanodeID> failedList = new ArrayList<>();
for (BlockRecord r : rurList) {
int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
long newSize = getInternalBlockLength(safeLength, cellSize, dataBlkNum,
blockIndex);
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(),
newSize);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ ", datanode=" + r.id + ")", e);
failedList.add(r.id);
}
}
// If any of the data-nodes failed, the recovery fails, because
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if (!failedList.isEmpty()) {
throw new IOException("Cannot recover " + block
+ ", the following datanodes failed: " + failedList);
}
}
/**
* TODO: the current implementation depends on the assumption that the
* parity cells are only generated based on the full stripe. This is not
* true after we support hflush.
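*
* For example, with cellSize = 64k and dataBlkNum = 6, internal block
* lengths of {11, 10, 9, 8, 7, 6, 5, 4, 3} cells (the first case in
* TestLeaseRecoveryStriped#BLOCK_LENGTHS_SUITE) sort to 3..11 cells; the
* dataBlkNum-th largest length is 6 cells, so lastFullStripeIdx = 6 and
* the safe length is 6 * 6 * 64k = 36 cells.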
*/
@VisibleForTesting
long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
final int cellSize = ecPolicy.getCellSize();
final int dataBlkNum = ecPolicy.getNumDataUnits();
Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum);
final int stripeSize = dataBlkNum * cellSize;
long[] blockLengths = new long[syncBlocks.size()];
int i = 0;
for (BlockRecord r : syncBlocks.values()) {
ReplicaRecoveryInfo rInfo = r.getReplicaRecoveryInfo();
blockLengths[i++] = rInfo.getNumBytes();
}
Arrays.sort(blockLengths);
// A full stripe is a stripe that has at least dataBlkNum full cells.
// lastFullStripeIdx is the index of the last full stripe.
int lastFullStripeIdx =
(int) (blockLengths[blockLengths.length - dataBlkNum] / cellSize);
return lastFullStripeIdx * stripeSize; // return the safeLength
// TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists
// such a stripe (and it must be partial).
}
private void checkLocations(int locationCount)
throws IOException {
if (locationCount < ecPolicy.getNumDataUnits()) {
throw new IOException(block + " has no enough internal blocks" +
", unable to start recovery. Locations=" + Arrays.asList(locs));
}
}
}
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
@ -379,8 +598,11 @@ public class BlockRecoveryWorker {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
task.recover();
if (b.isStriped()) {
new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
} else {
new RecoveryTaskContiguous(b).recover();
}
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}

View File

@ -197,7 +197,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
@ -3285,57 +3284,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
storedBlock.setNumBytes(newlength);
}
// find the DatanodeDescriptor objects
ArrayList<DatanodeDescriptor> trimmedTargets =
new ArrayList<DatanodeDescriptor>(newtargets.length);
ArrayList<String> trimmedStorages =
new ArrayList<String>(newtargets.length);
if (newtargets.length > 0) {
for (int i = 0; i < newtargets.length; ++i) {
// try to get targetNode
DatanodeDescriptor targetNode =
blockManager.getDatanodeManager().getDatanode(newtargets[i]);
if (targetNode != null) {
trimmedTargets.add(targetNode);
trimmedStorages.add(newtargetstorages[i]);
} else if (LOG.isDebugEnabled()) {
LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found");
}
}
}
if ((closeFile) && !trimmedTargets.isEmpty()) {
// Find the target DatanodeStorageInfos. If not found because of invalid
// or empty DatanodeID/StorageID, the slot of same offset in dsInfos is
// null
final DatanodeStorageInfo[] dsInfos = blockManager.getDatanodeManager().
getDatanodeStorageInfos(newtargets, newtargetstorages,
"src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
src, oldBlock, newgenerationstamp, newlength);
if (closeFile && dsInfos != null) {
// the file is getting closed. Insert block locations into blockManager.
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < trimmedTargets.size(); i++) {
DatanodeStorageInfo storageInfo =
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
for (int i = 0; i < dsInfos.length; i++) {
if (dsInfos[i] != null) {
if(copyTruncate) {
storageInfo.addBlock(truncatedBlock, truncatedBlock);
dsInfos[i].addBlock(truncatedBlock, truncatedBlock);
} else {
storageInfo.addBlock(storedBlock, storedBlock);
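// For a striped block, the i-th target holds the i-th internal block,
// so report it with block id = block group id + i.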
Block bi = new Block(storedBlock);
if (storedBlock.isStriped()) {
bi.setBlockId(bi.getBlockId() + i);
}
dsInfos[i].addBlock(storedBlock, bi);
}
}
}
}
// add pipeline locations into the INodeUnderConstruction
DatanodeStorageInfo[] trimmedStorageInfos =
blockManager.getDatanodeManager().getDatanodeStorageInfos(
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
trimmedStorages.toArray(new String[trimmedStorages.size()]),
"src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
src, oldBlock, newgenerationstamp, newlength);
if(copyTruncate) {
iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
iFile.convertLastBlockToUC(truncatedBlock, dsInfos);
} else {
iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
iFile.convertLastBlockToUC(storedBlock, dsInfos);
if (closeFile) {
blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
storedBlock, oldGenerationStamp, oldNumBytes,
trimmedStorageInfos);
dsInfos);
}
}
}
@ -3343,7 +3327,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (closeFile) {
if(copyTruncate) {
closeFileCommitBlocks(src, iFile, truncatedBlock);
if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
blockManager.removeBlock(storedBlock);
}
} else {

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -76,6 +77,13 @@ public class BlockRecoveryCommand extends DatanodeCommand {
this.recoveryBlock = recoveryBlock;
}
public RecoveringBlock(RecoveringBlock rBlock) {
super(rBlock.getBlock(), rBlock.getLocations(), rBlock.getStorageIDs(),
rBlock.getStorageTypes());
this.newGenerationStamp = rBlock.newGenerationStamp;
this.recoveryBlock = rBlock.recoveryBlock;
}
/**
* Return the new generation stamp of the block,
* which also plays role of the recovery id.
@ -92,6 +100,31 @@ public class BlockRecoveryCommand extends DatanodeCommand {
}
}
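/**
 * A {@link RecoveringBlock} for a striped block group, carrying the
 * erasure coding policy and the block indices of the internal blocks
 * stored at each location.
 */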
public static class RecoveringStripedBlock extends RecoveringBlock {
private final int[] blockIndices;
private final ErasureCodingPolicy ecPolicy;
public RecoveringStripedBlock(RecoveringBlock rBlock, int[] blockIndices,
ErasureCodingPolicy ecPolicy) {
super(rBlock);
this.blockIndices = blockIndices;
this.ecPolicy = ecPolicy;
}
public int[] getBlockIndices() {
return blockIndices;
}
public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
@Override
public boolean isStriped() {
return true;
}
}
/**
* Create empty BlockRecoveryCommand.
*/

View File

@ -120,6 +120,10 @@ message RecoveringBlockProto {
required uint64 newGenStamp = 1; // New genstamp post recovery
required LocatedBlockProto block = 2; // Block to be recovered
optional BlockProto truncateBlock = 3; // New block for recovery (truncate)
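// the erasure coding policy of the striped block group being recovered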
optional ErasureCodingPolicyProto ecPolicy = 4;
// block indices of striped internal blocks for each storage in LocatedBlock
repeated uint32 blockIndices = 5;
}
/**
@ -195,4 +199,4 @@ message NamenodeRegistrationProto {
}
required StorageInfoProto storageInfo = 3; // Node information
optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role
}
}

View File

@ -59,13 +59,11 @@ public class StripedFileTestUtil {
public static final short NUM_DATA_BLOCKS = (short) 6;
public static final short NUM_PARITY_BLOCKS = (short) 3;
public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
public static final int stripesPerBlock = 4;
public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
static int stripesPerBlock = 4;
public static int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
static int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
static int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];

View File

@ -0,0 +1,257 @@
package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class TestLeaseRecoveryStriped {
public static final Log LOG = LogFactory.getLog(TestLeaseRecoveryStriped.class);
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
private static final int STRIPE_SIZE = NUM_DATA_BLOCKS * CELL_SIZE;
private static final int STRIPES_PER_BLOCK = 15;
private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
private static final int bytesPerChecksum = 512;
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
StripedFileTestUtil.stripesPerBlock = STRIPES_PER_BLOCK;
StripedFileTestUtil.blockSize = BLOCK_SIZE;
StripedFileTestUtil.BLOCK_GROUP_SIZE = BLOCK_GROUP_SIZE;
}
static private final String fakeUsername = "fakeUser1";
static private final String fakeGroup = "supergroup";
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private Configuration conf;
private final Path dir = new Path("/" + this.getClass().getSimpleName());
final Path p = new Path(dir, "testfile");
@Before
public void setup() throws IOException {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.setErasureCodingPolicy(dir, null);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
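// Each test case is a pair: [0] holds the lengths of the 9 internal blocks
// (6 data + 3 parity) when recovery starts, [1] holds the expected safe
// length after lease recovery.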
public static final int[][][] BLOCK_LENGTHS_SUITE = {
{{ 11 * CELL_SIZE,10 * CELL_SIZE, 9 * CELL_SIZE,
8 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
5 * CELL_SIZE, 4 * CELL_SIZE, 3 * CELL_SIZE},
{36 * CELL_SIZE}},
{{ 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE,
6 * CELL_SIZE, 7 * CELL_SIZE, 8 * CELL_SIZE,
9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
{36 * CELL_SIZE}},
{{ 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
5 * CELL_SIZE, 4 * CELL_SIZE, 2 * CELL_SIZE,
9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
{36 * CELL_SIZE}},
{{ 8 * CELL_SIZE + bytesPerChecksum,
7 * CELL_SIZE + bytesPerChecksum * 2,
6 * CELL_SIZE + bytesPerChecksum * 2,
5 * CELL_SIZE - bytesPerChecksum * 3,
4 * CELL_SIZE - bytesPerChecksum * 4,
3 * CELL_SIZE - bytesPerChecksum * 4,
9 * CELL_SIZE, 10 * CELL_SIZE, 11 * CELL_SIZE},
{36 * CELL_SIZE}},
};
@Test
public void testLeaseRecovery() throws Exception {
for(int i=0; i < BLOCK_LENGTHS_SUITE.length; i++){
int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
try {
runTest(blockLengths, safeLength);
} catch (Throwable e){
String msg = "failed testCase at i=" + i + ", blockLengths="
+ Arrays.toString(blockLengths) + "\n"
+ StringUtils.stringifyException(e);
Assert.fail(msg);
}
}
}
private void runTest(int[] blockLengths, int safeLength) throws Exception {
writePartialBlocks(blockLengths);
recoverLease();
List<Long> oldGS = new ArrayList<>();
oldGS.add(1001L);
StripedFileTestUtil.checkData(dfs, p, safeLength,
new ArrayList<DatanodeInfo>(), oldGS);
// After recovery, the storages are reported by the primary DN. We should
// also verify the storages reported via block reports.
cluster.restartNameNode(true);
StripedFileTestUtil.checkData(dfs, p, safeLength,
new ArrayList<DatanodeInfo>(), oldGS);
}
private void writePartialBlocks(int[] blockLengths) throws Exception {
final FSDataOutputStream out = dfs.create(p);
final DFSStripedOutputStream stripedOut
= (DFSStripedOutputStream) out.getWrappedStream();
int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE;
int[] posToKill = getPosToKill(blockLengths);
int checkingPos = nextCheckingPos(posToKill, 0);
try {
for (int pos = 0; pos < length; pos++) {
out.write(StripedFileTestUtil.getByte(pos));
if (pos == checkingPos) {
for (int index : getIndexToStop(posToKill, pos)) {
out.flush();
stripedOut.enqueueAllCurrentPackets();
StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
waitStreamerAllAcked(s);
waitByteSent(s, blockLengths[index]);
stopBlockStream(s);
}
checkingPos = nextCheckingPos(posToKill, pos);
}
}
} finally {
DFSTestUtil.abortStream(stripedOut);
}
}
private int nextCheckingPos(int[] posToKill, int curPos) {
int checkingPos = Integer.MAX_VALUE;
for (int i = 0; i < posToKill.length; i++) {
if (posToKill[i] > curPos) {
checkingPos = Math.min(checkingPos, posToKill[i]);
}
}
return checkingPos;
}
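// Map each target block length to the position in the written byte stream
// at which the corresponding streamer has received that many bytes and
// can be stopped.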
private int[] getPosToKill(int[] blockLengths) {
int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
int numStripe = (blockLengths[i] - 1) / CELL_SIZE;
posToKill[i] = numStripe * STRIPE_SIZE
+ i * CELL_SIZE + blockLengths[i] % CELL_SIZE;
if (blockLengths[i] % CELL_SIZE == 0) {
posToKill[i] += CELL_SIZE;
}
}
for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS+NUM_PARITY_BLOCKS; i++) {
Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0);
int numStripe = (blockLengths[i]) / CELL_SIZE;
posToKill[i] = numStripe * STRIPE_SIZE;
}
return posToKill;
}
private List<Integer> getIndexToStop(int[] posToKill, int pos) {
List<Integer> indices = new LinkedList<>();
for (int i = 0; i < posToKill.length; i++) {
if (pos == posToKill[i]) {
indices.add(i);
}
}
return indices;
}
private void waitByteSent(final StripedDataStreamer s, final long byteSent)
throws Exception {
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return s.bytesSent >= byteSent;
}
}, 100, 3000);
} catch (TimeoutException e) {
throw new IOException("Timeout waiting for streamer " + s +". Sent="
+ s.bytesSent + ", expected="+byteSent);
}
}
private void stopBlockStream(StripedDataStreamer s) throws Exception {
IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
Whitebox.setInternalState(s, "blockStream",
new DataOutputStream(nullOutputStream));
}
private void recoverLease() throws Exception {
final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf);
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return dfs2.recoverLease(p);
} catch (IOException e) {
return false;
}
}
}, 5000, 24000);
} catch (TimeoutException e) {
throw new IOException("Timeout waiting for recoverLease()");
}
}
private FileSystem getFSAsAnotherUser(final Configuration c)
throws IOException, InterruptedException {
return FileSystem.get(FileSystem.getDefaultUri(c), c,
UserGroupInformation.createUserForTesting(fakeUsername,
new String[]{fakeGroup}).getUserName());
}
public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
long toWaitFor = s.getLastQueuedSeqno();
s.waitForAckedSeqno(toWaitFor);
}
}

View File

@ -39,7 +39,9 @@ import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -61,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@ -68,8 +71,10 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -93,6 +98,8 @@ import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import static org.apache.hadoop.hdfs.TestLeaseRecoveryStriped.BLOCK_LENGTHS_SUITE;
/**
* This tests if sync all replicas in block recovery works correctly
*/
@ -243,8 +250,7 @@ public class TestBlockRecovery {
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
RecoveringBlock rBlock = new RecoveringBlock(block,
locs, RECOVERY_ID);
RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
BlockRecord record1 = new BlockRecord(
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
@ -745,4 +751,28 @@ public class TestBlockRecovery {
assertTrue(exceptionThrown);
}
}
@Test
public void testSafeLength() throws Exception {
ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
.getSystemDefaultPolicy();
RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock,
new int[9], ecPolicy);
BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn);
BlockRecoveryWorker.RecoveryTaskStriped recoveryTask =
recoveryWorker.new RecoveryTaskStriped(rBlockStriped);
for (int i = 0; i < BLOCK_LENGTHS_SUITE.length; i++) {
int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
Map<Long, BlockRecord> syncList = new HashMap<>();
for (int id = 0; id < blockLengths.length; id++) {
ReplicaRecoveryInfo rInfo = new ReplicaRecoveryInfo(id,
blockLengths[id], 0, null);
syncList.put((long) id, new BlockRecord(null, null, rInfo));
}
Assert.assertEquals("BLOCK_LENGTHS_SUITE[" + i + "]", safeLength,
recoveryTask.getSafeLength(syncList));
}
}
}

View File

@ -199,14 +199,15 @@ public class TestCommitBlockSynchronization {
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[]{
new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)};
String[] storageIDs = new String[]{"fake-storage-ID"};
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true,
false, newTargets, null);
false, newTargets, storageIDs);
// Repeat the call to make sure it returns true
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true, false, newTargets, null);
lastBlock, genStamp, length, true, false, newTargets, storageIDs);
}
}

View File

@ -46,7 +46,6 @@ import java.util.List;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -183,7 +182,7 @@ public class TestRecoverStripedBlocks {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, StripedFileTestUtil.blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
.build();
try {
@ -191,7 +190,7 @@ public class TestRecoverStripedBlocks {
DistributedFileSystem fs = cluster.getFileSystem();
BlockManager bm = cluster.getNamesystem().getBlockManager();
fs.getClient().setErasureCodingPolicy("/", null);
int fileLen = NUM_DATA_BLOCKS * blockSize;
int fileLen = NUM_DATA_BLOCKS * StripedFileTestUtil.blockSize;
Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
final byte[] data = new byte[fileLen];
DFSTestUtil.writeFile(fs, p, data);