HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for making it ready for transfer to DN (Contributed by Uma Maheswara Rao G)

This commit is contained in:
Vinayakumar B 2015-04-18 23:20:45 +05:30 committed by Zhe Zhang
parent f6e1160ef1
commit a1c9425265
10 changed files with 335 additions and 41 deletions

View File

@ -87,3 +87,6 @@
startup. (Hui Zheng via szetszwo)
HDFS-8167. BlockManager.addBlockCollectionWithCheck should check if the block is a striped block. (Hui Zheng via zhz).
HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for
making it ready for transfer to DN (Uma Maheswara Rao G via vinayakumarb)

View File

@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -100,7 +101,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
@ -121,6 +122,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@ -132,11 +134,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaOptionEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECZoneInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
@ -184,7 +186,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StripedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@ -204,8 +205,10 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@ -3150,4 +3153,132 @@ public class PBHelper {
return new ECZoneInfo(ecZoneInfoProto.getDir(),
convertECSchema(ecZoneInfoProto.getSchema()));
}
public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
ExtendedBlock block = convert(blockProto);
DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
.getSourceDnInfos();
DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
.getTargetDnInfos();
DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
.getTargetStorageUuids();
String[] targetStorageUuids = convert(targetStorageUuidsProto);
StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
.getTargetStorageTypes();
StorageType[] convertStorageTypes = convertStorageTypes(
targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
.getStorageTypesList().size());
List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
.getLiveBlockIndicesList();
short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
for (int i = 0; i < liveBlockIndicesList.size(); i++) {
liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
}
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices);
}
public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
BlockECRecoveryInfo blockEcRecoveryInfo) {
BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
.newBuilder();
builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
StorageType[] targetStorageTypes = blockEcRecoveryInfo
.getTargetStorageTypes();
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
return builder.build();
}
private static List<Integer> convertIntArray(short[] liveBlockIndices) {
List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
for (short s : liveBlockIndices) {
liveBlockIndicesList.add((int) s);
}
return liveBlockIndicesList;
}
private static StorageTypesProto convertStorageTypesProto(
StorageType[] targetStorageTypes) {
StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
for (StorageType storageType : targetStorageTypes) {
builder.addStorageTypes(convertStorageType(storageType));
}
return builder.build();
}
private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
for (String storageUuid : targetStorageIDs) {
builder.addStorageUuids(storageUuid);
}
return builder.build();
}
private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
for (DatanodeInfo datanodeInfo : dnInfos) {
builder.addDatanodes(convert(datanodeInfo));
}
return builder.build();
}
private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
List<String> storageUuidsList = targetStorageUuidsProto
.getStorageUuidsList();
String[] storageUuids = new String[storageUuidsList.size()];
for (int i = 0; i < storageUuidsList.size(); i++) {
storageUuids[i] = storageUuidsList.get(i);
}
return storageUuids;
}
public static BlockECRecoveryCommandProto convert(
BlockECRecoveryCommand blkECRecoveryCmd) {
BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
.newBuilder();
Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
.getECTasks();
for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
builder
.addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
}
return builder.build();
}
public static BlockECRecoveryCommand convert(
BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
.getBlockECRecoveryinfoList();
for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
blkECRecoveryInfos
.add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
}
return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
blkECRecoveryInfos);
}
}

View File

@ -32,8 +32,8 @@ import java.util.Set;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -99,34 +100,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class BlockECRecoveryInfo {
public final ExtendedBlock block;
public final DatanodeDescriptor[] sources;
public final DatanodeStorageInfo[] targets;
public final short[] liveBlockIndices;
BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
this.block = block;
this.sources = sources;
this.targets = targets;
this.liveBlockIndices = liveBlockIndices;
}
@Override
public String toString() {
return new StringBuilder().append("BlockECRecoveryInfo(\n ").
append("Recovering ").append(block).
append(" From: ").append(Arrays.asList(sources)).
append(" To: ").append(Arrays.asList(targets)).append(")\n").
append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)).
toString();
}
}
/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<E>();

View File

@ -34,12 +34,12 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
@ -1442,7 +1442,7 @@ public class DatanodeManager {
List<BlockECRecoveryInfo> pendingECList =
nodeinfo.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC,
cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
pendingECList));
}
//check block invalidation

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.hdfs.server.protocol;
import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import java.util.Arrays;
import java.util.Collection;
/**
@ -60,4 +65,77 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
sb.append("\n)");
return sb.toString();
}
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class BlockECRecoveryInfo {
private final ExtendedBlock block;
private final DatanodeInfo[] sources;
private DatanodeInfo[] targets;
private String[] targetStorageIDs;
private StorageType[] targetStorageTypes;
private final short[] liveBlockIndices;
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) {
this.block = block;
this.sources = sources;
this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
this.targetStorageIDs = DatanodeStorageInfo
.toStorageIDs(targetDnStorageInfo);
this.targetStorageTypes = DatanodeStorageInfo
.toStorageTypes(targetDnStorageInfo);
this.liveBlockIndices = liveBlockIndices;
}
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeInfo[] targets, String[] targetStorageIDs,
StorageType[] targetStorageTypes, short[] liveBlockIndices) {
this.block = block;
this.sources = sources;
this.targets = targets;
this.targetStorageIDs = targetStorageIDs;
this.targetStorageTypes = targetStorageTypes;
this.liveBlockIndices = liveBlockIndices;
}
public ExtendedBlock getExtendedBlock() {
return block;
}
public DatanodeInfo[] getSourceDnInfos() {
return sources;
}
public DatanodeInfo[] getTargetDnInfos() {
return targets;
}
public String[] getTargetStorageIDs() {
return targetStorageIDs;
}
public StorageType[] getTargetStorageTypes() {
return targetStorageTypes;
}
public short[] getLiveBlockIndices() {
return liveBlockIndices;
}
@Override
public String toString() {
return new StringBuilder().append("BlockECRecoveryInfo(\n ")
.append("Recovering ").append(block).append(" From: ")
.append(Arrays.asList(sources)).append(" To: [")
.append(Arrays.asList(targets)).append(")\n")
.append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
.toString();
}
}
public Collection<BlockECRecoveryInfo> getECTasks() {
return this.ecTasks;
}
}

View File

@ -76,7 +76,7 @@ public interface DatanodeProtocol {
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_CODEC = 11; // uncache blocks
final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command
/**
* Register Datanode.

View File

@ -33,6 +33,7 @@ package hadoop.hdfs.datanode;
import "HAServiceProtocol.proto";
import "hdfs.proto";
import "erasurecoding.proto";
/**
* Information to identify a datanode to a namenode
@ -144,6 +145,13 @@ message RegisterCommandProto {
// void
}
/**
* Block Erasure coding recovery command
*/
message BlockECRecoveryCommandProto {
repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1;
}
/**
* registration - Information of the datanode registering with the namenode
*/

View File

@ -21,6 +21,7 @@ option java_outer_classname = "ErasureCodingProtos";
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "hdfs.proto";
/**
* ECSchema options entry
@ -86,4 +87,16 @@ message GetECZoneInfoRequestProto {
message GetECZoneInfoResponseProto {
optional ECZoneInfoProto ECZoneInfo = 1;
}
/**
* Block erasure coding recovery info
*/
message BlockECRecoveryInfoProto {
required ExtendedBlockProto block = 1;
required DatanodeInfosProto sourceDnInfos = 2;
required DatanodeInfosProto targetDnInfos = 3;
required StorageUuidsProto targetStorageUuids = 4;
required StorageTypesProto targetStorageTypes = 5;
repeated uint32 liveBlockIndices = 6;
}

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.permission.AclEntry;
@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
@ -63,15 +66,20 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -639,4 +647,84 @@ public class TestPBHelper {
.build();
Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s)));
}
@Test
public void testBlockECRecoveryCommand() {
DatanodeInfo[] dnInfos0 = new DatanodeInfo[] {
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
new DatanodeStorage("s00"));
DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
new DatanodeStorage("s01"));
DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
targetDnInfos_0, targetDnInfos_1 };
short[] liveBlkIndices0 = new short[2];
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
liveBlkIndices0);
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
new DatanodeStorage("s02"));
DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil
.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
new DatanodeStorage("s03"));
DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
targetDnInfos_2, targetDnInfos_3 };
short[] liveBlkIndices1 = new short[2];
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
liveBlkIndices1);
List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
blkRecoveryInfosList.add(blkECRecoveryInfo0);
blkRecoveryInfosList.add(blkECRecoveryInfo1);
BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
.convert(blkECRecoveryCmd);
blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
.iterator();
assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
}
private void assertBlockECRecoveryInfoEquals(
BlockECRecoveryInfo blkECRecoveryInfo1,
BlockECRecoveryInfo blkECRecoveryInfo2) {
assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
blkECRecoveryInfo2.getExtendedBlock());
DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos();
DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos();
assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2);
DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos();
DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos();
assertDnInfosEqual(targetDnInfos1, targetDnInfos2);
String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs();
String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs();
assertEquals(targetStorageIDs1.length, targetStorageIDs2.length);
for (int i = 0; i < targetStorageIDs1.length; i++) {
assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
}
short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
for (int i = 0; i < liveBlockIndices1.length; i++) {
assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
}
}
private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
DatanodeInfo[] dnInfos2) {
assertEquals(dnInfos1.length, dnInfos2.length);
for (int i = 0; i < dnInfos1.length; i++) {
compare(dnInfos1[i], dnInfos2[i]);
}
}
}

View File

@ -29,9 +29,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -115,10 +115,10 @@ public class TestRecoverStripedBlocks {
last.getNumberOfBlocksToBeErasureCoded());
List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
for (BlockECRecoveryInfo info : recovery) {
assertEquals(1, info.targets.length);
assertEquals(last, info.targets[0].getDatanodeDescriptor());
assertEquals(GROUP_SIZE - 1, info.sources.length);
assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length);
assertEquals(1, info.getTargetDnInfos().length);
assertEquals(last, info.getTargetDnInfos()[0]);
assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
}
}
}