HDFS-7969. Erasure coding: NameNode support for lease recovery of striped block groups. Contributed by Zhe Zhang.

This commit is contained in:
Zhe Zhang 2015-04-06 12:52:44 -07:00 committed by Zhe Zhang
parent c243319eab
commit 146ce7a978
7 changed files with 163 additions and 55 deletions

View File

@ -31,7 +31,8 @@
* Represents a block that is currently being constructed.<br> * Represents a block that is currently being constructed.<br>
* This is usually the last block of a file opened for write or append. * This is usually the last block of a file opened for write or append.
*/ */
public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
implements BlockInfoUnderConstruction{
/** Block state. See {@link BlockUCState} */ /** Block state. See {@link BlockUCState} */
private BlockUCState blockUCState; private BlockUCState blockUCState;
@ -94,7 +95,7 @@ assert getBlockUCState() != BlockUCState.COMPLETE :
return new BlockInfoContiguous(this); return new BlockInfoContiguous(this);
} }
/** Set expected locations */ @Override
public void setExpectedLocations(DatanodeStorageInfo[] targets) { public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length; int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<>(numLocations); this.replicas = new ArrayList<>(numLocations);
@ -104,10 +105,7 @@ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
} }
} }
/** @Override
* Create array of expected replica locations
* (as has been assigned by chooseTargets()).
*/
public DatanodeStorageInfo[] getExpectedStorageLocations() { public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = replicas == null ? 0 : replicas.size(); int numLocations = replicas == null ? 0 : replicas.size();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
@ -117,7 +115,7 @@ public DatanodeStorageInfo[] getExpectedStorageLocations() {
return storages; return storages;
} }
/** Get the number of expected locations */ @Override
public int getNumExpectedLocations() { public int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.size(); return replicas == null ? 0 : replicas.size();
} }
@ -135,25 +133,26 @@ void setBlockUCState(BlockUCState s) {
blockUCState = s; blockUCState = s;
} }
/** Get block recovery ID */ @Override
public long getBlockRecoveryId() { public long getBlockRecoveryId() {
return blockRecoveryId; return blockRecoveryId;
} }
/** Get recover block */ @Override
public Block getTruncateBlock() { public Block getTruncateBlock() {
return truncateBlock; return truncateBlock;
} }
@Override
public Block toBlock(){
return this;
}
public void setTruncateBlock(Block recoveryBlock) { public void setTruncateBlock(Block recoveryBlock) {
this.truncateBlock = recoveryBlock; this.truncateBlock = recoveryBlock;
} }
/** @Override
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
* @param genStamp The final generation stamp for the block.
*/
public void setGenerationStampAndVerifyReplicas(long genStamp) { public void setGenerationStampAndVerifyReplicas(long genStamp) {
// Set the generation stamp for the block. // Set the generation stamp for the block.
setGenerationStamp(genStamp); setGenerationStamp(genStamp);
@ -187,11 +186,7 @@ void commitBlock(Block block) throws IOException {
setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
} }
/** @Override
* Initialize lease recovery for this block.
* Find the first alive data-node starting from the previous primary and
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) { public void initializeBlockRecovery(long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY); setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId; blockRecoveryId = recoveryId;

View File

@ -31,7 +31,8 @@
* Represents a striped block that is currently being constructed. * Represents a striped block that is currently being constructed.
* This is usually the last block of a file opened for write or append. * This is usually the last block of a file opened for write or append.
*/ */
public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
implements BlockInfoUnderConstruction{
private BlockUCState blockUCState; private BlockUCState blockUCState;
/** /**
@ -39,6 +40,12 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
*/ */
private ReplicaUnderConstruction[] replicas; private ReplicaUnderConstruction[] replicas;
/**
* Index of the primary data node doing the recovery. Useful for log
* messages.
*/
private int primaryNodeIndex = -1;
/** /**
* The new generation stamp, which this block will have * The new generation stamp, which this block will have
* after the recovery succeeds. Also used as a recovery id to identify * after the recovery succeeds. Also used as a recovery id to identify
@ -82,6 +89,7 @@ assert getBlockUCState() != COMPLETE :
} }
/** Set expected locations */ /** Set expected locations */
@Override
public void setExpectedLocations(DatanodeStorageInfo[] targets) { public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length; int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ReplicaUnderConstruction[numLocations]; this.replicas = new ReplicaUnderConstruction[numLocations];
@ -98,6 +106,7 @@ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
* Create array of expected replica locations * Create array of expected replica locations
* (as has been assigned by chooseTargets()). * (as has been assigned by chooseTargets()).
*/ */
@Override
public DatanodeStorageInfo[] getExpectedStorageLocations() { public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = getNumExpectedLocations(); int numLocations = getNumExpectedLocations();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
@ -117,7 +126,7 @@ public int[] getBlockIndices() {
return indices; return indices;
} }
/** Get the number of expected locations */ @Override
public int getNumExpectedLocations() { public int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.length; return replicas == null ? 0 : replicas.length;
} }
@ -135,16 +144,22 @@ void setBlockUCState(BlockUCState s) {
blockUCState = s; blockUCState = s;
} }
/** Get block recovery ID */ @Override
public long getBlockRecoveryId() { public long getBlockRecoveryId() {
return blockRecoveryId; return blockRecoveryId;
} }
/** @Override
* Process the recorded replicas. When about to commit or finish the public Block getTruncateBlock() {
* pipeline recovery sort out bad replicas. return null;
* @param genStamp The final generation stamp for the block. }
*/
@Override
public Block toBlock(){
return this;
}
@Override
public void setGenerationStampAndVerifyReplicas(long genStamp) { public void setGenerationStampAndVerifyReplicas(long genStamp) {
// Set the generation stamp for the block. // Set the generation stamp for the block.
setGenerationStamp(genStamp); setGenerationStamp(genStamp);
@ -178,18 +193,53 @@ void commitBlock(Block block) throws IOException {
setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
} }
/** @Override
* Initialize lease recovery for this striped block.
*/
public void initializeBlockRecovery(long recoveryId) { public void initializeBlockRecovery(long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY); setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId; blockRecoveryId = recoveryId;
if (replicas == null || replicas.length == 0) { if (replicas == null || replicas.length == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*" + NameNode.blockStateChangeLog.warn("BLOCK*" +
" BlockInfoUnderConstruction.initLeaseRecovery:" + " BlockInfoStripedUnderConstruction.initLeaseRecovery:" +
" No blocks found, lease removed."); " No blocks found, lease removed.");
} }
// TODO we need to implement different recovery logic here boolean allLiveReplicasTriedAsPrimary = true;
for (ReplicaUnderConstruction replica : replicas) {
// Check if all replicas have been tried or not.
if (replica.isAlive()) {
allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
replica.getChosenAsPrimary());
}
}
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
for (ReplicaUnderConstruction replica : replicas) {
replica.setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
ReplicaUnderConstruction primary = null;
primaryNodeIndex = -1;
for(int i = 0; i < replicas.length; i++) {
// Skip alive replicas which have been chosen for recovery.
if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
continue;
}
final ReplicaUnderConstruction ruc = replicas[i];
final long lastUpdate = ruc.getExpectedStorageLocation()
.getDatanodeDescriptor().getLastUpdateMonotonic();
if (lastUpdate > mostRecentLastUpdate) {
primaryNodeIndex = i;
primary = ruc;
mostRecentLastUpdate = lastUpdate;
}
}
if (primary != null) {
primary.getExpectedStorageLocation().getDatanodeDescriptor()
.addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info(
"BLOCK* {} recovery started, primary={}", this, primary);
}
} }
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock, void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock,
@ -238,7 +288,9 @@ public void appendStringTo(StringBuilder sb) {
} }
private void appendUCParts(StringBuilder sb) { private void appendUCParts(StringBuilder sb) {
sb.append("{UCState=").append(blockUCState).append(", replicas=["); sb.append("{UCState=").append(blockUCState).
append(", primaryNodeIndex=").append(primaryNodeIndex).
append(", replicas=[");
if (replicas != null) { if (replicas != null) {
int i = 0; int i = 0;
for (ReplicaUnderConstruction r : replicas) { for (ReplicaUnderConstruction r : replicas) {

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
public interface BlockInfoUnderConstruction {
/**
* Create array of expected replica locations
* (as has been assigned by chooseTargets()).
*/
public DatanodeStorageInfo[] getExpectedStorageLocations();
/** Get recover block */
public Block getTruncateBlock();
/** Convert to a Block object */
public Block toBlock();
/** Get block recovery ID */
public long getBlockRecoveryId();
/** Get the number of expected locations */
public int getNumExpectedLocations();
/** Set expected locations */
public void setExpectedLocations(DatanodeStorageInfo[] targets);
/**
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
* @param genStamp The final generation stamp for the block.
*/
public void setGenerationStampAndVerifyReplicas(long genStamp);
/**
* Initialize lease recovery for this block.
* Find the first alive data-node starting from the previous primary and
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId);
}

View File

@ -253,8 +253,8 @@ public CachedBlocksList getPendingUncached() {
private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks = private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
new BlockQueue<>(); new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */ /** A queue of blocks to be recovered by this datanode */
private final BlockQueue<BlockInfoContiguousUnderConstruction> private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
recoverBlocks = new BlockQueue<>(); new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */ /** A set of blocks to be invalidated by this datanode */
private final LightWeightHashSet<Block> invalidateBlocks = private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>(); new LightWeightHashSet<>();
@ -649,7 +649,7 @@ void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
/** /**
* Store block recovery work. * Store block recovery work.
*/ */
void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) { void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
if(recoverBlocks.contains(block)) { if(recoverBlocks.contains(block)) {
// this prevents adding the same block twice to the recovery queue // this prevents adding the same block twice to the recovery queue
BlockManager.LOG.info(block + " is already in the recovery queue"); BlockManager.LOG.info(block + " is already in the recovery queue");
@ -703,11 +703,11 @@ public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
return erasurecodeBlocks.poll(maxTransfers); return erasurecodeBlocks.poll(maxTransfers);
} }
public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers); List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
if(blocks == null) if(blocks == null)
return null; return null;
return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]); return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
} }
/** /**

View File

@ -1379,12 +1379,12 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
} }
//check lease recovery //check lease recovery
BlockInfoContiguousUnderConstruction[] blocks = nodeinfo BlockInfoUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE); .getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) { if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand( BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length); blocks.length);
for (BlockInfoContiguousUnderConstruction b : blocks) { for (BlockInfoUnderConstruction b : blocks) {
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default). // Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations = final List<DatanodeStorageInfo> recoveryLocations =
@ -1398,10 +1398,10 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
// to old block. // to old block.
boolean truncateRecovery = b.getTruncateBlock() != null; boolean truncateRecovery = b.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery && boolean copyOnTruncateRecovery = truncateRecovery &&
b.getTruncateBlock().getBlockId() != b.getBlockId(); b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b); new ExtendedBlock(blockPoolId, b.toBlock());
// If we only get 1 replica after eliminating stale nodes, then choose all // If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures. // replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos; DatanodeInfo[] recoveryInfos;
@ -1418,7 +1418,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages); recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
} }
if(truncateRecovery) { if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b : Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() :
b.getTruncateBlock(); b.getTruncateBlock();
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock)); recoveryBlock));

View File

@ -208,6 +208,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -3567,18 +3568,19 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
throw new AlreadyBeingCreatedException(message); throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION: case UNDER_CONSTRUCTION:
case UNDER_RECOVERY: case UNDER_RECOVERY:
// TODO support Striped block's recovery // TODO support truncate of striped blocks
final BlockInfoContiguousUnderConstruction uc = final BlockInfoUnderConstruction uc =
(BlockInfoContiguousUnderConstruction)lastBlock; (BlockInfoUnderConstruction)lastBlock;
// determine if last block was intended to be truncated // determine if last block was intended to be truncated
Block recoveryBlock = uc.getTruncateBlock(); Block recoveryBlock = uc.getTruncateBlock();
boolean truncateRecovery = recoveryBlock != null; boolean truncateRecovery = recoveryBlock != null;
boolean copyOnTruncate = truncateRecovery && boolean copyOnTruncate = truncateRecovery &&
recoveryBlock.getBlockId() != uc.getBlockId(); recoveryBlock.getBlockId() != uc.toBlock().getBlockId();
assert !copyOnTruncate || assert !copyOnTruncate ||
recoveryBlock.getBlockId() < uc.getBlockId() && recoveryBlock.getBlockId() < uc.toBlock().getBlockId() &&
recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() && recoveryBlock.getGenerationStamp() < uc.toBlock().
recoveryBlock.getNumBytes() > uc.getNumBytes() : getGenerationStamp() &&
recoveryBlock.getNumBytes() > uc.toBlock().getNumBytes() :
"wrong recoveryBlock"; "wrong recoveryBlock";
// setup the last block locations from the blockManager if not known // setup the last block locations from the blockManager if not known
@ -3586,7 +3588,8 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
uc.setExpectedLocations(blockManager.getStorages(lastBlock)); uc.setExpectedLocations(blockManager.getStorages(lastBlock));
} }
if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) { if (uc.getNumExpectedLocations() == 0 &&
uc.toBlock().getNumBytes() == 0) {
// There is no datanode reported to this block. // There is no datanode reported to this block.
// may be client have crashed before writing data to pipeline. // may be client have crashed before writing data to pipeline.
// This blocks doesn't need any recovery. // This blocks doesn't need any recovery.
@ -3599,10 +3602,11 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
return true; return true;
} }
// start recovery of the last block for this file // start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc)); long blockRecoveryId =
nextGenerationStamp(blockIdManager.isLegacyBlock(uc.toBlock()));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
if(copyOnTruncate) { if(copyOnTruncate) {
uc.setGenerationStamp(blockRecoveryId); uc.toBlock().setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) { } else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId); recoveryBlock.setGenerationStamp(blockRecoveryId);
} }

View File

@ -51,7 +51,7 @@ public void testInitializeBlockRecovery() throws Exception {
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000); DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000); DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
blockInfo.initializeBlockRecovery(1); blockInfo.initializeBlockRecovery(1);
BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo); assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #2. // Recovery attempt #2.