HDFS-7853. Erasure coding: extend LocatedBlocks to support reading from striped files. Contributed by Jing Zhao.

Jing Zhao 2015-03-09 14:59:58 -07:00 committed by Zhe Zhang
parent 8f89d7489d
commit f05c21285e
20 changed files with 428 additions and 133 deletions

View File

@@ -49,14 +49,14 @@ public class LocatedBlock {
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
private Token<BlockTokenIdentifier> blockToken = new Token<>();
/**
* List of cached datanode locations
*/
private DatanodeInfo[] cachedLocs;
// Used when there are no locations
private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
static final DatanodeInfoWithStorage[] EMPTY_LOCS =
new DatanodeInfoWithStorage[0];
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {

View File

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import java.util.Arrays;
/**
* {@link LocatedBlock} with striped block support. For a striped block, each
* datanode storage is associated with a block in the block group. We need to
* record the index (in the striped block group) for each of them.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class LocatedStripedBlock extends LocatedBlock {
private int[] blockIndices;
public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes, int[] indices,
long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) {
super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs);
assert indices != null && indices.length == locs.length;
this.blockIndices = new int[indices.length];
System.arraycopy(indices, 0, blockIndices, 0, indices.length);
}
public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) {
this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages), indices,
startOffset, corrupt, EMPTY_LOCS);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{" + getBlock()
+ "; getBlockSize()=" + getBlockSize()
+ "; corrupt=" + isCorrupt()
+ "; offset=" + getStartOffset()
+ "; locs=" + Arrays.asList(getLocations())
+ "; indices=" + Arrays.asList(blockIndices)
+ "}";
}
public int[] getBlockIndices() {
return this.blockIndices;
}
}
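
For orientation, a minimal usage sketch (not part of this commit; the RS(6,3) layout, the port numbers, and the DFSTestUtil helper are assumptions for illustration):

// Build a located striped block for a 9-wide group (6 data + 3 parity) where
// the i-th location holds the internal block with index i.
int groupSize = 9;
DatanodeInfo[] locs = new DatanodeInfo[groupSize];
String[] storageIDs = new String[groupSize];
StorageType[] types = new StorageType[groupSize];
int[] indices = new int[groupSize];
for (int i = 0; i < groupSize; i++) {
  locs[i] = DFSTestUtil.getLocalDatanodeInfo(9866 + i); // hypothetical test helper
  storageIDs[i] = "storage-" + i;
  types[i] = StorageType.DISK;
  indices[i] = i;
}
ExtendedBlock group = new ExtendedBlock("BP-1", new Block(1000L, 0L, 1001L));
LocatedStripedBlock lsb = new LocatedStripedBlock(group, locs, storageIDs,
    types, indices, 0L, false, new DatanodeInfo[0]);
assert Arrays.equals(indices, lsb.getBlockIndices());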

View File

@@ -419,7 +419,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
builder.setBlock(PBHelper.convert(result.getLastBlock()));
builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock()));
}
if (result.getFileStatus() != null) {
builder.setStat(PBHelper.convert(result.getFileStatus()));
@@ -495,7 +495,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
(favor == null || favor.size() == 0) ? null : favor
.toArray(new String[favor.size()]));
return AddBlockResponseProto.newBuilder()
.setBlock(PBHelper.convert(result)).build();
.setBlock(PBHelper.convertLocatedBlock(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -519,7 +519,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
new DatanodeInfoProto[excludesList.size()])),
req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
PBHelper.convert(result))
PBHelper.convertLocatedBlock(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -545,8 +545,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
ReportBadBlocksRequestProto req) throws ServiceException {
try {
List<LocatedBlockProto> bl = req.getBlocksList();
server.reportBadBlocks(PBHelper.convertLocatedBlock(
bl.toArray(new LocatedBlockProto[bl.size()])));
server.reportBadBlocks(PBHelper.convertLocatedBlocks(
bl.toArray(new LocatedBlockProto[bl.size()])));
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -950,8 +950,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, UpdateBlockForPipelineRequestProto req)
throws ServiceException {
try {
LocatedBlockProto result = PBHelper.convert(server
.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
LocatedBlockProto result = PBHelper.convertLocatedBlock(
server.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
req.getClientName()));
return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
.build();

View File

@@ -327,7 +327,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
AppendResponseProto res = rpcProxy.append(null, req);
LocatedBlock lastBlock = res.hasBlock() ? PBHelper
.convert(res.getBlock()) : null;
.convertLocatedBlockProto(res.getBlock()) : null;
HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat())
: null;
return new LastBlockWithStatus(lastBlock, stat);
@@ -415,7 +415,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
try {
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
return PBHelper.convertLocatedBlockProto(
rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -440,8 +441,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.build();
try {
return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req)
.getBlock());
return PBHelper.convertLocatedBlockProto(
rpcProxy.getAdditionalDatanode(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -468,7 +469,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder()
.addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlock(blocks)))
.addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks)))
.build();
try {
rpcProxy.reportBadBlocks(null, req);
@@ -900,7 +901,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.build();
try {
return PBHelper.convert(
return PBHelper.convertLocatedBlockProto(
rpcProxy.updateBlockForPipeline(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);

View File

@@ -279,7 +279,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
.newBuilder();
for (int i = 0; i < blocks.length; i++) {
builder.addBlocks(i, PBHelper.convert(blocks[i]));
builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i]));
}
ReportBadBlocksRequestProto req = builder.build();
try {

View File

@@ -258,7 +258,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
List<LocatedBlockProto> lbps = request.getBlocksList();
LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
for(int i=0; i<lbps.size(); i++) {
blocks[i] = PBHelper.convert(lbps.get(i));
blocks[i] = PBHelper.convertLocatedBlockProto(lbps.get(i));
}
try {
impl.reportBadBlocks(blocks);

View File

@@ -81,6 +81,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -626,7 +627,7 @@ public class PBHelper {
if (b == null) {
return null;
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
LocatedBlockProto lb = PBHelper.convertLocatedBlock(b);
RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
@@ -776,7 +777,7 @@ public class PBHelper {
}
}
public static LocatedBlockProto convert(LocatedBlock b) {
public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
if (b == null) return null;
Builder builder = LocatedBlockProto.newBuilder();
DatanodeInfo[] locs = b.getLocations();
@@ -797,21 +798,27 @@ public class PBHelper {
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
for (int i = 0; i < storageTypes.length; ++i) {
builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
for (StorageType storageType : storageTypes) {
builder.addStorageTypes(PBHelper.convertStorageType(storageType));
}
}
final String[] storageIDs = b.getStorageIDs();
if (storageIDs != null) {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
if (b instanceof LocatedStripedBlock) {
int[] indices = ((LocatedStripedBlock) b).getBlockIndices();
for (int index : indices) {
builder.addBlockIndex(index);
}
}
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
public static LocatedBlock convert(LocatedBlockProto proto) {
public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
if (proto == null) return null;
List<DatanodeInfoProto> locs = proto.getLocsList();
DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
@@ -831,6 +838,15 @@ public class PBHelper {
storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
}
int[] indices = null;
final int indexCount = proto.getBlockIndexCount();
if (indexCount > 0) {
indices = new int[indexCount];
for (int i = 0; i < indexCount; i++) {
indices[i] = proto.getBlockIndex(i);
}
}
// Set values from the isCached list, re-using references from loc
List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
List<Boolean> isCachedList = proto.getIsCachedList();
@@ -840,9 +856,17 @@ public class PBHelper {
}
}
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[0]));
final LocatedBlock lb;
if (indices == null) {
lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs,
storageTypes, proto.getOffset(), proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
} else {
lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets,
storageIDs, storageTypes, indices, proto.getOffset(),
proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
}
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;
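
The decode path above keys off whether the proto carries any blockIndex entries, so a round trip through protobuf preserves the striped subtype. A hedged sketch of that property (method names from this diff; the test-style framing is assumed):

// Sketch: convertLocatedBlock emits the blockIndex list for striped blocks,
// and convertLocatedBlockProto rebuilds a LocatedStripedBlock whenever
// proto.getBlockIndexCount() > 0.
static void checkStripedRoundTrip(LocatedStripedBlock in) {
  LocatedBlockProto proto = PBHelper.convertLocatedBlock(in);
  LocatedBlock out = PBHelper.convertLocatedBlockProto(proto);
  assert out instanceof LocatedStripedBlock;
  assert Arrays.equals(in.getBlockIndices(),
      ((LocatedStripedBlock) out).getBlockIndices());
}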
@@ -1258,36 +1282,36 @@ public class PBHelper {
}
// Located Block Arrays and Lists
public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) {
public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) {
if (lb == null) return null;
return convertLocatedBlock2(Arrays.asList(lb)).toArray(
new LocatedBlockProto[lb.length]);
return convertLocatedBlocks2(Arrays.asList(lb))
.toArray(new LocatedBlockProto[lb.length]);
}
public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) {
public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) {
if (lb == null) return null;
return convertLocatedBlock(Arrays.asList(lb)).toArray(
new LocatedBlock[lb.length]);
return convertLocatedBlocks(Arrays.asList(lb))
.toArray(new LocatedBlock[lb.length]);
}
public static List<LocatedBlock> convertLocatedBlock(
public static List<LocatedBlock> convertLocatedBlocks(
List<LocatedBlockProto> lb) {
if (lb == null) return null;
final int len = lb.size();
List<LocatedBlock> result =
new ArrayList<LocatedBlock>(len);
for (int i = 0; i < len; ++i) {
result.add(PBHelper.convert(lb.get(i)));
List<LocatedBlock> result = new ArrayList<>(len);
for (LocatedBlockProto aLb : lb) {
result.add(PBHelper.convertLocatedBlockProto(aLb));
}
return result;
}
public static List<LocatedBlockProto> convertLocatedBlock2(List<LocatedBlock> lb) {
public static List<LocatedBlockProto> convertLocatedBlocks2(
List<LocatedBlock> lb) {
if (lb == null) return null;
final int len = lb.size();
List<LocatedBlockProto> result = new ArrayList<LocatedBlockProto>(len);
for (int i = 0; i < len; ++i) {
result.add(PBHelper.convert(lb.get(i)));
List<LocatedBlockProto> result = new ArrayList<>(len);
for (LocatedBlock aLb : lb) {
result.add(PBHelper.convertLocatedBlock(aLb));
}
return result;
}
@@ -1297,8 +1321,9 @@ public class PBHelper {
public static LocatedBlocks convert(LocatedBlocksProto lb) {
return new LocatedBlocks(
lb.getFileLength(), lb.getUnderConstruction(),
PBHelper.convertLocatedBlock(lb.getBlocksList()),
lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
PBHelper.convertLocatedBlocks(lb.getBlocksList()),
lb.hasLastBlock() ?
PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
lb.getIsLastBlockComplete(),
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
null);
@@ -1311,14 +1336,15 @@ public class PBHelper {
LocatedBlocksProto.Builder builder =
LocatedBlocksProto.newBuilder();
if (lb.getLastLocatedBlock() != null) {
builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
builder.setLastBlock(
PBHelper.convertLocatedBlock(lb.getLastLocatedBlock()));
}
if (lb.getFileEncryptionInfo() != null) {
builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
}
return builder.setFileLength(lb.getFileLength())
.setUnderConstruction(lb.isUnderConstruction())
.addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
.addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}

View File

@@ -123,6 +123,11 @@ public class BlockInfoStriped extends BlockInfo {
return -1;
}
int getStorageBlockIndex(DatanodeStorageInfo storage) {
int i = this.findStorageInfo(storage);
return i == -1 ? -1 : indices[i];
}
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);

View File

@@ -23,9 +23,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
@@ -39,12 +36,8 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/**
* Block replicas as assigned when the block was allocated.
*
* TODO: we need to update this attribute, along with the return type of
* getExpectedStorageLocations and LocatedBlock. For striped blocks, clients
* need to understand the index of each striped block in the block group.
*/
private List<ReplicaUnderConstruction> replicas;
private ReplicaUnderConstruction[] replicas;
/**
* The new generation stamp, which this block will have
@@ -75,12 +68,12 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/**
* Convert an under construction striped block to a complete striped block.
*
*
* @return BlockInfoStriped - a complete block.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
* reported from data-nodes.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
* reported from data-nodes.
*/
BlockInfoStriped convertToCompleteBlock() throws IOException {
assert getBlockUCState() != COMPLETE :
@@ -91,10 +84,13 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/** Set expected locations */
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<>(numLocations);
this.replicas = new ReplicaUnderConstruction[numLocations];
for(int i = 0; i < numLocations; i++) {
replicas.add(new ReplicaUnderConstruction(this, targets[i],
ReplicaState.RBW));
// when creating a new block we simply assign block indices sequentially to
// the storages
Block blk = new Block(this.getBlockId() + i, this.getGenerationStamp(), 0);
replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
ReplicaState.RBW);
}
}
@@ -106,14 +102,24 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
int numLocations = getNumExpectedLocations();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
for (int i = 0; i < numLocations; i++) {
storages[i] = replicas.get(i).getExpectedStorageLocation();
storages[i] = replicas[i].getExpectedStorageLocation();
}
return storages;
}
/** @return the index array indicating the block index in each storage */
public int[] getBlockIndices() {
int numLocations = getNumExpectedLocations();
int[] indices = new int[numLocations];
for (int i = 0; i < numLocations; i++) {
indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
}
return indices;
}
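
getBlockIndices recovers each replica's index from the block ID the datanode reported. A hedged sketch of what BlockIdManager.getBlockIndex is assumed to do here, given that setExpectedLocations above assigns IDs as groupId + i (the 4-bit mask and the alignment of group IDs to a multiple of 16 are assumptions, not confirmed by this diff):

// Sketch: the low bits of an internal block's ID encode its position in the
// group, so masking the reported ID yields the block index.
static int getBlockIndex(Block reportedBlock) {
  final long BLOCK_GROUP_INDEX_MASK = 15; // assumed: up to 16 internal blocks
  return (int) (reportedBlock.getBlockId() & BLOCK_GROUP_INDEX_MASK);
}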
/** Get the number of expected locations */
public int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.size();
return replicas == null ? 0 : replicas.length;
}
/**
@@ -178,7 +184,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
public void initializeBlockRecovery(long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
if (replicas == null || replicas.size() == 0) {
if (replicas == null || replicas.length == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*" +
" BlockInfoUnderConstruction.initLeaseRecovery:" +
" No blocks found, lease removed.");
@@ -186,28 +192,36 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
// TODO we need to implement different recovery logic here
}
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block,
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock,
ReplicaState rState) {
Iterator<ReplicaUnderConstruction> it = replicas.iterator();
while (it.hasNext()) {
ReplicaUnderConstruction r = it.next();
DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
if (expectedLocation == storage) {
// Record the gen stamp from the report
r.setGenerationStamp(block.getGenerationStamp());
return;
} else if (expectedLocation != null &&
expectedLocation.getDatanodeDescriptor() ==
storage.getDatanodeDescriptor()) {
// The Datanode reported that the block is on a different storage
// than the one chosen by BlockPlacementPolicy. This can occur as
// we allow Datanodes to choose the target storage. Update our
// state by removing the stale entry and adding a new one.
it.remove();
break;
if (replicas == null) {
replicas = new ReplicaUnderConstruction[1];
replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);
} else {
for (int i = 0; i < replicas.length; i++) {
DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation();
if (expected == storage) {
replicas[i].setBlockId(reportedBlock.getBlockId());
replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
return;
} else if (expected != null && expected.getDatanodeDescriptor() ==
storage.getDatanodeDescriptor()) {
// The Datanode reported that the block is on a different storage
// than the one chosen by BlockPlacementPolicy. This can occur as
// we allow Datanodes to choose the target storage. Update our
// state by removing the stale entry and adding a new one.
replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
rState);
return;
}
}
ReplicaUnderConstruction[] newReplicas =
new ReplicaUnderConstruction[replicas.length + 1];
System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
reportedBlock, storage, rState);
replicas = newReplicas;
}
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
}
@Override
@@ -226,12 +240,11 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
private void appendUCParts(StringBuilder sb) {
sb.append("{UCState=").append(blockUCState).append(", replicas=[");
if (replicas != null) {
Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
if (iter.hasNext()) {
iter.next().appendStringTo(sb);
while (iter.hasNext()) {
int i = 0;
for (ReplicaUnderConstruction r : replicas) {
r.appendStringTo(sb);
if (++i < replicas.length) {
sb.append(", ");
iter.next().appendStringTo(sb);
}
}
}

View File

@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -841,13 +842,24 @@ public class BlockManager {
}
/** @return a LocatedBlock for the given block */
private LocatedBlock createLocatedBlock(final BlockInfo blk,
final long pos) throws IOException {
if (blk instanceof BlockInfoContiguousUnderConstruction) {
if (blk.isComplete()) {
throw new IOException(
"blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
+ ", blk=" + blk);
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
if (!blk.isComplete()) {
if (blk.isStriped()) {
final BlockInfoStripedUnderConstruction uc =
(BlockInfoStripedUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk);
return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false);
} else {
assert blk instanceof BlockInfoContiguousUnderConstruction;
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk);
return new LocatedBlock(eb, storages, pos, false);
}
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
@@ -855,7 +867,6 @@ public class BlockManager {
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return newLocatedBlock(eb, storages, pos, false);
}
// TODO support BlockInfoStripedUC
// get block locations
final int numCorruptNodes = countNodes(blk).corruptReplicas();
@@ -871,13 +882,21 @@ public class BlockManager {
numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
final int[] blockIndices = blk.isStriped() ? new int[numMachines] : null;
int j = 0, i = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!replicaCorrupt))
if (isCorrupt || (!replicaCorrupt)) {
machines[j++] = storage;
// TODO this can be more efficient
if (blockIndices != null) {
int index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage);
assert index >= 0;
blockIndices[i++] = index;
}
}
}
}
assert j == machines.length :
@@ -887,7 +906,9 @@ public class BlockManager {
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return newLocatedBlock(eb, machines, pos, isCorrupt);
return blockIndices == null ?
newLocatedBlock(eb, machines, pos, isCorrupt) :
new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}
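
A hedged sketch of how a striped reader is expected to consume this result: invert the parallel arrays into a lookup from internal block index to location (the helper name and framing are assumptions, not part of this commit):

// machines[] and blockIndices[] are filled in lockstep above, so location j
// holds the internal block whose index is blockIndices[j].
static DatanodeInfo[] byBlockIndex(LocatedStripedBlock lsb, int groupWidth) {
  DatanodeInfo[] byIndex = new DatanodeInfo[groupWidth];
  DatanodeInfo[] locs = lsb.getLocations();
  int[] indices = lsb.getBlockIndices();
  for (int j = 0; j < indices.length; j++) {
    byIndex[indices[j]] = locs[j];
  }
  return byIndex; // null entries correspond to missing internal blocks
}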
/** Create a LocatedBlocks. */
@@ -2501,7 +2522,8 @@ public class BlockManager {
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
DatanodeStorageInfo storageInfo) throws IOException {
BlockInfo block = ucBlock.storedBlock;
BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock,
ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
(block.findStorageInfo(storageInfo) < 0)) {

View File

@@ -276,7 +276,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
return storageMap.get(storageID);
}
}
DatanodeStorageInfo[] getStorageInfos() {
@VisibleForTesting
public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);

View File

@@ -206,6 +206,7 @@ public class DatanodeStorageInfo {
return getState() == State.FAILED && numBlocks != 0;
}
@VisibleForTesting
public String getStorageID() {
return storageID;
}

View File

@@ -366,16 +366,19 @@ public final class FSImageFormatPBINode {
INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
BlockInfo lastBlk = file.getLastBlock();
// replace the last block of file
final BlockInfo ucBlk;
if (stripeFeature != null) {
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
ucBlk = new BlockInfoStripedUnderConstruction(striped,
striped.getDataBlockNum(), striped.getParityBlockNum());
} else {
ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk, replication);
if (lastBlk != null) {
// replace the last block of file
final BlockInfo ucBlk;
if (stripeFeature != null) {
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
ucBlk = new BlockInfoStripedUnderConstruction(striped,
striped.getDataBlockNum(), striped.getParityBlockNum());
} else {
ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
replication);
}
file.setBlock(file.numBlocks() - 1, ucBlk);
}
file.setBlock(file.numBlocks() - 1, ucBlk);
}
return file;
}

View File

@@ -189,6 +189,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
@@ -206,6 +207,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -1780,8 +1782,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlocks blocks = res.blocks;
if (blocks != null) {
List<LocatedBlock> blkList = blocks.getLocatedBlocks();
if (blkList == null || blkList.size() == 0 ||
blkList.get(0) instanceof LocatedStripedBlock) {
// no need to sort locations for striped blocks
return blocks;
}
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
clientMachine, blkList);
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();

View File

@@ -220,6 +220,7 @@ message LocatedBlockProto {
repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
repeated StorageTypeProto storageTypes = 7;
repeated string storageIDs = 8;
repeated uint32 blockIndex = 9; // used for striped blocks to indicate the block index for each storage
}
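
The new repeated field keeps the wire format backward compatible: old readers simply ignore field 9, and an empty list marks a contiguous (non-striped) block. A hedged decoding sketch, assuming the generated Java proto accessors:

// blockIndex is parallel to locs: entry i names the internal block of the
// group stored on locs[i].
for (int i = 0; i < proto.getBlockIndexCount(); i++) {
  int internalIndex = proto.getBlockIndex(i);
  DatanodeInfoProto location = proto.getLocs(i);
  // map internalIndex -> location for striped reads
}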
message DataEncryptionKeyProto {

View File

@@ -109,7 +109,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -133,6 +132,10 @@ import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1811,7 +1814,7 @@ public class DFSTestUtil {
dn.setLastUpdate(Time.now() + offset);
dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
}
/**
* This method takes a set of block locations and fills the provided buffer
* with expected bytes based on simulated content from
@@ -1835,4 +1838,12 @@ public class DFSTestUtil {
}
}
public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
Block block, BlockStatus blockStatus, DatanodeStorage storage) {
ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
return reports;
}
}

View File

@@ -489,16 +489,16 @@ public class TestPBHelper {
@Test
public void testConvertLocatedBlock() {
LocatedBlock lb = createLocatedBlock();
LocatedBlockProto lbProto = PBHelper.convert(lb);
LocatedBlock lb2 = PBHelper.convert(lbProto);
LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockNoStorageMedia() {
LocatedBlock lb = createLocatedBlockNoStorageMedia();
LocatedBlockProto lbProto = PBHelper.convert(lb);
LocatedBlock lb2 = PBHelper.convert(lbProto);
LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@@ -508,8 +508,8 @@ public class TestPBHelper {
for (int i=0;i<3;i++) {
lbl.add(createLocatedBlock());
}
List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlock2(lbl);
List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlock(lbpl);
List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl);
List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl);
assertEquals(lbl.size(), lbl2.size());
for (int i=0;i<lbl.size();i++) {
compare(lbl.get(i), lbl2.get(i));
@@ -522,8 +522,8 @@ public class TestPBHelper {
for (int i=0;i<3;i++) {
lbl[i] = createLocatedBlock();
}
LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl);
LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl);
LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl);
LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl);
assertEquals(lbl.length, lbl2.length);
for (int i=0;i<lbl.length;i++) {
compare(lbl[i], lbl2[i]);

View File

@@ -227,15 +227,6 @@ public class TestIncrementalBrVariations {
return new Block(10000000L, 100L, 1048576L);
}
private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
Block block, DatanodeStorage storage) {
ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null);
StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
return reports;
}
/**
* Verify that the NameNode can learn about new storages from incremental
* block reports.
@@ -251,8 +242,9 @@ public class TestIncrementalBrVariations {
// Generate a report for a fake block on a fake storage.
final String newStorageUuid = UUID.randomUUID().toString();
final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid);
StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock(
getDummyBlock(), newStorage);
StorageReceivedDeletedBlocks[] reports = DFSTestUtil.
makeReportForReceivedBlock(getDummyBlock(), BlockStatus.RECEIVED_BLOCK,
newStorage);
// Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);

View File

@@ -19,18 +19,29 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assert;
@@ -38,6 +49,9 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -143,4 +157,131 @@ public class TestAddStripedBlocks {
}
return false;
}
@Test
public void testGetLocatedStripedBlocks() throws Exception {
final Path file = new Path("/file1");
// create an empty file
FSDataOutputStream out = null;
try {
out = dfs.create(file, (short) 1);
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
cluster.getNamesystem().getAdditionalBlock(file.toString(),
fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
BlockInfoStripedUnderConstruction lastBlk =
(BlockInfoStripedUnderConstruction) fileNode.getLastBlock();
DatanodeInfo[] expectedDNs = DatanodeStorageInfo
.toDatanodeInfos(lastBlk.getExpectedStorageLocations());
int[] indices = lastBlk.getBlockIndices();
LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
Assert.assertEquals(1, blks.locatedBlockCount());
LocatedBlock lblk = blks.get(0);
Assert.assertTrue(lblk instanceof LocatedStripedBlock);
DatanodeInfo[] datanodes = lblk.getLocations();
int[] blockIndices = ((LocatedStripedBlock) lblk).getBlockIndices();
Assert.assertEquals(GROUP_SIZE, datanodes.length);
Assert.assertEquals(GROUP_SIZE, blockIndices.length);
Assert.assertArrayEquals(indices, blockIndices);
Assert.assertArrayEquals(expectedDNs, datanodes);
} finally {
IOUtils.cleanup(null, out);
}
}
/**
* Test BlockInfoStripedUnderConstruction#addReplicaIfNotPresent in different
* scenarios.
*/
@Test
public void testAddUCReplica() throws Exception {
final Path file = new Path("/file1");
final List<String> storageIDs = new ArrayList<>();
// create an empty file
FSDataOutputStream out = null;
try {
out = dfs.create(file, (short) 1);
// 1. create the UC striped block
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
cluster.getNamesystem().getAdditionalBlock(file.toString(),
fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
BlockInfo lastBlock = fileNode.getLastBlock();
BlockInfoStripedUnderConstruction ucBlock =
(BlockInfoStripedUnderConstruction) lastBlock;
DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
int[] indices = ucBlock.getBlockIndices();
Assert.assertEquals(GROUP_SIZE, locs.length);
Assert.assertEquals(GROUP_SIZE, indices.length);
// 2. mimic incremental block reports and make sure the uc-replica list in
// the BlockStripedUC is correct
int i = 0;
for (DataNode dn : cluster.getDataNodes()) {
final Block block = new Block(lastBlock.getBlockId() + i++,
lastBlock.getGenerationStamp(), 0);
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
storageIDs.add(storage.getStorageID());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(block, BlockStatus.RECEIVING_BLOCK,
storage);
for (StorageReceivedDeletedBlocks report : reports) {
cluster.getNamesystem().processIncrementalBlockReport(
dn.getDatanodeId(), report);
}
}
// make sure lastBlock is correct and the storages have been updated
locs = ucBlock.getExpectedStorageLocations();
indices = ucBlock.getBlockIndices();
Assert.assertEquals(GROUP_SIZE, locs.length);
Assert.assertEquals(GROUP_SIZE, indices.length);
for (DatanodeStorageInfo newstorage : locs) {
Assert.assertTrue(storageIDs.contains(newstorage.getStorageID()));
}
} finally {
IOUtils.cleanup(null, out);
}
// 3. restart the namenode. mimic the full block reports and check the
// uc-replica list again
cluster.restartNameNode(true);
final String bpId = cluster.getNamesystem().getBlockPoolId();
INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
.getINode4Write(file.toString()).asFile();
BlockInfo lastBlock = fileNode.getLastBlock();
int i = GROUP_SIZE - 1;
for (DataNode dn : cluster.getDataNodes()) {
String storageID = storageIDs.get(i);
final Block block = new Block(lastBlock.getBlockId() + i--,
lastBlock.getGenerationStamp(), 0);
DatanodeStorage storage = new DatanodeStorage(storageID);
List<ReplicaBeingWritten> blocks = new ArrayList<>();
ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null,
null);
blocks.add(replica);
BlockListAsLongs bll = new BlockListAsLongs(null, blocks);
StorageBlockReport[] reports = {new StorageBlockReport(storage,
bll.getBlockListAsLongs())};
cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
bpId, reports);
}
BlockInfoStripedUnderConstruction ucBlock =
(BlockInfoStripedUnderConstruction) lastBlock;
DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
int[] indices = ucBlock.getBlockIndices();
Assert.assertEquals(GROUP_SIZE, locs.length);
Assert.assertEquals(GROUP_SIZE, indices.length);
for (i = 0; i < GROUP_SIZE; i++) {
Assert.assertEquals(storageIDs.get(i),
locs[GROUP_SIZE - 1 - i].getStorageID());
Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]);
}
}
}

View File

@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -219,8 +221,7 @@ public class TestFSImage {
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.waitSafeMode(false)
.startupOption(StartupOption.UPGRADE)
.waitSafeMode(false).startupOption(StartupOption.UPGRADE)
.build();
try {
FileSystem fs = cluster.getFileSystem();