HDFS-8137. Send the EC schema to DataNode via EC encoding/recovering command. Contributed by Uma Maheswara Rao G

This commit is contained in:
Uma Maheswara Rao G 2015-05-05 11:22:52 +05:30 committed by Zhe Zhang
parent 6dea01f1ee
commit 436c14855a
9 changed files with 102 additions and 37 deletions

View File

@ -164,3 +164,5 @@
HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout.
(jing9)
HDFS-8137. Send the EC schema to DataNode via EC encoding/recovering command(umamahesh)

View File

@ -3191,8 +3191,10 @@ public class PBHelper {
liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
}
ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices);
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema);
}
public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
@ -3217,6 +3219,8 @@ public class PBHelper {
short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
return builder.build();
}

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -84,7 +83,10 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.io.erasurecode.ECSchema;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@ -94,6 +96,7 @@ import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1554,10 +1557,25 @@ public class BlockManager {
if (block.isStriped()) {
assert rw instanceof ErasureCodingWork;
assert rw.targets.length > 0;
String src = block.getBlockCollection().getName();
ECSchema ecSchema = null;
try {
ecSchema = namesystem.getECSchemaForPath(src);
} catch (IOException e) {
blockLog
.warn("Failed to get the EC schema for the file {} ", src);
}
if (ecSchema == null) {
blockLog.warn("No EC schema found for the file {}. "
+ "So cannot proceed for recovery", src);
// TODO: we may have to revisit later for what we can do better to
// handle this case.
continue;
}
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(namesystem.getBlockPoolId(), block),
rw.srcNodes, rw.targets,
((ErasureCodingWork) rw).liveBlockIndicies);
((ErasureCodingWork) rw).liveBlockIndicies, ecSchema);
} else {
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
}

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@ -51,6 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
@ -608,15 +608,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
/**
* Store block erasure coding work.
*/
void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
assert(block != null && sources != null && sources.length > 0);
void addBlockToBeErasureCoded(ExtendedBlock block,
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
short[] liveBlockIndices, ECSchema ecSchema) {
assert (block != null && sources != null && sources.length > 0);
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
liveBlockIndices);
liveBlockIndices, ecSchema);
erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block recovery task " + task +
"to " + getName() + ", current queue size is " +
erasurecodeBlocks.size());
BlockManager.LOG.debug("Adding block recovery task " + task + "to "
+ getName() + ", current queue size is " + erasurecodeBlocks.size());
}
/**

View File

@ -7581,25 +7581,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
ECInfo getErasureCodingInfo(String src) throws AccessControlException,
UnresolvedLinkException, IOException {
checkOperation(OperationCategory.READ);
final byte[][] pathComponents = FSDirectory
.getPathComponentsForReservedPath(src);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.READ);
}
// Get schema set for the zone
ECSchema schema = dir.getECSchema(iip);
if (schema != null) {
return new ECInfo(src, schema);
}
} finally {
readUnlock();
ECSchema schema = getECSchemaForPath(src);
if (schema != null) {
return new ECInfo(src, schema);
}
return null;
}
@ -7841,5 +7825,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
@Override
public ECSchema getECSchemaForPath(String src) throws IOException {
checkOperation(OperationCategory.READ);
final byte[][] pathComponents = FSDirectory
.getPathComponentsForReservedPath(src);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.READ);
}
// Get schema set for the zone
return dir.getECSchema(iip);
} finally {
readUnlock();
}
}
}

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
@ -47,4 +49,14 @@ public interface Namesystem extends RwLock, SafeMode {
public void checkOperation(OperationCategory read) throws StandbyException;
public boolean isInSnapshot(BlockCollection bc);
/**
* Gets the ECSchema for the specified path
*
* @param src
* - path
* @return ECSchema
* @throws IOException
*/
public ECSchema getECSchemaForPath(String src) throws IOException;
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.io.erasurecode.ECSchema;
import java.util.Arrays;
import java.util.Collection;
@ -76,9 +77,11 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
private String[] targetStorageIDs;
private StorageType[] targetStorageTypes;
private final short[] liveBlockIndices;
private final ECSchema ecSchema;
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) {
DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
ECSchema ecSchema) {
this.block = block;
this.sources = sources;
this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
@ -87,17 +90,20 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
this.targetStorageTypes = DatanodeStorageInfo
.toStorageTypes(targetDnStorageInfo);
this.liveBlockIndices = liveBlockIndices;
this.ecSchema = ecSchema;
}
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeInfo[] targets, String[] targetStorageIDs,
StorageType[] targetStorageTypes, short[] liveBlockIndices) {
StorageType[] targetStorageTypes, short[] liveBlockIndices,
ECSchema ecSchema) {
this.block = block;
this.sources = sources;
this.targets = targets;
this.targetStorageIDs = targetStorageIDs;
this.targetStorageTypes = targetStorageTypes;
this.liveBlockIndices = liveBlockIndices;
this.ecSchema = ecSchema;
}
public ExtendedBlock getExtendedBlock() {
@ -123,6 +129,10 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
public short[] getLiveBlockIndices() {
return liveBlockIndices;
}
public ECSchema getECSchema() {
return ecSchema;
}
@Override
public String toString() {

View File

@ -99,4 +99,5 @@ message BlockECRecoveryInfoProto {
required StorageUuidsProto targetStorageUuids = 4;
required StorageTypesProto targetStorageTypes = 5;
repeated uint32 liveBlockIndices = 6;
required ECSchemaProto ecSchema = 7;
}

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@ -71,8 +70,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@ -88,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -663,7 +663,7 @@ public class TestPBHelper {
short[] liveBlkIndices0 = new short[2];
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
liveBlkIndices0);
liveBlkIndices0, ECSchemaManager.getSystemDefaultSchema());
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
@ -677,7 +677,7 @@ public class TestPBHelper {
short[] liveBlkIndices1 = new short[2];
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
liveBlkIndices1);
liveBlkIndices1, ECSchemaManager.getSystemDefaultSchema());
List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
blkRecoveryInfosList.add(blkECRecoveryInfo0);
blkRecoveryInfosList.add(blkECRecoveryInfo1);
@ -718,6 +718,19 @@ public class TestPBHelper {
for (int i = 0; i < liveBlockIndices1.length; i++) {
assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
}
ECSchema ecSchema1 = blkECRecoveryInfo1.getECSchema();
ECSchema ecSchema2 = blkECRecoveryInfo2.getECSchema();
// Compare ECSchemas same as default ECSchema as we used system default
// ECSchema used in this test
compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema1);
compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema2);
}
private void compareECSchemas(ECSchema ecSchema1, ECSchema ecSchema2) {
assertEquals(ecSchema1.getSchemaName(), ecSchema2.getSchemaName());
assertEquals(ecSchema1.getNumDataUnits(), ecSchema2.getNumDataUnits());
assertEquals(ecSchema1.getNumParityUnits(), ecSchema2.getNumParityUnits());
}
private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,