HDFS-8928. Improvements for BlockUnderConstructionFeature: ReplicaUnderConstruction as a separate class and replicas as an array. Contributed by Jing Zhao.

(cherry picked from commit bdd79388f3)
This commit is contained in:
Jing Zhao 2015-08-24 15:53:34 -07:00
parent 7a0a31586a
commit eefc1c174b
6 changed files with 195 additions and 152 deletions

View File

@ -484,6 +484,10 @@ Release 2.8.0 - UNRELEASED
HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9) HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9)
HDFS-8928. Improvements for BlockUnderConstructionFeature:
ReplicaUnderConstruction as a separate class and replicas as an array.
(jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -24,7 +24,6 @@ import java.util.List;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature.ReplicaUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet;
@ -366,7 +365,7 @@ public abstract class BlockInfo extends Block
} else { } else {
// the block is already under construction // the block is already under construction
uc.setBlockUCState(s); uc.setBlockUCState(s);
uc.setExpectedLocations(this.getGenerationStamp(), targets); uc.setExpectedLocations(this, targets);
} }
} }

View File

@ -17,28 +17,27 @@
*/ */
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
/** /**
* Represents a block that is currently being constructed.<br> * Represents the under construction feature of a Block.
* This is usually the last block of a file opened for write or append. * This is usually the last block of a file opened for write or append.
*/ */
public class BlockUnderConstructionFeature { public class BlockUnderConstructionFeature {
/** Block state. See {@link BlockUCState} */
private BlockUCState blockUCState; private BlockUCState blockUCState;
/** /**
* Block replicas as assigned when the block was allocated. * Block replicas as assigned when the block was allocated.
* This defines the pipeline order.
*/ */
private List<ReplicaUnderConstruction> replicas; private ReplicaUnderConstruction[] replicas;
/** /**
* Index of the primary data node doing the recovery. Useful for log * Index of the primary data node doing the recovery. Useful for log
@ -58,112 +57,21 @@ public class BlockUnderConstructionFeature {
*/ */
private Block truncateBlock; private Block truncateBlock;
/** public BlockUnderConstructionFeature(Block blk,
* ReplicaUnderConstruction contains information about replicas while BlockUCState state, DatanodeStorageInfo[] targets) {
* they are under construction. assert getBlockUCState() != COMPLETE :
* The GS, the length and the state of the replica is as reported by
* the data-node.
* It is not guaranteed, but expected, that data-nodes actually have
* corresponding replicas.
*/
static class ReplicaUnderConstruction {
private long generationStamp;
private final DatanodeStorageInfo expectedLocation;
private ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(long generationStamp, DatanodeStorageInfo target,
ReplicaState state) {
this.generationStamp = generationStamp;
this.expectedLocation = target;
this.state = state;
this.chosenAsPrimary = false;
}
long getGenerationStamp() {
return this.generationStamp;
}
void setGenerationStamp(long generationStamp) {
this.generationStamp = generationStamp;
}
/**
* Expected block replica location as assigned when the block was allocated.
* This defines the pipeline order.
* It is not guaranteed, but expected, that the data-node actually has
* the replica.
*/
DatanodeStorageInfo getExpectedStorageLocation() {
return expectedLocation;
}
/**
* Get replica state as reported by the data-node.
*/
ReplicaState getState() {
return state;
}
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/**
* Set replica state.
*/
void setState(ReplicaState s) {
state = s;
}
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/**
* Is data-node the replica belongs to alive.
*/
boolean isAlive() {
return expectedLocation.getDatanodeDescriptor().isAlive;
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(50)
.append("ReplicaUC[")
.append(expectedLocation)
.append("|")
.append(state)
.append("]");
return b.toString();
}
}
/**
* Create a block that is currently being constructed.
*/
public BlockUnderConstructionFeature(Block block, BlockUCState state,
DatanodeStorageInfo[] targets) {
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockUnderConstructionFeature cannot be in COMPLETE state"; "BlockUnderConstructionFeature cannot be in COMPLETE state";
this.blockUCState = state; this.blockUCState = state;
setExpectedLocations(block.getGenerationStamp(), targets); setExpectedLocations(blk, targets);
} }
/** Set expected locations */ /** Set expected locations */
public void setExpectedLocations(long generationStamp, public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets) {
DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length; int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<>(numLocations); this.replicas = new ReplicaUnderConstruction[numLocations];
for(int i = 0; i < numLocations; i++) { for(int i = 0; i < numLocations; i++) {
replicas.add(new ReplicaUnderConstruction(generationStamp, targets[i], replicas[i] = new ReplicaUnderConstruction(block, targets[i],
ReplicaState.RBW)); ReplicaState.RBW);
} }
} }
@ -172,17 +80,17 @@ public class BlockUnderConstructionFeature {
* (as has been assigned by chooseTargets()). * (as has been assigned by chooseTargets()).
*/ */
public DatanodeStorageInfo[] getExpectedStorageLocations() { public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = replicas == null ? 0 : replicas.size(); int numLocations = getNumExpectedLocations();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
for (int i = 0; i < numLocations; i++) { for (int i = 0; i < numLocations; i++) {
storages[i] = replicas.get(i).getExpectedStorageLocation(); storages[i] = replicas[i].getExpectedStorageLocation();
} }
return storages; return storages;
} }
/** Get the number of expected locations */ /** Get the number of expected locations */
public int getNumExpectedLocations() { public int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.size(); return replicas == null ? 0 : replicas.length;
} }
/** /**
@ -197,7 +105,6 @@ public class BlockUnderConstructionFeature {
blockUCState = s; blockUCState = s;
} }
/** Get block recovery ID */
public long getBlockRecoveryId() { public long getBlockRecoveryId() {
return blockRecoveryId; return blockRecoveryId;
} }
@ -236,13 +143,16 @@ public class BlockUnderConstructionFeature {
* Find the first alive data-node starting from the previous primary and * Find the first alive data-node starting from the previous primary and
* make it primary. * make it primary.
*/ */
public void initializeBlockRecovery(BlockInfo block, long recoveryId) { public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY); setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId; blockRecoveryId = recoveryId;
if (replicas.size() == 0) { if (replicas == null || replicas.length == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*" NameNode.blockStateChangeLog.warn("BLOCK*" +
+ " BlockUnderConstructionFeature.initLeaseRecovery:" " BlockUnderConstructionFeature.initializeBlockRecovery:" +
+ " No blocks found, lease removed."); " No blocks found, lease removed.");
// sets primary node index and return.
primaryNodeIndex = -1;
return;
} }
boolean allLiveReplicasTriedAsPrimary = true; boolean allLiveReplicasTriedAsPrimary = true;
for (ReplicaUnderConstruction replica : replicas) { for (ReplicaUnderConstruction replica : replicas) {
@ -261,12 +171,12 @@ public class BlockUnderConstructionFeature {
long mostRecentLastUpdate = 0; long mostRecentLastUpdate = 0;
ReplicaUnderConstruction primary = null; ReplicaUnderConstruction primary = null;
primaryNodeIndex = -1; primaryNodeIndex = -1;
for(int i = 0; i < replicas.size(); i++) { for (int i = 0; i < replicas.length; i++) {
// Skip alive replicas which have been chosen for recovery. // Skip alive replicas which have been chosen for recovery.
if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
continue; continue;
} }
final ReplicaUnderConstruction ruc = replicas.get(i); final ReplicaUnderConstruction ruc = replicas[i];
final long lastUpdate = ruc.getExpectedStorageLocation() final long lastUpdate = ruc.getExpectedStorageLocation()
.getDatanodeDescriptor().getLastUpdateMonotonic(); .getDatanodeDescriptor().getLastUpdateMonotonic();
if (lastUpdate > mostRecentLastUpdate) { if (lastUpdate > mostRecentLastUpdate) {
@ -277,35 +187,45 @@ public class BlockUnderConstructionFeature {
} }
if (primary != null) { if (primary != null) {
primary.getExpectedStorageLocation().getDatanodeDescriptor() primary.getExpectedStorageLocation().getDatanodeDescriptor()
.addBlockToBeRecovered(block); .addBlockToBeRecovered(blockInfo);
primary.setChosenAsPrimary(true); primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.debug( NameNode.blockStateChangeLog.debug(
"BLOCK* {} recovery started, primary={}", this, primary); "BLOCK* {} recovery started, primary={}", this, primary);
} }
} }
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block, /** Add the reported replica if it is not already in the replica list. */
ReplicaState rState) { void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Iterator<ReplicaUnderConstruction> it = replicas.iterator(); Block reportedBlock, ReplicaState rState) {
while (it.hasNext()) { if (replicas == null) {
ReplicaUnderConstruction r = it.next(); replicas = new ReplicaUnderConstruction[1];
DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation(); replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage,
if (expectedLocation == storage) { rState);
// Record the gen stamp from the report } else {
r.setGenerationStamp(block.getGenerationStamp()); for (int i = 0; i < replicas.length; i++) {
DatanodeStorageInfo expected =
replicas[i].getExpectedStorageLocation();
if (expected == storage) {
replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
return; return;
} else if (expectedLocation != null && } else if (expected != null && expected.getDatanodeDescriptor() ==
expectedLocation.getDatanodeDescriptor() ==
storage.getDatanodeDescriptor()) { storage.getDatanodeDescriptor()) {
// The Datanode reported that the block is on a different storage // The Datanode reported that the block is on a different storage
// than the one chosen by BlockPlacementPolicy. This can occur as // than the one chosen by BlockPlacementPolicy. This can occur as
// we allow Datanodes to choose the target storage. Update our // we allow Datanodes to choose the target storage. Update our
// state by removing the stale entry and adding a new one. // state by removing the stale entry and adding a new one.
it.remove(); replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
break; rState);
return;
} }
} }
replicas.add(new ReplicaUnderConstruction(block.getGenerationStamp(), storage, rState)); ReplicaUnderConstruction[] newReplicas =
new ReplicaUnderConstruction[replicas.length + 1];
System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
reportedBlock, storage, rState);
replicas = newReplicas;
}
} }
@Override @Override
@ -321,12 +241,11 @@ public class BlockUnderConstructionFeature {
.append(", primaryNodeIndex=").append(primaryNodeIndex) .append(", primaryNodeIndex=").append(primaryNodeIndex)
.append(", replicas=["); .append(", replicas=[");
if (replicas != null) { if (replicas != null) {
Iterator<ReplicaUnderConstruction> iter = replicas.iterator(); int i = 0;
if (iter.hasNext()) { for (ReplicaUnderConstruction r : replicas) {
sb.append(iter.next()); r.appendStringTo(sb);
while (iter.hasNext()) { if (++i < replicas.length) {
sb.append(", "); sb.append(", ");
sb.append(iter.next());
} }
} }
} }

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/**
* ReplicaUnderConstruction contains information about replicas (or blocks
* belonging to a block group) while they are under construction.
*
* The GS, the length and the state of the replica is as reported by the
* datanode.
*
* It is not guaranteed, but expected, that datanodes actually have
* corresponding replicas.
*/
class ReplicaUnderConstruction extends Block {
private final DatanodeStorageInfo expectedLocation;
private HdfsServerConstants.ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block,
DatanodeStorageInfo target,
HdfsServerConstants.ReplicaState state) {
super(block);
this.expectedLocation = target;
this.state = state;
this.chosenAsPrimary = false;
}
/**
* Expected block replica location as assigned when the block was allocated.
* This defines the pipeline order.
* It is not guaranteed, but expected, that the data-node actually has
* the replica.
*/
DatanodeStorageInfo getExpectedStorageLocation() {
return expectedLocation;
}
/**
* Get replica state as reported by the data-node.
*/
HdfsServerConstants.ReplicaState getState() {
return state;
}
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/**
* Set replica state.
*/
void setState(HdfsServerConstants.ReplicaState s) {
state = s;
}
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/**
* Is data-node the replica belongs to alive.
*/
boolean isAlive() {
return expectedLocation.getDatanodeDescriptor().isAlive;
}
@Override // Block
public int hashCode() {
return super.hashCode();
}
@Override // Block
public boolean equals(Object obj) {
// Sufficient to rely on super's implementation
return (this == obj) || super.equals(obj);
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(50);
appendStringTo(b);
return b.toString();
}
@Override
public void appendStringTo(StringBuilder sb) {
sb.append("ReplicaUC[")
.append(expectedLocation)
.append("|")
.append(state)
.append("]");
}
}

View File

@ -237,7 +237,8 @@ class FSDirWriteFileOp {
} else { } else {
// add new chosen targets to already allocated block and return // add new chosen targets to already allocated block and return
BlockInfo lastBlockInFile = pendingFile.getLastBlock(); BlockInfo lastBlockInFile = pendingFile.getLastBlock();
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(lastBlockInFile.getGenerationStamp(), targets); lastBlockInFile.getUnderConstructionFeature().
setExpectedLocations(lastBlockInFile, targets);
offset = pendingFile.computeFileSize(); offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
} }

View File

@ -3085,7 +3085,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new AlreadyBeingCreatedException(message); throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION: case UNDER_CONSTRUCTION:
case UNDER_RECOVERY: case UNDER_RECOVERY:
BlockUnderConstructionFeature uc = lastBlock.getUnderConstructionFeature(); BlockUnderConstructionFeature uc =
lastBlock.getUnderConstructionFeature();
// determine if last block was intended to be truncated // determine if last block was intended to be truncated
Block recoveryBlock = uc.getTruncateBlock(); Block recoveryBlock = uc.getTruncateBlock();
boolean truncateRecovery = recoveryBlock != null; boolean truncateRecovery = recoveryBlock != null;
@ -3099,7 +3100,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// setup the last block locations from the blockManager if not known // setup the last block locations from the blockManager if not known
if (uc.getNumExpectedLocations() == 0) { if (uc.getNumExpectedLocations() == 0) {
uc.setExpectedLocations(lastBlock.getGenerationStamp(), uc.setExpectedLocations(lastBlock,
blockManager.getStorages(lastBlock)); blockManager.getStorages(lastBlock));
} }
@ -5368,7 +5369,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager() final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
.getDatanodeStorageInfos(newNodes, newStorageIDs); .getDatanodeStorageInfos(newNodes, newStorageIDs);
blockinfo.getUnderConstructionFeature().setExpectedLocations( blockinfo.getUnderConstructionFeature().setExpectedLocations(
blockinfo.getGenerationStamp(), storages); blockinfo, storages);
String src = pendingFile.getFullPathName(); String src = pendingFile.getFullPathName();
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache); FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);