HDFS-9575. Use byte array for internal block indices in a striped block. Contributed by jing9

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-12-22 14:46:48 +08:00
parent e88422df45
commit 70d6f20126
24 changed files with 72 additions and 97 deletions

View File

@ -33,22 +33,22 @@ import java.util.Arrays;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class LocatedStripedBlock extends LocatedBlock { public class LocatedStripedBlock extends LocatedBlock {
private static final int[] EMPTY_INDICES = {}; private static final byte[] EMPTY_INDICES = {};
private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>(); private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>();
private int[] blockIndices; private final byte[] blockIndices;
private Token<BlockTokenIdentifier>[] blockTokens; private Token<BlockTokenIdentifier>[] blockTokens;
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes, int[] indices, String[] storageIDs, StorageType[] storageTypes, byte[] indices,
long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) {
super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs); super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs);
if (indices == null) { if (indices == null) {
this.blockIndices = EMPTY_INDICES; this.blockIndices = EMPTY_INDICES;
} else { } else {
this.blockIndices = new int[indices.length]; this.blockIndices = new byte[indices.length];
System.arraycopy(indices, 0, blockIndices, 0, indices.length); System.arraycopy(indices, 0, blockIndices, 0, indices.length);
} }
blockTokens = new Token[blockIndices.length]; blockTokens = new Token[blockIndices.length];
@ -68,7 +68,7 @@ public class LocatedStripedBlock extends LocatedBlock {
+ "}"; + "}";
} }
public int[] getBlockIndices() { public byte[] getBlockIndices() {
return this.blockIndices; return this.blockIndices;
} }

View File

@ -526,13 +526,9 @@ public class PBHelperClient {
.toArray(new String[storageIDsCount]); .toArray(new String[storageIDsCount]);
} }
int[] indices = null; byte[] indices = null;
final int indexCount = proto.getBlockIndexCount(); if (proto.hasBlockIndices()) {
if (indexCount > 0) { indices = proto.getBlockIndices().toByteArray();
indices = new int[indexCount];
for (int i = 0; i < indexCount; i++) {
indices[i] = proto.getBlockIndex(i);
}
} }
// Set values from the isCached list, re-using references from loc // Set values from the isCached list, re-using references from loc
@ -814,10 +810,10 @@ public class PBHelperClient {
} }
if (b instanceof LocatedStripedBlock) { if (b instanceof LocatedStripedBlock) {
LocatedStripedBlock sb = (LocatedStripedBlock) b; LocatedStripedBlock sb = (LocatedStripedBlock) b;
int[] indices = sb.getBlockIndices(); byte[] indices = sb.getBlockIndices();
builder.setBlockIndices(PBHelperClient.getByteString(indices));
Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens(); Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
for (int i = 0; i < indices.length; i++) { for (int i = 0; i < indices.length; i++) {
builder.addBlockIndex(indices[i]);
builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
} }
} }

View File

@ -55,6 +55,6 @@ message BlockECRecoveryInfoProto {
required DatanodeInfosProto targetDnInfos = 3; required DatanodeInfosProto targetDnInfos = 3;
required StorageUuidsProto targetStorageUuids = 4; required StorageUuidsProto targetStorageUuids = 4;
required StorageTypesProto targetStorageTypes = 5; required StorageTypesProto targetStorageTypes = 5;
repeated uint32 liveBlockIndices = 6; required bytes liveBlockIndices = 6;
required ErasureCodingPolicyProto ecPolicy = 7; required ErasureCodingPolicyProto ecPolicy = 7;
} }

View File

@ -217,7 +217,7 @@ message LocatedBlockProto {
repeated string storageIDs = 8; repeated string storageIDs = 8;
// striped block related fields // striped block related fields
repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage optional bytes blockIndices = 9; // used for striped block to indicate block index for each storage
repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token
} }

View File

@ -885,6 +885,9 @@ Trunk (Unreleased)
HDFS-9451. Clean up depreated umasks and related unit tests. HDFS-9451. Clean up depreated umasks and related unit tests.
(Wei-Chiu Chuang via wheat9) (Wei-Chiu Chuang via wheat9)
HDFS-9575. Use byte array for internal block indices in a striped block.
(jing9 via szetszwo)
Release 2.9.0 - UNRELEASED Release 2.9.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -364,7 +364,7 @@ public class PBHelper {
RecoveringStripedBlock sb = (RecoveringStripedBlock) b; RecoveringStripedBlock sb = (RecoveringStripedBlock) b;
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
sb.getErasureCodingPolicy())); sb.getErasureCodingPolicy()));
builder.addAllBlockIndices(asList(sb.getBlockIndices())); builder.setBlockIndices(PBHelperClient.getByteString(sb.getBlockIndices()));
} }
return builder.build(); return builder.build();
} }
@ -381,11 +381,8 @@ public class PBHelper {
} }
if (b.hasEcPolicy()) { if (b.hasEcPolicy()) {
List<Integer> BlockIndicesList = b.getBlockIndicesList(); assert b.hasBlockIndices();
int[] indices = new int[BlockIndicesList.size()]; byte[] indices = b.getBlockIndices().toByteArray();
for (int i = 0; i < BlockIndicesList.size(); i++) {
indices[i] = BlockIndicesList.get(i).shortValue();
}
rBlock = new RecoveringStripedBlock(rBlock, indices, rBlock = new RecoveringStripedBlock(rBlock, indices,
PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy())); PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy()));
} }
@ -840,22 +837,6 @@ public class PBHelper {
build(); build();
} }
private static List<Integer> asList(int[] arr) {
List<Integer> list = new ArrayList<>(arr.length);
for (int s : arr) {
list.add(s);
}
return list;
}
private static List<Integer> asList(short[] arr) {
List<Integer> list = new ArrayList<>(arr.length);
for (int s : arr) {
list.add(s);
}
return list;
}
private static StorageTypesProto convertStorageTypesProto( private static StorageTypesProto convertStorageTypesProto(
StorageType[] targetStorageTypes) { StorageType[] targetStorageTypes) {
StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
@ -914,17 +895,11 @@ public class PBHelper {
targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
.getStorageTypesList().size()); .getStorageTypesList().size());
List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto byte[] liveBlkIndices = blockEcRecoveryInfoProto.getLiveBlockIndices()
.getLiveBlockIndicesList(); .toByteArray();
short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
for (int i = 0; i < liveBlockIndicesList.size(); i++) {
liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
}
ErasureCodingPolicy ecPolicy = ErasureCodingPolicy ecPolicy =
PBHelperClient.convertErasureCodingPolicy( PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfoProto.getEcPolicy()); blockEcRecoveryInfoProto.getEcPolicy());
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
} }
@ -949,8 +924,8 @@ public class PBHelper {
.getTargetStorageTypes(); .getTargetStorageTypes();
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); byte[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
builder.addAllLiveBlockIndices(asList(liveBlockIndices)); builder.setLiveBlockIndices(PBHelperClient.getByteString(liveBlockIndices));
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfo.getErasureCodingPolicy())); blockEcRecoveryInfo.getErasureCodingPolicy()));

View File

@ -244,8 +244,8 @@ public class BlockIdManager {
return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
} }
public static int getBlockIndex(Block reportedBlock) { public static byte getBlockIndex(Block reportedBlock) {
return (int) (reportedBlock.getBlockId() & return (byte) (reportedBlock.getBlockId() &
HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
} }

View File

@ -146,7 +146,7 @@ public class BlockInfoStriped extends BlockInfo {
return -1; return -1;
} }
int getStorageBlockIndex(DatanodeStorageInfo storage) { byte getStorageBlockIndex(DatanodeStorageInfo storage) {
int i = this.findStorageInfo(storage); int i = this.findStorageInfo(storage);
return i == -1 ? -1 : indices[i]; return i == -1 ? -1 : indices[i];
} }

View File

@ -594,7 +594,7 @@ public class BlockManager implements BlockStatsMXBean {
// source node returned is not used // source node returned is not used
chooseSourceDatanodes(getStoredBlock(block), containingNodes, chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas, containingLiveReplicasNodes, numReplicas,
new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL); new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count // not included in the numReplicas.liveReplicas() count
@ -951,7 +951,7 @@ public class BlockManager implements BlockStatsMXBean {
numCorruptNodes == numNodes; numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes; final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
final int[] blockIndices = blk.isStriped() ? new int[numMachines] : null; final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
int j = 0, i = 0; int j = 0, i = 0;
if (numMachines > 0) { if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
@ -961,7 +961,7 @@ public class BlockManager implements BlockStatsMXBean {
machines[j++] = storage; machines[j++] = storage;
// TODO this can be more efficient // TODO this can be more efficient
if (blockIndices != null) { if (blockIndices != null) {
int index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage); byte index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage);
assert index >= 0; assert index >= 0;
blockIndices[i++] = index; blockIndices[i++] = index;
} }
@ -1036,7 +1036,7 @@ public class BlockManager implements BlockStatsMXBean {
if (b.isStriped()) { if (b.isStriped()) {
Preconditions.checkState(b instanceof LocatedStripedBlock); Preconditions.checkState(b instanceof LocatedStripedBlock);
LocatedStripedBlock sb = (LocatedStripedBlock) b; LocatedStripedBlock sb = (LocatedStripedBlock) b;
int[] indices = sb.getBlockIndices(); byte[] indices = sb.getBlockIndices();
Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock()); ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock());
for (int i = 0; i < indices.length; i++) { for (int i = 0; i < indices.length; i++) {
@ -1562,7 +1562,7 @@ public class BlockManager implements BlockStatsMXBean {
List<DatanodeDescriptor> containingNodes = new ArrayList<>(); List<DatanodeDescriptor> containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas(); NumberReplicas numReplicas = new NumberReplicas();
List<Short> liveBlockIndices = new ArrayList<>(); List<Byte> liveBlockIndices = new ArrayList<>();
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas, containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, priority); liveBlockIndices, priority);
@ -1599,7 +1599,7 @@ public class BlockManager implements BlockStatsMXBean {
// Wait the previous recovery to finish. // Wait the previous recovery to finish.
return null; return null;
} }
short[] indices = new short[liveBlockIndices.size()]; byte[] indices = new byte[liveBlockIndices.size()];
for (int i = 0 ; i < liveBlockIndices.size(); i++) { for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i); indices[i] = liveBlockIndices.get(i);
} }
@ -1807,7 +1807,7 @@ public class BlockManager implements BlockStatsMXBean {
List<DatanodeDescriptor> containingNodes, List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas, List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas, NumberReplicas numReplicas,
List<Short> liveBlockIndices, int priority) { List<Byte> liveBlockIndices, int priority) {
containingNodes.clear(); containingNodes.clear();
nodesContainingLiveReplicas.clear(); nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>(); List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@ -1866,7 +1866,7 @@ public class BlockManager implements BlockStatsMXBean {
if(isStriped || srcNodes.isEmpty()) { if(isStriped || srcNodes.isEmpty()) {
srcNodes.add(node); srcNodes.add(node);
if (isStriped) { if (isStriped) {
liveBlockIndices.add((short) ((BlockInfoStriped) block). liveBlockIndices.add(((BlockInfoStriped) block).
getStorageBlockIndex(storage)); getStorageBlockIndex(storage));
} }
continue; continue;
@ -4160,7 +4160,7 @@ public class BlockManager implements BlockStatsMXBean {
public static LocatedStripedBlock newLocatedStripedBlock( public static LocatedStripedBlock newLocatedStripedBlock(
ExtendedBlock b, DatanodeStorageInfo[] storages, ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) { byte[] indices, long startOffset, boolean corrupt) {
// startOffset is unknown // startOffset is unknown
return new LocatedStripedBlock( return new LocatedStripedBlock(
b, DatanodeStorageInfo.toDatanodeInfos(storages), b, DatanodeStorageInfo.toDatanodeInfos(storages),

View File

@ -110,9 +110,9 @@ public class BlockUnderConstructionFeature {
* @return the index array indicating the block index in each storage. Used * @return the index array indicating the block index in each storage. Used
* only by striped blocks. * only by striped blocks.
*/ */
public int[] getBlockIndices() { public byte[] getBlockIndices() {
int numLocations = getNumExpectedLocations(); int numLocations = getNumExpectedLocations();
int[] indices = new int[numLocations]; byte[] indices = new byte[numLocations];
for (int i = 0; i < numLocations; i++) { for (int i = 0; i < numLocations; i++) {
indices[i] = BlockIdManager.getBlockIndex(replicas[i]); indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
} }

View File

@ -603,7 +603,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
*/ */
void addBlockToBeErasureCoded(ExtendedBlock block, void addBlockToBeErasureCoded(ExtendedBlock block,
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets, DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
short[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
assert (block != null && sources != null && sources.length > 0); assert (block != null && sources != null && sources.length > 0);
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
liveBlockIndices, ecPolicy); liveBlockIndices, ecPolicy);

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
class ErasureCodingWork extends BlockRecoveryWork { class ErasureCodingWork extends BlockRecoveryWork {
private final short[] liveBlockIndicies; private final byte[] liveBlockIndicies;
public ErasureCodingWork(BlockInfo block, public ErasureCodingWork(BlockInfo block,
BlockCollection bc, BlockCollection bc,
@ -31,14 +31,14 @@ class ErasureCodingWork extends BlockRecoveryWork {
List<DatanodeDescriptor> containingNodes, List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages, List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired, int additionalReplRequired,
int priority, short[] liveBlockIndicies) { int priority, byte[] liveBlockIndicies) {
super(block, bc, srcNodes, containingNodes, super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority); liveReplicaStorages, additionalReplRequired, priority);
this.liveBlockIndicies = liveBlockIndicies; this.liveBlockIndicies = liveBlockIndicies;
BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block); BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block);
} }
short[] getLiveBlockIndicies() { byte[] getLiveBlockIndicies() {
return liveBlockIndicies; return liveBlockIndicies;
} }

View File

@ -372,7 +372,7 @@ public class BlockRecoveryWorker {
private final DatanodeInfo[] locs; private final DatanodeInfo[] locs;
private final long recoveryId; private final long recoveryId;
private final int[] blockIndices; private final byte[] blockIndices;
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;
RecoveryTaskStriped(RecoveringStripedBlock rBlock) { RecoveryTaskStriped(RecoveringStripedBlock rBlock) {

View File

@ -253,7 +253,7 @@ public final class ErasureCodingWorker {
private long positionInBlock; private long positionInBlock;
// sources // sources
private final short[] liveIndices; private final byte[] liveIndices;
private final DatanodeInfo[] sources; private final DatanodeInfo[] sources;
private final List<StripedReader> stripedReaders; private final List<StripedReader> stripedReaders;

View File

@ -76,11 +76,11 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
private DatanodeInfo[] targets; private DatanodeInfo[] targets;
private String[] targetStorageIDs; private String[] targetStorageIDs;
private StorageType[] targetStorageTypes; private StorageType[] targetStorageTypes;
private final short[] liveBlockIndices; private final byte[] liveBlockIndices;
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices, DatanodeStorageInfo[] targetDnStorageInfo, byte[] liveBlockIndices,
ErasureCodingPolicy ecPolicy) { ErasureCodingPolicy ecPolicy) {
this(block, sources, DatanodeStorageInfo this(block, sources, DatanodeStorageInfo
.toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
@ -90,14 +90,15 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeInfo[] targets, String[] targetStorageIDs, DatanodeInfo[] targets, String[] targetStorageIDs,
StorageType[] targetStorageTypes, short[] liveBlockIndices, StorageType[] targetStorageTypes, byte[] liveBlockIndices,
ErasureCodingPolicy ecPolicy) { ErasureCodingPolicy ecPolicy) {
this.block = block; this.block = block;
this.sources = sources; this.sources = sources;
this.targets = targets; this.targets = targets;
this.targetStorageIDs = targetStorageIDs; this.targetStorageIDs = targetStorageIDs;
this.targetStorageTypes = targetStorageTypes; this.targetStorageTypes = targetStorageTypes;
this.liveBlockIndices = liveBlockIndices; this.liveBlockIndices = liveBlockIndices == null ?
new byte[]{} : liveBlockIndices;
this.ecPolicy = ecPolicy; this.ecPolicy = ecPolicy;
} }
@ -121,7 +122,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
return targetStorageTypes; return targetStorageTypes;
} }
public short[] getLiveBlockIndices() { public byte[] getLiveBlockIndices() {
return liveBlockIndices; return liveBlockIndices;
} }

View File

@ -101,17 +101,17 @@ public class BlockRecoveryCommand extends DatanodeCommand {
} }
public static class RecoveringStripedBlock extends RecoveringBlock { public static class RecoveringStripedBlock extends RecoveringBlock {
private final int[] blockIndices; private final byte[] blockIndices;
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;
public RecoveringStripedBlock(RecoveringBlock rBlock, int[] blockIndices, public RecoveringStripedBlock(RecoveringBlock rBlock, byte[] blockIndices,
ErasureCodingPolicy ecPolicy) { ErasureCodingPolicy ecPolicy) {
super(rBlock); super(rBlock);
this.blockIndices = blockIndices; this.blockIndices = blockIndices == null ? new byte[]{} : blockIndices;
this.ecPolicy = ecPolicy; this.ecPolicy = ecPolicy;
} }
public int[] getBlockIndices() { public byte[] getBlockIndices() {
return blockIndices; return blockIndices;
} }

View File

@ -123,7 +123,7 @@ message RecoveringBlockProto {
optional ErasureCodingPolicyProto ecPolicy = 4; optional ErasureCodingPolicyProto ecPolicy = 4;
// block indices of striped internal blocks for each storage in LocatedBlock // block indices of striped internal blocks for each storage in LocatedBlock
repeated uint32 blockIndices = 5; optional bytes blockIndices = 5;
} }
/** /**

View File

@ -345,7 +345,7 @@ public class StripedFileTestUtil {
assertEquals(groupSize, locs.size()); assertEquals(groupSize, locs.size());
// verify that every internal blocks exists // verify that every internal blocks exists
int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); byte[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
assertEquals(groupSize, blockIndices.length); assertEquals(groupSize, blockIndices.length);
HashSet<Integer> found = new HashSet<>(); HashSet<Integer> found = new HashSet<>();
for (int index : blockIndices) { for (int index : blockIndices) {

View File

@ -203,7 +203,7 @@ public class TestRecoverStripedFile {
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
DatanodeInfo[] storageInfos = lastBlock.getLocations(); DatanodeInfo[] storageInfos = lastBlock.getLocations();
int[] indices = lastBlock.getBlockIndices(); byte[] indices = lastBlock.getBlockIndices();
BitSet bitset = new BitSet(dnNum); BitSet bitset = new BitSet(dnNum);
for (DatanodeInfo storageInfo : storageInfos) { for (DatanodeInfo storageInfo : storageInfos) {
@ -355,7 +355,7 @@ public class TestRecoverStripedFile {
// thread pool submission should succeed, so that it will not prevent // thread pool submission should succeed, so that it will not prevent
// processing other tasks in the list if any exceptions. // processing other tasks in the list if any exceptions.
int size = cluster.dataNodes.size(); int size = cluster.dataNodes.size();
short[] liveIndices = new short[size]; byte[] liveIndices = new byte[size];
DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1]; DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1];
DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),

View File

@ -688,7 +688,7 @@ public class TestPBHelper {
new DatanodeStorage("s01")); new DatanodeStorage("s01"));
DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
targetDnInfos_0, targetDnInfos_1 }; targetDnInfos_0, targetDnInfos_1 };
short[] liveBlkIndices0 = new short[2]; byte[] liveBlkIndices0 = new byte[2];
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo( BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy()); liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy());
@ -702,7 +702,7 @@ public class TestPBHelper {
new DatanodeStorage("s03")); new DatanodeStorage("s03"));
DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
targetDnInfos_2, targetDnInfos_3 }; targetDnInfos_2, targetDnInfos_3 };
short[] liveBlkIndices1 = new short[2]; byte[] liveBlkIndices1 = new byte[2];
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo( BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy()); liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy());
@ -741,8 +741,8 @@ public class TestPBHelper {
assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]); assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
} }
short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices(); byte[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices(); byte[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
for (int i = 0; i < liveBlockIndices1.length; i++) { for (int i = 0; i < liveBlockIndices1.length; i++) {
assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]); assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
} }

View File

@ -540,7 +540,7 @@ public class TestBlockManager {
cntNodes, cntNodes,
liveNodes, liveNodes,
new NumberReplicas(), new NumberReplicas(),
new ArrayList<Short>(), new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
assertEquals("Does not choose a source node for a less-than-highest-priority" assertEquals("Does not choose a source node for a less-than-highest-priority"
@ -551,7 +551,7 @@ public class TestBlockManager {
cntNodes, cntNodes,
liveNodes, liveNodes,
new NumberReplicas(), new NumberReplicas(),
new ArrayList<Short>(), new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length); UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
// Increase the replication count to test replication count > hard limit // Increase the replication count to test replication count > hard limit
@ -565,7 +565,7 @@ public class TestBlockManager {
cntNodes, cntNodes,
liveNodes, liveNodes,
new NumberReplicas(), new NumberReplicas(),
new ArrayList<Short>(), new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length); UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
} }
@ -591,7 +591,7 @@ public class TestBlockManager {
bm.getStoredBlock(aBlock), bm.getStoredBlock(aBlock),
cntNodes, cntNodes,
liveNodes, liveNodes,
new NumberReplicas(), new LinkedList<Short>(), new NumberReplicas(), new LinkedList<Byte>(),
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]); UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
@ -605,7 +605,7 @@ public class TestBlockManager {
bm.getStoredBlock(aBlock), bm.getStoredBlock(aBlock),
cntNodes, cntNodes,
liveNodes, liveNodes,
new NumberReplicas(), new LinkedList<Short>(), new NumberReplicas(), new LinkedList<Byte>(),
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length); UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
} }

View File

@ -757,7 +757,7 @@ public class TestBlockRecovery {
ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
.getSystemDefaultPolicy(); .getSystemDefaultPolicy();
RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock, RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock,
new int[9], ecPolicy); new byte[9], ecPolicy);
BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn); BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn);
BlockRecoveryWorker.RecoveryTaskStriped recoveryTask = BlockRecoveryWorker.RecoveryTaskStriped recoveryTask =
recoveryWorker.new RecoveryTaskStriped(rBlockStriped); recoveryWorker.new RecoveryTaskStriped(rBlockStriped);

View File

@ -209,7 +209,7 @@ public class TestAddStripedBlocks {
BlockInfoStriped lastBlk = (BlockInfoStriped) fileNode.getLastBlock(); BlockInfoStriped lastBlk = (BlockInfoStriped) fileNode.getLastBlock();
DatanodeInfo[] expectedDNs = DatanodeStorageInfo.toDatanodeInfos( DatanodeInfo[] expectedDNs = DatanodeStorageInfo.toDatanodeInfos(
lastBlk.getUnderConstructionFeature().getExpectedStorageLocations()); lastBlk.getUnderConstructionFeature().getExpectedStorageLocations());
int[] indices = lastBlk.getUnderConstructionFeature().getBlockIndices(); byte[] indices = lastBlk.getUnderConstructionFeature().getBlockIndices();
LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L); LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
Assert.assertEquals(1, blks.locatedBlockCount()); Assert.assertEquals(1, blks.locatedBlockCount());
@ -217,7 +217,7 @@ public class TestAddStripedBlocks {
Assert.assertTrue(lblk instanceof LocatedStripedBlock); Assert.assertTrue(lblk instanceof LocatedStripedBlock);
DatanodeInfo[] datanodes = lblk.getLocations(); DatanodeInfo[] datanodes = lblk.getLocations();
int[] blockIndices = ((LocatedStripedBlock) lblk).getBlockIndices(); byte[] blockIndices = ((LocatedStripedBlock) lblk).getBlockIndices();
Assert.assertEquals(GROUP_SIZE, datanodes.length); Assert.assertEquals(GROUP_SIZE, datanodes.length);
Assert.assertEquals(GROUP_SIZE, blockIndices.length); Assert.assertEquals(GROUP_SIZE, blockIndices.length);
Assert.assertArrayEquals(indices, blockIndices); Assert.assertArrayEquals(indices, blockIndices);
@ -249,7 +249,7 @@ public class TestAddStripedBlocks {
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature() DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
.getExpectedStorageLocations(); .getExpectedStorageLocations();
int[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices(); byte[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices();
Assert.assertEquals(GROUP_SIZE, locs.length); Assert.assertEquals(GROUP_SIZE, locs.length);
Assert.assertEquals(GROUP_SIZE, indices.length); Assert.assertEquals(GROUP_SIZE, indices.length);
@ -308,7 +308,7 @@ public class TestAddStripedBlocks {
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature() DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
.getExpectedStorageLocations(); .getExpectedStorageLocations();
int[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices(); byte[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices();
Assert.assertEquals(GROUP_SIZE, locs.length); Assert.assertEquals(GROUP_SIZE, locs.length);
Assert.assertEquals(GROUP_SIZE, indices.length); Assert.assertEquals(GROUP_SIZE, indices.length);
for (i = 0; i < GROUP_SIZE; i++) { for (i = 0; i < GROUP_SIZE; i++) {

View File

@ -126,9 +126,9 @@ public class TestStripedBlockUtil {
DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_WIDTH]; DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_WIDTH];
String[] storageIDs = new String[BLK_GROUP_WIDTH]; String[] storageIDs = new String[BLK_GROUP_WIDTH];
StorageType[] storageTypes = new StorageType[BLK_GROUP_WIDTH]; StorageType[] storageTypes = new StorageType[BLK_GROUP_WIDTH];
int[] indices = new int[BLK_GROUP_WIDTH]; byte[] indices = new byte[BLK_GROUP_WIDTH];
for (int i = 0; i < BLK_GROUP_WIDTH; i++) { for (int i = 0; i < BLK_GROUP_WIDTH; i++) {
indices[i] = (i + 2) % DATA_BLK_NUM; indices[i] = (byte) ((i + 2) % DATA_BLK_NUM);
// Location port always equal to logical index of a block, // Location port always equal to logical index of a block,
// for easier verification // for easier verification
locs[i] = DFSTestUtil.getLocalDatanodeInfo(indices[i]); locs[i] = DFSTestUtil.getLocalDatanodeInfo(indices[i]);