HDFS-6702. Change DFSClient to pass the StorageType from the namenode to datanodes and change datanode to write block replicas using the specified storage type.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612493 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
70dededdc9
commit
25b0e8471e
|
@ -307,6 +307,10 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
|
||||
will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
|
||||
|
||||
HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
|
||||
datanodes and change datanode to write block replicas using the specified
|
||||
storage type. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
|
|
@ -313,6 +313,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private DataInputStream blockReplyStream;
|
||||
private ResponseProcessor response = null;
|
||||
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
|
||||
private volatile StorageType[] storageTypes = null;
|
||||
private volatile String[] storageIDs = null;
|
||||
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
|
||||
CacheBuilder.newBuilder()
|
||||
|
@ -417,10 +418,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
private void setPipeline(LocatedBlock lb) {
|
||||
setPipeline(lb.getLocations(), lb.getStorageIDs());
|
||||
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
|
||||
}
|
||||
private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
|
||||
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
|
||||
String[] storageIDs) {
|
||||
this.nodes = nodes;
|
||||
this.storageTypes = storageTypes;
|
||||
this.storageIDs = storageIDs;
|
||||
}
|
||||
|
||||
|
@ -446,7 +449,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this.setName("DataStreamer for file " + src);
|
||||
closeResponder();
|
||||
closeStream();
|
||||
setPipeline(null, null);
|
||||
setPipeline(null, null, null);
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
}
|
||||
|
||||
|
@ -1031,10 +1034,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
//transfer replica
|
||||
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
|
||||
final DatanodeInfo[] targets = {nodes[d]};
|
||||
transfer(src, targets, lb.getBlockToken());
|
||||
final StorageType[] targetStorageTypes = {storageTypes[d]};
|
||||
transfer(src, targets, targetStorageTypes, lb.getBlockToken());
|
||||
}
|
||||
|
||||
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||
//transfer replica to the new datanode
|
||||
Socket sock = null;
|
||||
|
@ -1056,7 +1061,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
//send the TRANSFER_BLOCK request
|
||||
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
|
||||
targets);
|
||||
targets, targetStorageTypes);
|
||||
out.flush();
|
||||
|
||||
//ack
|
||||
|
@ -1135,16 +1140,15 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
failed.add(nodes[errorIndex]);
|
||||
|
||||
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
|
||||
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
|
||||
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
|
||||
newnodes.length-errorIndex);
|
||||
arraycopy(nodes, newnodes, errorIndex);
|
||||
|
||||
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
|
||||
arraycopy(storageTypes, newStorageTypes, errorIndex);
|
||||
|
||||
final String[] newStorageIDs = new String[newnodes.length];
|
||||
System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
|
||||
System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
|
||||
newStorageIDs.length-errorIndex);
|
||||
arraycopy(storageIDs, newStorageIDs, errorIndex);
|
||||
|
||||
setPipeline(newnodes, newStorageIDs);
|
||||
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
||||
|
||||
// Just took care of a node error while waiting for a node restart
|
||||
if (restartingNodeIndex >= 0) {
|
||||
|
@ -1181,7 +1185,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
// set up the pipeline again with the remaining nodes
|
||||
if (failPacket) { // for testing
|
||||
success = createBlockOutputStream(nodes, newGS, isRecovery);
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
failPacket = false;
|
||||
try {
|
||||
// Give DNs time to send in bad reports. In real situations,
|
||||
|
@ -1190,7 +1194,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ie) {}
|
||||
} else {
|
||||
success = createBlockOutputStream(nodes, newGS, isRecovery);
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
}
|
||||
|
||||
if (restartingNodeIndex >= 0) {
|
||||
|
@ -1242,6 +1246,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private LocatedBlock nextBlockOutputStream() throws IOException {
|
||||
LocatedBlock lb = null;
|
||||
DatanodeInfo[] nodes = null;
|
||||
StorageType[] storageTypes = null;
|
||||
int count = dfsClient.getConf().nBlockWriteRetry;
|
||||
boolean success = false;
|
||||
ExtendedBlock oldBlock = block;
|
||||
|
@ -1264,11 +1269,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
bytesSent = 0;
|
||||
accessToken = lb.getBlockToken();
|
||||
nodes = lb.getLocations();
|
||||
storageTypes = lb.getStorageTypes();
|
||||
|
||||
//
|
||||
// Connect to first DataNode in the list.
|
||||
//
|
||||
success = createBlockOutputStream(nodes, 0L, false);
|
||||
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
|
||||
|
||||
if (!success) {
|
||||
DFSClient.LOG.info("Abandoning " + block);
|
||||
|
@ -1289,8 +1295,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// connects to the first datanode in the pipeline
|
||||
// Returns true if success, otherwise return failure.
|
||||
//
|
||||
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
||||
boolean recoveryFlag) {
|
||||
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
|
||||
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
|
||||
if (nodes.length == 0) {
|
||||
DFSClient.LOG.info("nodes are empty for write pipeline of block "
|
||||
+ block);
|
||||
|
@ -1332,9 +1338,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// Xmit header info to datanode
|
||||
//
|
||||
|
||||
BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
|
||||
// send the request
|
||||
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
|
||||
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
|
||||
new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
|
||||
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
|
||||
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
|
||||
cachingStrategy.get());
|
||||
|
||||
|
@ -2197,4 +2204,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
public long getFileId() {
|
||||
return fileId;
|
||||
}
|
||||
|
||||
private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
|
||||
System.arraycopy(srcs, 0, dsts, 0, skipIndex);
|
||||
System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
|
@ -71,11 +72,20 @@ public interface DataTransferProtocol {
|
|||
|
||||
/**
|
||||
* Write a block to a datanode pipeline.
|
||||
*
|
||||
* The receiver datanode of this call is the next datanode in the pipeline.
|
||||
* The other downstream datanodes are specified by the targets parameter.
|
||||
* Note that the receiver {@link DatanodeInfo} is not required in the
|
||||
* parameter list since the receiver datanode knows its info. However, the
|
||||
* {@link StorageType} for storing the replica in the receiver datanode is a
|
||||
* parameter since the receiver datanode may support multiple storage types.
|
||||
*
|
||||
* @param blk the block being written.
|
||||
* @param storageType for storing the replica in the receiver datanode.
|
||||
* @param blockToken security token for accessing the block.
|
||||
* @param clientName client's name.
|
||||
* @param targets target datanodes in the pipeline.
|
||||
* @param targets other downstream datanodes in the pipeline.
|
||||
* @param targetStorageTypes target {@link StorageType}s corresponding
|
||||
* to the target datanodes.
|
||||
* @param source source datanode.
|
||||
* @param stage pipeline stage.
|
||||
* @param pipelineSize the size of the pipeline.
|
||||
|
@ -84,9 +94,11 @@ public interface DataTransferProtocol {
|
|||
* @param latestGenerationStamp the latest generation stamp of the block.
|
||||
*/
|
||||
public void writeBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo source,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -110,7 +122,8 @@ public interface DataTransferProtocol {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException;
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException;
|
||||
|
||||
/**
|
||||
* Request short circuit access file descriptors from a DataNode.
|
||||
|
@ -148,11 +161,13 @@ public interface DataTransferProtocol {
|
|||
* It is used for balancing purpose.
|
||||
*
|
||||
* @param blk the block being replaced.
|
||||
* @param storageType the {@link StorageType} for storing the block.
|
||||
* @param blockToken security token for accessing the block.
|
||||
* @param delHint the hint for deleting the block in the original datanode.
|
||||
* @param source the source datanode for receiving the block.
|
||||
*/
|
||||
public void replaceBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo source) throws IOException;
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||
|
@ -121,10 +122,13 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
/** Receive OP_WRITE_BLOCK */
|
||||
private void opWriteBlock(DataInputStream in) throws IOException {
|
||||
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
PBHelper.convert(proto.getTargetsList()),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
|
||||
PBHelper.convert(proto.getSource()),
|
||||
fromProto(proto.getStage()),
|
||||
proto.getPipelineSize(),
|
||||
|
@ -140,10 +144,12 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
private void opTransferBlock(DataInputStream in) throws IOException {
|
||||
final OpTransferBlockProto proto =
|
||||
OpTransferBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
PBHelper.convert(proto.getTargetsList()));
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
|
||||
}
|
||||
|
||||
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
|
||||
|
@ -176,6 +182,7 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
private void opReplaceBlock(DataInputStream in) throws IOException {
|
||||
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
|
||||
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
proto.getDelHint(),
|
||||
PBHelper.convert(proto.getSource()));
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
|
@ -111,9 +112,11 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
@Override
|
||||
public void writeBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo source,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -130,7 +133,9 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
|
||||
.setHeader(header)
|
||||
.setStorageType(PBHelper.convertStorageType(storageType))
|
||||
.addAllTargets(PBHelper.convert(targets, 1))
|
||||
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
|
||||
.setStage(toProto(stage))
|
||||
.setPipelineSize(pipelineSize)
|
||||
.setMinBytesRcvd(minBytesRcvd)
|
||||
|
@ -150,12 +155,14 @@ public class Sender implements DataTransferProtocol {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException {
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException {
|
||||
|
||||
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildClientHeader(
|
||||
blk, clientName, blockToken))
|
||||
.addAllTargets(PBHelper.convert(targets))
|
||||
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
|
||||
.build();
|
||||
|
||||
send(out, Op.TRANSFER_BLOCK, proto);
|
||||
|
@ -196,11 +203,13 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
@Override
|
||||
public void replaceBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo source) throws IOException {
|
||||
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
|
||||
.setStorageType(PBHelper.convertStorageType(storageType))
|
||||
.setDelHint(delHint)
|
||||
.setSource(PBHelper.convertDatanodeInfo(source))
|
||||
.build();
|
||||
|
|
|
@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryLi
|
|||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
|
||||
|
@ -674,14 +675,8 @@ public class PBHelper {
|
|||
targets[i] = PBHelper.convert(locs.get(i));
|
||||
}
|
||||
|
||||
final int storageTypesCount = proto.getStorageTypesCount();
|
||||
final StorageType[] storageTypes;
|
||||
if (storageTypesCount == 0) {
|
||||
storageTypes = null;
|
||||
} else {
|
||||
Preconditions.checkState(storageTypesCount == locs.size());
|
||||
storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
|
||||
}
|
||||
final StorageType[] storageTypes = convertStorageTypes(
|
||||
proto.getStorageTypesList(), locs.size());
|
||||
|
||||
final int storageIDsCount = proto.getStorageIDsCount();
|
||||
final String[] storageIDs;
|
||||
|
@ -969,6 +964,20 @@ public class PBHelper {
|
|||
targets[i] = PBHelper.convert(targetList.get(i));
|
||||
}
|
||||
|
||||
StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
|
||||
List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
|
||||
if (targetStorageTypesList.isEmpty()) { // missing storage types
|
||||
for(int i = 0; i < targetStorageTypes.length; i++) {
|
||||
targetStorageTypes[i] = new StorageType[targets[i].length];
|
||||
Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
|
||||
}
|
||||
} else {
|
||||
for(int i = 0; i < targetStorageTypes.length; i++) {
|
||||
List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
|
||||
targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
|
||||
String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
|
||||
for(int i = 0; i < targetStorageIDs.length; i++) {
|
||||
|
@ -991,7 +1000,7 @@ public class PBHelper {
|
|||
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
|
||||
}
|
||||
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
|
||||
targetStorageIDs);
|
||||
targetStorageTypes, targetStorageIDs);
|
||||
}
|
||||
|
||||
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
|
||||
|
@ -1605,8 +1614,25 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageTypeProto convertStorageType(
|
||||
StorageType type) {
|
||||
public static List<StorageTypeProto> convertStorageTypes(
|
||||
StorageType[] types) {
|
||||
return convertStorageTypes(types, 0);
|
||||
}
|
||||
|
||||
public static List<StorageTypeProto> convertStorageTypes(
|
||||
StorageType[] types, int startIdx) {
|
||||
if (types == null) {
|
||||
return null;
|
||||
}
|
||||
final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
|
||||
types.length);
|
||||
for (int i = startIdx; i < types.length; ++i) {
|
||||
protos.add(convertStorageType(types[i]));
|
||||
}
|
||||
return protos;
|
||||
}
|
||||
|
||||
public static StorageTypeProto convertStorageType(StorageType type) {
|
||||
switch(type) {
|
||||
case DISK:
|
||||
return StorageTypeProto.DISK;
|
||||
|
@ -1621,7 +1647,7 @@ public class PBHelper {
|
|||
public static DatanodeStorage convert(DatanodeStorageProto s) {
|
||||
return new DatanodeStorage(s.getStorageUuid(),
|
||||
PBHelper.convertState(s.getState()),
|
||||
PBHelper.convertType(s.getStorageType()));
|
||||
PBHelper.convertStorageType(s.getStorageType()));
|
||||
}
|
||||
|
||||
private static State convertState(StorageState state) {
|
||||
|
@ -1634,7 +1660,7 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageType convertType(StorageTypeProto type) {
|
||||
public static StorageType convertStorageType(StorageTypeProto type) {
|
||||
switch(type) {
|
||||
case DISK:
|
||||
return StorageType.DISK;
|
||||
|
@ -1646,11 +1672,16 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageType[] convertStorageTypeProtos(
|
||||
List<StorageTypeProto> storageTypesList) {
|
||||
final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
|
||||
for (int i = 0; i < storageTypes.length; ++i) {
|
||||
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
|
||||
public static StorageType[] convertStorageTypes(
|
||||
List<StorageTypeProto> storageTypesList, int expectedSize) {
|
||||
final StorageType[] storageTypes = new StorageType[expectedSize];
|
||||
if (storageTypesList.size() != expectedSize) { // missing storage types
|
||||
Preconditions.checkState(storageTypesList.isEmpty());
|
||||
Arrays.fill(storageTypes, StorageType.DEFAULT);
|
||||
} else {
|
||||
for (int i = 0; i < storageTypes.length; ++i) {
|
||||
storageTypes[i] = convertStorageType(storageTypesList.get(i));
|
||||
}
|
||||
}
|
||||
return storageTypes;
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configured;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -368,7 +369,7 @@ public class Balancer {
|
|||
in = new DataInputStream(new BufferedInputStream(unbufIn,
|
||||
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
sendRequest(out, eb, accessToken);
|
||||
sendRequest(out, eb, StorageType.DEFAULT, accessToken);
|
||||
receiveResponse(in);
|
||||
bytesMoved.addAndGet(block.getNumBytes());
|
||||
LOG.info("Successfully moved " + this);
|
||||
|
@ -400,8 +401,9 @@ public class Balancer {
|
|||
|
||||
/* Send a block replace request to the output stream*/
|
||||
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
|
||||
StorageType storageType,
|
||||
Token<BlockTokenIdentifier> accessToken) throws IOException {
|
||||
new Sender(out).replaceBlock(eb, accessToken,
|
||||
new Sender(out).replaceBlock(eb, storageType, accessToken,
|
||||
source.getStorageID(), proxySource.getDatanode());
|
||||
}
|
||||
|
||||
|
|
|
@ -575,7 +575,8 @@ class BPOfferService {
|
|||
switch(cmd.getAction()) {
|
||||
case DatanodeProtocol.DNA_TRANSFER:
|
||||
// Send a copy of a block to another datanode
|
||||
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
|
||||
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
|
||||
bcmd.getTargets(), bcmd.getTargetStorageTypes());
|
||||
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
|
||||
break;
|
||||
case DatanodeProtocol.DNA_INVALIDATE:
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.util.zip.Checksum;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.FSOutputSummer;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -122,7 +123,8 @@ class BlockReceiver implements Closeable {
|
|||
private boolean syncOnClose;
|
||||
private long restartBudget;
|
||||
|
||||
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
|
||||
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
||||
final DataInputStream in,
|
||||
final String inAddr, final String myAddr,
|
||||
final BlockConstructionStage stage,
|
||||
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
|
||||
|
@ -162,11 +164,11 @@ class BlockReceiver implements Closeable {
|
|||
// Open local disk out
|
||||
//
|
||||
if (isDatanode) { //replication or move
|
||||
replicaInfo = datanode.data.createTemporary(block);
|
||||
replicaInfo = datanode.data.createTemporary(storageType, block);
|
||||
} else {
|
||||
switch (stage) {
|
||||
case PIPELINE_SETUP_CREATE:
|
||||
replicaInfo = datanode.data.createRbw(block);
|
||||
replicaInfo = datanode.data.createRbw(storageType, block);
|
||||
datanode.notifyNamenodeReceivingBlock(
|
||||
block, replicaInfo.getStorageUuid());
|
||||
break;
|
||||
|
@ -198,7 +200,7 @@ class BlockReceiver implements Closeable {
|
|||
case TRANSFER_RBW:
|
||||
case TRANSFER_FINALIZED:
|
||||
// this is a transfer destination
|
||||
replicaInfo = datanode.data.createTemporary(block);
|
||||
replicaInfo = datanode.data.createTemporary(storageType, block);
|
||||
break;
|
||||
default: throw new IOException("Unsupported stage " + stage +
|
||||
" while receiving block " + block + " from " + inAddr);
|
||||
|
|
|
@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.net.DomainPeerServer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
|
||||
|
@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Client
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
|
||||
import org.apache.hadoop.hdfs.protocolPB.*;
|
||||
import org.apache.hadoop.hdfs.security.token.block.*;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
|
@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
|
@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.*;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ServicePlugin;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import java.io.*;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.*;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
/**********************************************************
|
||||
* DataNode is a class (and program) that stores a set of
|
||||
|
@ -1475,8 +1557,8 @@ public class DataNode extends Configured
|
|||
return xmitsInProgress.get();
|
||||
}
|
||||
|
||||
private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
|
||||
throws IOException {
|
||||
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
|
||||
StorageType[] xferTargetStorageTypes) throws IOException {
|
||||
BPOfferService bpos = getBPOSForBlock(block);
|
||||
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
||||
|
||||
|
@ -1512,16 +1594,17 @@ public class DataNode extends Configured
|
|||
LOG.info(bpReg + " Starting thread to transfer " +
|
||||
block + " to " + xfersBuilder);
|
||||
|
||||
new Daemon(new DataTransfer(xferTargets, block,
|
||||
new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
|
||||
}
|
||||
}
|
||||
|
||||
void transferBlocks(String poolId, Block blocks[],
|
||||
DatanodeInfo xferTargets[][]) {
|
||||
DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
try {
|
||||
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
|
||||
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
|
||||
xferTargetStorageTypes[i]);
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed to transfer block " + blocks[i], ie);
|
||||
}
|
||||
|
@ -1624,6 +1707,7 @@ public class DataNode extends Configured
|
|||
*/
|
||||
private class DataTransfer implements Runnable {
|
||||
final DatanodeInfo[] targets;
|
||||
final StorageType[] targetStorageTypes;
|
||||
final ExtendedBlock b;
|
||||
final BlockConstructionStage stage;
|
||||
final private DatanodeRegistration bpReg;
|
||||
|
@ -1634,7 +1718,8 @@ public class DataNode extends Configured
|
|||
* Connect to the first item in the target list. Pass along the
|
||||
* entire target list, the block, and the data.
|
||||
*/
|
||||
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
|
||||
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
|
||||
ExtendedBlock b, BlockConstructionStage stage,
|
||||
final String clientname) {
|
||||
if (DataTransferProtocol.LOG.isDebugEnabled()) {
|
||||
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
|
||||
|
@ -1644,6 +1729,7 @@ public class DataNode extends Configured
|
|||
+ ", targests=" + Arrays.asList(targets));
|
||||
}
|
||||
this.targets = targets;
|
||||
this.targetStorageTypes = targetStorageTypes;
|
||||
this.b = b;
|
||||
this.stage = stage;
|
||||
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
|
||||
|
@ -1702,7 +1788,8 @@ public class DataNode extends Configured
|
|||
false, false, true, DataNode.this, null, cachingStrategy);
|
||||
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
||||
|
||||
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
|
||||
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
|
||||
clientname, targets, targetStorageTypes, srcNode,
|
||||
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
|
||||
|
||||
// send data & checksum
|
||||
|
@ -2403,7 +2490,8 @@ public class DataNode extends Configured
|
|||
* @param client client name
|
||||
*/
|
||||
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
|
||||
final DatanodeInfo[] targets, final String client) throws IOException {
|
||||
final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
|
||||
final String client) throws IOException {
|
||||
final long storedGS;
|
||||
final long visible;
|
||||
final BlockConstructionStage stage;
|
||||
|
@ -2436,7 +2524,7 @@ public class DataNode extends Configured
|
|||
b.setNumBytes(visible);
|
||||
|
||||
if (targets.length > 0) {
|
||||
new DataTransfer(targets, b, stage, client).run();
|
||||
new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Arrays;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -524,9 +525,11 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
|
||||
@Override
|
||||
public void writeBlock(final ExtendedBlock block,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientname,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo srcDataNode,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -590,12 +593,13 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
if (isDatanode ||
|
||||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
||||
// open a block receiver
|
||||
blockReceiver = new BlockReceiver(block, in,
|
||||
blockReceiver = new BlockReceiver(block, storageType, in,
|
||||
peer.getRemoteAddressString(),
|
||||
peer.getLocalAddressString(),
|
||||
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
|
||||
clientname, srcDataNode, datanode, requestedChecksum,
|
||||
cachingStrategy);
|
||||
|
||||
storageUuid = blockReceiver.getStorageUuid();
|
||||
} else {
|
||||
storageUuid = datanode.data.recoverClose(
|
||||
|
@ -636,10 +640,10 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
mirrorIn = new DataInputStream(unbufMirrorIn);
|
||||
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
|
||||
clientname, targets, srcDataNode, stage, pipelineSize,
|
||||
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
|
||||
cachingStrategy);
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
||||
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
|
||||
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
|
||||
latestGenerationStamp, requestedChecksum, cachingStrategy);
|
||||
|
||||
mirrorOut.flush();
|
||||
|
||||
|
@ -754,7 +758,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException {
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException {
|
||||
checkAccess(socketOut, true, blk, blockToken,
|
||||
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
||||
previousOpClientName = clientName;
|
||||
|
@ -763,7 +768,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
final DataOutputStream out = new DataOutputStream(
|
||||
getOutputStream());
|
||||
try {
|
||||
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
||||
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
||||
targetStorageTypes, clientName);
|
||||
writeResponse(Status.SUCCESS, null, out);
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
|
@ -941,6 +947,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
|
||||
@Override
|
||||
public void replaceBlock(final ExtendedBlock block,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo proxySource) throws IOException {
|
||||
|
@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
|
||||
checksumInfo.getChecksum());
|
||||
// open a block receiver and check if the block does not exist
|
||||
blockReceiver = new BlockReceiver(
|
||||
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
||||
blockReceiver = new BlockReceiver(block, storageType,
|
||||
proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
||||
proxySock.getLocalSocketAddress().toString(),
|
||||
null, 0, 0, 0, "", null, datanode, remoteChecksum,
|
||||
CachingStrategy.newDropBehind());
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
|
@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* @return the meta info of the replica which is being written to
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
|
||||
) throws IOException;
|
||||
public ReplicaInPipelineInterface createTemporary(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a RBW replica and returns the meta info of the replica
|
||||
|
@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* @return the meta info of the replica which is being written to
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public ReplicaInPipelineInterface createRbw(ExtendedBlock b
|
||||
) throws IOException;
|
||||
public ReplicaInPipelineInterface createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException;
|
||||
|
||||
/**
|
||||
* Recovers a RBW replica and returns the meta info of the replica
|
||||
|
|
|
@ -17,6 +17,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.*;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
|
@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**************************************************
|
||||
* FSDataset manages a set of data blocks. Each block
|
||||
* has a unique name and an extent on disk.
|
||||
|
@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
|
@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
" and thus cannot be created.");
|
||||
}
|
||||
// create a new block
|
||||
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
|
||||
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
// create a rbw file to hold block in the designated volume
|
||||
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
|
@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
throw new ReplicaAlreadyExistsException("Block " + b +
|
||||
|
@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
" and thus cannot be created.");
|
||||
}
|
||||
|
||||
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
|
||||
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
// create a temporary file to hold block in the designated volume
|
||||
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
||||
|
|
|
@ -18,13 +18,17 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
class FsVolumeList {
|
||||
/**
|
||||
|
@ -52,11 +56,18 @@ class FsVolumeList {
|
|||
* by a single thread and next volume is chosen with no concurrent
|
||||
* update to {@link #volumes}.
|
||||
* @param blockSize free space needed on the volume
|
||||
* @param storageType the desired {@link StorageType}
|
||||
* @return next volume to store the block in.
|
||||
*/
|
||||
// TODO should choose volume with storage type
|
||||
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
|
||||
return blockChooser.chooseVolume(volumes, blockSize);
|
||||
synchronized FsVolumeImpl getNextVolume(StorageType storageType,
|
||||
long blockSize) throws IOException {
|
||||
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
|
||||
for(FsVolumeImpl v : volumes) {
|
||||
if (v.getStorageType() == storageType) {
|
||||
list.add(v);
|
||||
}
|
||||
}
|
||||
return blockChooser.chooseVolume(list, blockSize);
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||
|
@ -50,6 +51,7 @@ public class BlockCommand extends DatanodeCommand {
|
|||
final String poolId;
|
||||
final Block[] blocks;
|
||||
final DatanodeInfo[][] targets;
|
||||
final StorageType[][] targetStorageTypes;
|
||||
final String[][] targetStorageIDs;
|
||||
|
||||
/**
|
||||
|
@ -62,17 +64,20 @@ public class BlockCommand extends DatanodeCommand {
|
|||
this.poolId = poolId;
|
||||
blocks = new Block[blocktargetlist.size()];
|
||||
targets = new DatanodeInfo[blocks.length][];
|
||||
targetStorageTypes = new StorageType[blocks.length][];
|
||||
targetStorageIDs = new String[blocks.length][];
|
||||
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
BlockTargetPair p = blocktargetlist.get(i);
|
||||
blocks[i] = p.block;
|
||||
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
|
||||
targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
|
||||
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
|
||||
}
|
||||
}
|
||||
|
||||
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
|
||||
private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
|
||||
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
|
||||
|
||||
/**
|
||||
|
@ -81,7 +86,7 @@ public class BlockCommand extends DatanodeCommand {
|
|||
*/
|
||||
public BlockCommand(int action, String poolId, Block blocks[]) {
|
||||
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
|
||||
EMPTY_TARGET_STORAGEIDS);
|
||||
EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,11 +94,13 @@ public class BlockCommand extends DatanodeCommand {
|
|||
* @param blocks blocks related to the action
|
||||
*/
|
||||
public BlockCommand(int action, String poolId, Block[] blocks,
|
||||
DatanodeInfo[][] targets, String[][] targetStorageIDs) {
|
||||
DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
|
||||
String[][] targetStorageIDs) {
|
||||
super(action);
|
||||
this.poolId = poolId;
|
||||
this.blocks = blocks;
|
||||
this.targets = targets;
|
||||
this.targetStorageTypes = targetStorageTypes;
|
||||
this.targetStorageIDs = targetStorageIDs;
|
||||
}
|
||||
|
||||
|
@ -109,6 +116,10 @@ public class BlockCommand extends DatanodeCommand {
|
|||
return targets;
|
||||
}
|
||||
|
||||
public StorageType[][] getTargetStorageTypes() {
|
||||
return targetStorageTypes;
|
||||
}
|
||||
|
||||
public String[][] getTargetStorageIDs() {
|
||||
return targetStorageIDs;
|
||||
}
|
||||
|
|
|
@ -113,6 +113,7 @@ message BlockCommandProto {
|
|||
repeated BlockProto blocks = 3;
|
||||
repeated DatanodeInfosProto targets = 4;
|
||||
repeated StorageUuidsProto targetStorageUuids = 5;
|
||||
repeated StorageTypesProto targetStorageTypes = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -107,17 +107,21 @@ message OpWriteBlockProto {
|
|||
*/
|
||||
required ChecksumProto requestedChecksum = 9;
|
||||
optional CachingStrategyProto cachingStrategy = 10;
|
||||
optional StorageTypeProto storageType = 11 [default = DISK];
|
||||
repeated StorageTypeProto targetStorageTypes = 12;
|
||||
}
|
||||
|
||||
message OpTransferBlockProto {
|
||||
required ClientOperationHeaderProto header = 1;
|
||||
repeated DatanodeInfoProto targets = 2;
|
||||
repeated StorageTypeProto targetStorageTypes = 3;
|
||||
}
|
||||
|
||||
message OpReplaceBlockProto {
|
||||
required BaseHeaderProto header = 1;
|
||||
required string delHint = 2;
|
||||
required DatanodeInfoProto source = 3;
|
||||
optional StorageTypeProto storageType = 4 [default = DISK];
|
||||
}
|
||||
|
||||
message OpCopyBlockProto {
|
||||
|
|
|
@ -136,6 +136,13 @@ enum StorageTypeProto {
|
|||
SSD = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of storage types.
|
||||
*/
|
||||
message StorageTypesProto {
|
||||
repeated StorageTypeProto storageTypes = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of storage IDs.
|
||||
*/
|
||||
|
|
|
@ -380,7 +380,7 @@ public class DFSTestUtil {
|
|||
*/
|
||||
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
|
||||
int racks, int replicas, int neededReplicas)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
throws TimeoutException, InterruptedException {
|
||||
int curRacks = 0;
|
||||
int curReplicas = 0;
|
||||
int curNeededReplicas = 0;
|
||||
|
@ -414,7 +414,7 @@ public class DFSTestUtil {
|
|||
*/
|
||||
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
|
||||
Path file, ExtendedBlock b, int corruptRepls)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
throws TimeoutException, InterruptedException {
|
||||
int count = 0;
|
||||
final int ATTEMPTS = 50;
|
||||
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
|
||||
|
@ -839,7 +839,8 @@ public class DFSTestUtil {
|
|||
|
||||
// send the request
|
||||
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
|
||||
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
|
||||
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
|
||||
new StorageType[]{StorageType.DEFAULT});
|
||||
out.flush();
|
||||
|
||||
return BlockOpResponseProto.parseDelimitedFrom(in);
|
||||
|
|
|
@ -125,17 +125,16 @@ public class TestDataTransferProtocol {
|
|||
throw eof;
|
||||
}
|
||||
|
||||
LOG.info("Received: " +new String(retBuf));
|
||||
LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
|
||||
String received = StringUtils.byteToHexString(retBuf);
|
||||
String expected = StringUtils.byteToHexString(recvBuf.toByteArray());
|
||||
LOG.info("Received: " + received);
|
||||
LOG.info("Expected: " + expected);
|
||||
|
||||
if (eofExpected) {
|
||||
throw new IOException("Did not recieve IOException when an exception " +
|
||||
"is expected while reading from " + datanode);
|
||||
}
|
||||
|
||||
byte[] needed = recvBuf.toByteArray();
|
||||
assertEquals(StringUtils.byteToHexString(needed),
|
||||
StringUtils.byteToHexString(retBuf));
|
||||
assertEquals(expected, received);
|
||||
} finally {
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
|
@ -184,10 +183,7 @@ public class TestDataTransferProtocol {
|
|||
String description, Boolean eofExcepted) throws IOException {
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null, stage,
|
||||
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);
|
||||
if (eofExcepted) {
|
||||
sendResponse(Status.ERROR, null, null, recvOut);
|
||||
sendRecvData(description, true);
|
||||
|
@ -343,10 +339,7 @@ public class TestDataTransferProtocol {
|
|||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
DFSClient dfsClient = new DFSClient(
|
||||
new InetSocketAddress("localhost", cluster.getNameNodePort()),
|
||||
conf);
|
||||
datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
|
||||
datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0];
|
||||
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
|
||||
FileSystem fileSys = cluster.getFileSystem();
|
||||
|
||||
|
@ -381,23 +374,14 @@ public class TestDataTransferProtocol {
|
|||
DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
|
||||
Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
|
||||
|
||||
sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE,
|
||||
0, 0L, 0L, 0L,
|
||||
badChecksum, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, newBlockId, badChecksum);
|
||||
recvBuf.reset();
|
||||
sendResponse(Status.ERROR, null, null, recvOut);
|
||||
sendRecvData("wrong bytesPerChecksum while writing", true);
|
||||
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
|
||||
|
||||
PacketHeader hdr = new PacketHeader(
|
||||
4, // size of packet
|
||||
|
@ -416,11 +400,7 @@ public class TestDataTransferProtocol {
|
|||
// test for writing a valid zero size block
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
|
||||
|
||||
hdr = new PacketHeader(
|
||||
8, // size of packet
|
||||
|
@ -532,4 +512,18 @@ public class TestDataTransferProtocol {
|
|||
assertTrue(hdr.sanityCheck(99));
|
||||
assertFalse(hdr.sanityCheck(100));
|
||||
}
|
||||
|
||||
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
|
||||
writeBlock(new ExtendedBlock(poolId, blockId),
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
|
||||
}
|
||||
|
||||
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
|
||||
long newGS, DataChecksum checksum) throws IOException {
|
||||
sender.writeBlock(block, StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], new StorageType[1], null, stage,
|
||||
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -550,8 +550,10 @@ public class TestPBHelper {
|
|||
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
|
||||
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
|
||||
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
|
||||
StorageType[][] storageTypes = {{StorageType.DEFAULT},
|
||||
{StorageType.DEFAULT, StorageType.DEFAULT}};
|
||||
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
|
||||
blocks, dnInfos, storageIDs);
|
||||
blocks, dnInfos, storageTypes, storageIDs);
|
||||
BlockCommandProto bcProto = PBHelper.convert(bc);
|
||||
BlockCommand bc2 = PBHelper.convert(bcProto);
|
||||
assertEquals(bc.getAction(), bc2.getAction());
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -324,7 +325,7 @@ public abstract class BlockReportTestBase {
|
|||
public void blockReport_03() throws IOException {
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path filePath = new Path("/" + METHOD_NAME + ".dat");
|
||||
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
||||
writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
||||
|
||||
// all blocks belong to the same file, hence same BP
|
||||
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
||||
|
@ -363,7 +364,7 @@ public abstract class BlockReportTestBase {
|
|||
// Create a bogus new block which will not be present on the namenode.
|
||||
ExtendedBlock b = new ExtendedBlock(
|
||||
poolId, rand.nextLong(), 1024L, rand.nextLong());
|
||||
dn.getFSDataset().createRbw(b);
|
||||
dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
|
||||
|
||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
|
||||
|
|
|
@ -744,14 +744,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
|
||||
throws IOException {
|
||||
return createTemporary(b);
|
||||
public synchronized ReplicaInPipelineInterface createRbw(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
return createTemporary(storageType, b);
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipelineInterface createTemporary(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
if (isValidBlock(b)) {
|
||||
throw new ReplicaAlreadyExistsException("Block " + b +
|
||||
" is valid, and cannot be written to.");
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -531,7 +532,7 @@ public class TestBlockRecovery {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
dn.data.createRbw(block);
|
||||
dn.data.createRbw(StorageType.DEFAULT, block);
|
||||
try {
|
||||
dn.syncBlock(rBlock, initBlockRecords(dn));
|
||||
fail("Sync should fail");
|
||||
|
@ -554,7 +555,8 @@ public class TestBlockRecovery {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
|
||||
StorageType.DEFAULT, block);
|
||||
ReplicaOutputStreams streams = null;
|
||||
try {
|
||||
streams = replicaInfo.createStreams(true,
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
|
@ -264,7 +265,8 @@ public class TestBlockReplacement {
|
|||
sock.setKeepAlive(true);
|
||||
// sendRequest
|
||||
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
|
||||
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
|
||||
new Sender(out).replaceBlock(block, StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN,
|
||||
source.getDatanodeUuid(), sourceProxy);
|
||||
out.flush();
|
||||
// receiveResponse
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -147,9 +148,9 @@ public class TestDiskError {
|
|||
|
||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
||||
DataChecksum.Type.CRC32, 512);
|
||||
new Sender(out).writeBlock(block.getBlock(),
|
||||
new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "",
|
||||
new DatanodeInfo[0], null,
|
||||
new DatanodeInfo[0], new StorageType[0], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
out.flush();
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.OutputStream;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -65,7 +66,8 @@ public class TestSimulatedFSDataset {
|
|||
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
|
||||
// we pass expected len as zero, - fsdataset should use the sizeof actual
|
||||
// data written
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
|
||||
StorageType.DEFAULT, b);
|
||||
ReplicaOutputStreams out = bInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
|
||||
try {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
|
@ -147,7 +148,7 @@ public class TestWriteToReplica {
|
|||
};
|
||||
|
||||
ReplicaMap replicasMap = dataSet.volumeMap;
|
||||
FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
|
||||
FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0);
|
||||
ReplicaInfo replicaInfo = new FinalizedReplica(
|
||||
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
||||
replicasMap.add(bpid, replicaInfo);
|
||||
|
@ -357,7 +358,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[FINALIZED]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
|
||||
Assert.fail("Should not have created a replica that's already " +
|
||||
"finalized " + blocks[FINALIZED]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -375,7 +376,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[TEMPORARY]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
|
||||
Assert.fail("Should not have created a replica that had created as " +
|
||||
"temporary " + blocks[TEMPORARY]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -385,7 +386,7 @@ public class TestWriteToReplica {
|
|||
0L, blocks[RBW].getNumBytes()); // expect to be successful
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RBW]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
|
||||
Assert.fail("Should not have created a replica that had created as RBW " +
|
||||
blocks[RBW]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -401,7 +402,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RWR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
|
||||
Assert.fail("Should not have created a replica that was waiting to be " +
|
||||
"recovered " + blocks[RWR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -417,7 +418,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RUR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
|
||||
Assert.fail("Should not have created a replica that was under recovery " +
|
||||
blocks[RUR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -434,45 +435,45 @@ public class TestWriteToReplica {
|
|||
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
|
||||
}
|
||||
|
||||
dataSet.createRbw(blocks[NON_EXISTENT]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
|
||||
}
|
||||
|
||||
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
try {
|
||||
dataSet.createTemporary(blocks[FINALIZED]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
|
||||
Assert.fail("Should not have created a temporary replica that was " +
|
||||
"finalized " + blocks[FINALIZED]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[TEMPORARY]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
|
||||
Assert.fail("Should not have created a replica that had created as" +
|
||||
"temporary " + blocks[TEMPORARY]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RBW]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
|
||||
Assert.fail("Should not have created a replica that had created as RBW " +
|
||||
blocks[RBW]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RWR]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
|
||||
Assert.fail("Should not have created a replica that was waiting to be " +
|
||||
"recovered " + blocks[RWR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RUR]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
|
||||
Assert.fail("Should not have created a replica that was under recovery " +
|
||||
blocks[RUR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
dataSet.createTemporary(blocks[NON_EXISTENT]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue