Merge from trunk to branch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1612928 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
512b756973
commit
552b4fb9f9
|
@ -307,6 +307,13 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
|
||||
will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
|
||||
|
||||
HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
|
||||
datanodes and change datanode to write block replicas using the specified
|
||||
storage type. (szetszwo)
|
||||
|
||||
HDFS-6701. Make seed optional in NetworkTopology#sortByDistance.
|
||||
(Ashwin Shankar via wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
@ -344,6 +351,12 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6667. In HDFS HA mode, Distcp/SLive with webhdfs on secure cluster fails
|
||||
with Client cannot authenticate via:[TOKEN, KERBEROS] error. (jing9)
|
||||
|
||||
HDFS-6704. Fix the command to launch JournalNode in HDFS-HA document.
|
||||
(Akira AJISAKA via jing9)
|
||||
|
||||
HDFS-6731. Run "hdfs zkfc -formatZK" on a server in a non-namenode will cause
|
||||
a null pointer exception. (Masatake Iwasaki via brandonli)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -589,6 +602,11 @@ Release 2.5.0 - UNRELEASED
|
|||
HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second
|
||||
instead of millisecond. (Juan Yu via wang)
|
||||
|
||||
HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes
|
||||
correctly. (szetszwo)
|
||||
|
||||
HDFS-6712. Document HDFS Multihoming Settings. (Arpit Agarwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||
|
@ -871,6 +889,9 @@ Release 2.5.0 - UNRELEASED
|
|||
HDFS-6378. NFS registration should timeout instead of hanging when
|
||||
portmap/rpcbind is not available (Abhiraj Butala via brandonli)
|
||||
|
||||
HDFS-6703. NFS: Files can be deleted from a read-only mount
|
||||
(Srikanth Upputuri via brandonli)
|
||||
|
||||
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
|
||||
|
|
|
@ -214,6 +214,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
|
||||
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
|
||||
|
||||
public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block";
|
||||
public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false;
|
||||
|
||||
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
|
||||
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
|
||||
|
||||
|
|
|
@ -316,6 +316,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private DataInputStream blockReplyStream;
|
||||
private ResponseProcessor response = null;
|
||||
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
|
||||
private volatile StorageType[] storageTypes = null;
|
||||
private volatile String[] storageIDs = null;
|
||||
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
|
||||
CacheBuilder.newBuilder()
|
||||
|
@ -420,10 +421,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
private void setPipeline(LocatedBlock lb) {
|
||||
setPipeline(lb.getLocations(), lb.getStorageIDs());
|
||||
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
|
||||
}
|
||||
private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
|
||||
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
|
||||
String[] storageIDs) {
|
||||
this.nodes = nodes;
|
||||
this.storageTypes = storageTypes;
|
||||
this.storageIDs = storageIDs;
|
||||
}
|
||||
|
||||
|
@ -449,7 +452,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this.setName("DataStreamer for file " + src);
|
||||
closeResponder();
|
||||
closeStream();
|
||||
setPipeline(null, null);
|
||||
setPipeline(null, null, null);
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
}
|
||||
|
||||
|
@ -1034,10 +1037,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
//transfer replica
|
||||
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
|
||||
final DatanodeInfo[] targets = {nodes[d]};
|
||||
transfer(src, targets, lb.getBlockToken());
|
||||
final StorageType[] targetStorageTypes = {storageTypes[d]};
|
||||
transfer(src, targets, targetStorageTypes, lb.getBlockToken());
|
||||
}
|
||||
|
||||
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||
//transfer replica to the new datanode
|
||||
Socket sock = null;
|
||||
|
@ -1059,7 +1064,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
//send the TRANSFER_BLOCK request
|
||||
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
|
||||
targets);
|
||||
targets, targetStorageTypes);
|
||||
out.flush();
|
||||
|
||||
//ack
|
||||
|
@ -1138,16 +1143,15 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
failed.add(nodes[errorIndex]);
|
||||
|
||||
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
|
||||
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
|
||||
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
|
||||
newnodes.length-errorIndex);
|
||||
arraycopy(nodes, newnodes, errorIndex);
|
||||
|
||||
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
|
||||
arraycopy(storageTypes, newStorageTypes, errorIndex);
|
||||
|
||||
final String[] newStorageIDs = new String[newnodes.length];
|
||||
System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
|
||||
System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
|
||||
newStorageIDs.length-errorIndex);
|
||||
arraycopy(storageIDs, newStorageIDs, errorIndex);
|
||||
|
||||
setPipeline(newnodes, newStorageIDs);
|
||||
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
||||
|
||||
// Just took care of a node error while waiting for a node restart
|
||||
if (restartingNodeIndex >= 0) {
|
||||
|
@ -1184,7 +1188,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
// set up the pipeline again with the remaining nodes
|
||||
if (failPacket) { // for testing
|
||||
success = createBlockOutputStream(nodes, newGS, isRecovery);
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
failPacket = false;
|
||||
try {
|
||||
// Give DNs time to send in bad reports. In real situations,
|
||||
|
@ -1193,7 +1197,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ie) {}
|
||||
} else {
|
||||
success = createBlockOutputStream(nodes, newGS, isRecovery);
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
}
|
||||
|
||||
if (restartingNodeIndex >= 0) {
|
||||
|
@ -1245,6 +1249,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private LocatedBlock nextBlockOutputStream() throws IOException {
|
||||
LocatedBlock lb = null;
|
||||
DatanodeInfo[] nodes = null;
|
||||
StorageType[] storageTypes = null;
|
||||
int count = dfsClient.getConf().nBlockWriteRetry;
|
||||
boolean success = false;
|
||||
ExtendedBlock oldBlock = block;
|
||||
|
@ -1267,11 +1272,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
bytesSent = 0;
|
||||
accessToken = lb.getBlockToken();
|
||||
nodes = lb.getLocations();
|
||||
storageTypes = lb.getStorageTypes();
|
||||
|
||||
//
|
||||
// Connect to first DataNode in the list.
|
||||
//
|
||||
success = createBlockOutputStream(nodes, 0L, false);
|
||||
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
|
||||
|
||||
if (!success) {
|
||||
DFSClient.LOG.info("Abandoning " + block);
|
||||
|
@ -1292,8 +1298,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// connects to the first datanode in the pipeline
|
||||
// Returns true on success, false otherwise.
|
||||
//
|
||||
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
||||
boolean recoveryFlag) {
|
||||
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
|
||||
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
|
||||
if (nodes.length == 0) {
|
||||
DFSClient.LOG.info("nodes are empty for write pipeline of block "
|
||||
+ block);
|
||||
|
@ -1335,9 +1341,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// Xmit header info to datanode
|
||||
//
|
||||
|
||||
BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
|
||||
// send the request
|
||||
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
|
||||
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
|
||||
new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
|
||||
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
|
||||
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
|
||||
cachingStrategy.get());
|
||||
|
||||
|
@ -2203,4 +2210,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
public long getFileId() {
|
||||
return fileId;
|
||||
}
|
||||
|
||||
private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
|
||||
System.arraycopy(srcs, 0, dsts, 0, skipIndex);
|
||||
System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
|
@ -71,11 +72,20 @@ public interface DataTransferProtocol {
|
|||
|
||||
/**
|
||||
* Write a block to a datanode pipeline.
|
||||
* The receiver datanode of this call is the next datanode in the pipeline.
|
||||
* The other downstream datanodes are specified by the targets parameter.
|
||||
* Note that the receiver {@link DatanodeInfo} is not required in the
|
||||
* parameter list since the receiver datanode knows its info. However, the
|
||||
* {@link StorageType} for storing the replica in the receiver datanode is a
|
||||
* parameter since the receiver datanode may support multiple storage types.
|
||||
*
|
||||
* @param blk the block being written.
|
||||
* @param storageType for storing the replica in the receiver datanode.
|
||||
* @param blockToken security token for accessing the block.
|
||||
* @param clientName client's name.
|
||||
* @param targets target datanodes in the pipeline.
|
||||
* @param targets other downstream datanodes in the pipeline.
|
||||
* @param targetStorageTypes target {@link StorageType}s corresponding
|
||||
* to the target datanodes.
|
||||
* @param source source datanode.
|
||||
* @param stage pipeline stage.
|
||||
* @param pipelineSize the size of the pipeline.
|
||||
|
@ -84,9 +94,11 @@ public interface DataTransferProtocol {
|
|||
* @param latestGenerationStamp the latest generation stamp of the block.
|
||||
*/
|
||||
public void writeBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo source,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -110,7 +122,8 @@ public interface DataTransferProtocol {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException;
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException;
|
||||
|
||||
/**
|
||||
* Request short circuit access file descriptors from a DataNode.
|
||||
|
@ -148,11 +161,13 @@ public interface DataTransferProtocol {
|
|||
* It is used for balancing purpose.
|
||||
*
|
||||
* @param blk the block being replaced.
|
||||
* @param storageType the {@link StorageType} for storing the block.
|
||||
* @param blockToken security token for accessing the block.
|
||||
* @param delHint the hint for deleting the block in the original datanode.
|
||||
* @param source the source datanode for receiving the block.
|
||||
*/
|
||||
public void replaceBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo source) throws IOException;
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||
|
@ -121,10 +122,13 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
/** Receive OP_WRITE_BLOCK */
|
||||
private void opWriteBlock(DataInputStream in) throws IOException {
|
||||
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
PBHelper.convert(proto.getTargetsList()),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
|
||||
PBHelper.convert(proto.getSource()),
|
||||
fromProto(proto.getStage()),
|
||||
proto.getPipelineSize(),
|
||||
|
@ -140,10 +144,12 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
private void opTransferBlock(DataInputStream in) throws IOException {
|
||||
final OpTransferBlockProto proto =
|
||||
OpTransferBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
PBHelper.convert(proto.getTargetsList()));
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
|
||||
}
|
||||
|
||||
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
|
||||
|
@ -176,6 +182,7 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
private void opReplaceBlock(DataInputStream in) throws IOException {
|
||||
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
|
||||
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
proto.getDelHint(),
|
||||
PBHelper.convert(proto.getSource()));
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
|
@ -111,9 +112,11 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
@Override
|
||||
public void writeBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo source,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -130,7 +133,9 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
|
||||
.setHeader(header)
|
||||
.setStorageType(PBHelper.convertStorageType(storageType))
|
||||
.addAllTargets(PBHelper.convert(targets, 1))
|
||||
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
|
||||
.setStage(toProto(stage))
|
||||
.setPipelineSize(pipelineSize)
|
||||
.setMinBytesRcvd(minBytesRcvd)
|
||||
|
@ -150,12 +155,14 @@ public class Sender implements DataTransferProtocol {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException {
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException {
|
||||
|
||||
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildClientHeader(
|
||||
blk, clientName, blockToken))
|
||||
.addAllTargets(PBHelper.convert(targets))
|
||||
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
|
||||
.build();
|
||||
|
||||
send(out, Op.TRANSFER_BLOCK, proto);
|
||||
|
@ -196,11 +203,13 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
@Override
|
||||
public void replaceBlock(final ExtendedBlock blk,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo source) throws IOException {
|
||||
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
|
||||
.setStorageType(PBHelper.convertStorageType(storageType))
|
||||
.setDelHint(delHint)
|
||||
.setSource(PBHelper.convertDatanodeInfo(source))
|
||||
.build();
|
||||
|
|
|
@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryLi
|
|||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
|
||||
|
@ -679,14 +680,8 @@ public class PBHelper {
|
|||
targets[i] = PBHelper.convert(locs.get(i));
|
||||
}
|
||||
|
||||
final int storageTypesCount = proto.getStorageTypesCount();
|
||||
final StorageType[] storageTypes;
|
||||
if (storageTypesCount == 0) {
|
||||
storageTypes = null;
|
||||
} else {
|
||||
Preconditions.checkState(storageTypesCount == locs.size());
|
||||
storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
|
||||
}
|
||||
final StorageType[] storageTypes = convertStorageTypes(
|
||||
proto.getStorageTypesList(), locs.size());
|
||||
|
||||
final int storageIDsCount = proto.getStorageIDsCount();
|
||||
final String[] storageIDs;
|
||||
|
@ -974,6 +969,20 @@ public class PBHelper {
|
|||
targets[i] = PBHelper.convert(targetList.get(i));
|
||||
}
|
||||
|
||||
StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
|
||||
List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
|
||||
if (targetStorageTypesList.isEmpty()) { // missing storage types
|
||||
for(int i = 0; i < targetStorageTypes.length; i++) {
|
||||
targetStorageTypes[i] = new StorageType[targets[i].length];
|
||||
Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
|
||||
}
|
||||
} else {
|
||||
for(int i = 0; i < targetStorageTypes.length; i++) {
|
||||
List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
|
||||
targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
|
||||
String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
|
||||
for(int i = 0; i < targetStorageIDs.length; i++) {
|
||||
|
@ -996,7 +1005,7 @@ public class PBHelper {
|
|||
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
|
||||
}
|
||||
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
|
||||
targetStorageIDs);
|
||||
targetStorageTypes, targetStorageIDs);
|
||||
}
|
||||
|
||||
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
|
||||
|
@ -1620,8 +1629,25 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageTypeProto convertStorageType(
|
||||
StorageType type) {
|
||||
public static List<StorageTypeProto> convertStorageTypes(
|
||||
StorageType[] types) {
|
||||
return convertStorageTypes(types, 0);
|
||||
}
|
||||
|
||||
public static List<StorageTypeProto> convertStorageTypes(
|
||||
StorageType[] types, int startIdx) {
|
||||
if (types == null) {
|
||||
return null;
|
||||
}
|
||||
final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
|
||||
types.length);
|
||||
for (int i = startIdx; i < types.length; ++i) {
|
||||
protos.add(convertStorageType(types[i]));
|
||||
}
|
||||
return protos;
|
||||
}
|
||||
|
||||
public static StorageTypeProto convertStorageType(StorageType type) {
|
||||
switch(type) {
|
||||
case DISK:
|
||||
return StorageTypeProto.DISK;
|
||||
|
@ -1636,7 +1662,7 @@ public class PBHelper {
|
|||
public static DatanodeStorage convert(DatanodeStorageProto s) {
|
||||
return new DatanodeStorage(s.getStorageUuid(),
|
||||
PBHelper.convertState(s.getState()),
|
||||
PBHelper.convertType(s.getStorageType()));
|
||||
PBHelper.convertStorageType(s.getStorageType()));
|
||||
}
|
||||
|
||||
private static State convertState(StorageState state) {
|
||||
|
@ -1649,7 +1675,7 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageType convertType(StorageTypeProto type) {
|
||||
public static StorageType convertStorageType(StorageTypeProto type) {
|
||||
switch(type) {
|
||||
case DISK:
|
||||
return StorageType.DISK;
|
||||
|
@ -1661,11 +1687,16 @@ public class PBHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static StorageType[] convertStorageTypeProtos(
|
||||
List<StorageTypeProto> storageTypesList) {
|
||||
final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
|
||||
public static StorageType[] convertStorageTypes(
|
||||
List<StorageTypeProto> storageTypesList, int expectedSize) {
|
||||
final StorageType[] storageTypes = new StorageType[expectedSize];
|
||||
if (storageTypesList.size() != expectedSize) { // missing storage types
|
||||
Preconditions.checkState(storageTypesList.isEmpty());
|
||||
Arrays.fill(storageTypes, StorageType.DEFAULT);
|
||||
} else {
|
||||
for (int i = 0; i < storageTypes.length; ++i) {
|
||||
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
|
||||
storageTypes[i] = convertStorageType(storageTypesList.get(i));
|
||||
}
|
||||
}
|
||||
return storageTypes;
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configured;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -368,7 +369,7 @@ public class Balancer {
|
|||
in = new DataInputStream(new BufferedInputStream(unbufIn,
|
||||
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
sendRequest(out, eb, accessToken);
|
||||
sendRequest(out, eb, StorageType.DEFAULT, accessToken);
|
||||
receiveResponse(in);
|
||||
bytesMoved.addAndGet(block.getNumBytes());
|
||||
LOG.info("Successfully moved " + this);
|
||||
|
@ -400,8 +401,9 @@ public class Balancer {
|
|||
|
||||
/* Send a block replace request to the output stream. */
|
||||
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
|
||||
StorageType storageType,
|
||||
Token<BlockTokenIdentifier> accessToken) throws IOException {
|
||||
new Sender(out).replaceBlock(eb, accessToken,
|
||||
new Sender(out).replaceBlock(eb, storageType, accessToken,
|
||||
source.getStorageID(), proxySource.getDatanode());
|
||||
}
|
||||
|
||||
|
|
|
@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
|
||||
boolean avoidStaleNodes = stats != null
|
||||
&& stats.isAvoidingStaleDataNodesForWrite();
|
||||
for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
|
||||
for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
|
||||
DatanodeDescriptor favoredNode = favoredNodes.get(i);
|
||||
// Choose a single node which is local to favoredNode.
|
||||
// 'results' is updated within chooseLocalStorage
|
||||
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
|
||||
favoriteAndExcludedNodes, blocksize,
|
||||
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
|
||||
results, avoidStaleNodes, storageType);
|
||||
results, avoidStaleNodes, storageType, false);
|
||||
if (target == null) {
|
||||
LOG.warn("Could not find a target for file " + src
|
||||
+ " with favored node " + favoredNode);
|
||||
|
@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
try {
|
||||
if (numOfResults == 0) {
|
||||
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
|
||||
maxNodesPerRack, results, avoidStaleNodes, storageType)
|
||||
maxNodesPerRack, results, avoidStaleNodes, storageType, true)
|
||||
.getDatanodeDescriptor();
|
||||
if (--numOfReplicas == 0) {
|
||||
return writer;
|
||||
|
@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
int maxNodesPerRack,
|
||||
List<DatanodeStorageInfo> results,
|
||||
boolean avoidStaleNodes,
|
||||
StorageType storageType)
|
||||
StorageType storageType,
|
||||
boolean fallbackToLocalRack)
|
||||
throws NotEnoughReplicasException {
|
||||
// if no local machine, randomly choose one node
|
||||
if (localMachine == null)
|
||||
if (localMachine == null) {
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
||||
maxNodesPerRack, results, avoidStaleNodes, storageType);
|
||||
}
|
||||
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
|
||||
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
|
||||
// otherwise try local machine first
|
||||
|
@ -364,6 +366,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!fallbackToLocalRack) {
|
||||
return null;
|
||||
}
|
||||
// try a node on local rack
|
||||
return chooseLocalRack(localMachine, excludedNodes, blocksize,
|
||||
maxNodesPerRack, results, avoidStaleNodes, storageType);
|
||||
|
|
|
@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|||
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
||||
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
||||
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
||||
StorageType storageType) throws NotEnoughReplicasException {
|
||||
StorageType storageType, boolean fallbackToLocalRack
|
||||
) throws NotEnoughReplicasException {
|
||||
// if no local machine, randomly choose one node
|
||||
if (localMachine == null)
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
|
@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|||
if (chosenStorage != null) {
|
||||
return chosenStorage;
|
||||
}
|
||||
|
||||
if (!fallbackToLocalRack) {
|
||||
return null;
|
||||
}
|
||||
// try a node on local rack
|
||||
return chooseLocalRack(localMachine, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
|
||||
|
|
|
@ -345,7 +345,8 @@ public class DatanodeManager {
|
|||
|
||||
/** Sort the located blocks by the distance to the target host. */
|
||||
public void sortLocatedBlocks(final String targethost,
|
||||
final List<LocatedBlock> locatedblocks) {
|
||||
final List<LocatedBlock> locatedblocks,
|
||||
boolean randomizeBlockLocationsPerBlock) {
|
||||
//sort the blocks
|
||||
// Since the node manager and the datanode may be separated,
|
||||
// we should look up a node here, not only a datanode.
|
||||
|
@ -372,8 +373,8 @@ public class DatanodeManager {
|
|||
--lastActiveIndex;
|
||||
}
|
||||
int activeLen = lastActiveIndex + 1;
|
||||
networktopology.sortByDistance(client, b.getLocations(), activeLen,
|
||||
b.getBlock().getBlockId());
|
||||
networktopology.sortByDistance(client, b.getLocations(), activeLen, b
|
||||
.getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -575,7 +575,8 @@ class BPOfferService {
|
|||
switch(cmd.getAction()) {
|
||||
case DatanodeProtocol.DNA_TRANSFER:
|
||||
// Send a copy of a block to another datanode
|
||||
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
|
||||
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
|
||||
bcmd.getTargets(), bcmd.getTargetStorageTypes());
|
||||
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
|
||||
break;
|
||||
case DatanodeProtocol.DNA_INVALIDATE:
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.util.zip.Checksum;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.FSOutputSummer;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -122,7 +123,8 @@ class BlockReceiver implements Closeable {
|
|||
private boolean syncOnClose;
|
||||
private long restartBudget;
|
||||
|
||||
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
|
||||
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
||||
final DataInputStream in,
|
||||
final String inAddr, final String myAddr,
|
||||
final BlockConstructionStage stage,
|
||||
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
|
||||
|
@ -162,11 +164,11 @@ class BlockReceiver implements Closeable {
|
|||
// Open local disk out
|
||||
//
|
||||
if (isDatanode) { //replication or move
|
||||
replicaInfo = datanode.data.createTemporary(block);
|
||||
replicaInfo = datanode.data.createTemporary(storageType, block);
|
||||
} else {
|
||||
switch (stage) {
|
||||
case PIPELINE_SETUP_CREATE:
|
||||
replicaInfo = datanode.data.createRbw(block);
|
||||
replicaInfo = datanode.data.createRbw(storageType, block);
|
||||
datanode.notifyNamenodeReceivingBlock(
|
||||
block, replicaInfo.getStorageUuid());
|
||||
break;
|
||||
|
@ -198,7 +200,7 @@ class BlockReceiver implements Closeable {
|
|||
case TRANSFER_RBW:
|
||||
case TRANSFER_FINALIZED:
|
||||
// this is a transfer destination
|
||||
replicaInfo = datanode.data.createTemporary(block);
|
||||
replicaInfo = datanode.data.createTemporary(storageType, block);
|
||||
break;
|
||||
default: throw new IOException("Unsupported stage " + stage +
|
||||
" while receiving block " + block + " from " + inAddr);
|
||||
|
|
|
@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.net.DomainPeerServer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
|
||||
|
@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Client
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
|
||||
import org.apache.hadoop.hdfs.protocolPB.*;
|
||||
import org.apache.hadoop.hdfs.security.token.block.*;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
|
@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
|
@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.*;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ServicePlugin;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import java.io.*;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.*;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
/**********************************************************
|
||||
* DataNode is a class (and program) that stores a set of
|
||||
|
@ -1475,8 +1557,8 @@ public class DataNode extends Configured
|
|||
return xmitsInProgress.get();
|
||||
}
|
||||
|
||||
private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
|
||||
throws IOException {
|
||||
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
|
||||
StorageType[] xferTargetStorageTypes) throws IOException {
|
||||
BPOfferService bpos = getBPOSForBlock(block);
|
||||
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
||||
|
||||
|
@ -1512,16 +1594,17 @@ public class DataNode extends Configured
|
|||
LOG.info(bpReg + " Starting thread to transfer " +
|
||||
block + " to " + xfersBuilder);
|
||||
|
||||
new Daemon(new DataTransfer(xferTargets, block,
|
||||
new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
|
||||
}
|
||||
}
|
||||
|
||||
void transferBlocks(String poolId, Block blocks[],
|
||||
DatanodeInfo xferTargets[][]) {
|
||||
DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
try {
|
||||
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
|
||||
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
|
||||
xferTargetStorageTypes[i]);
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed to transfer block " + blocks[i], ie);
|
||||
}
|
||||
|
@ -1624,6 +1707,7 @@ public class DataNode extends Configured
|
|||
*/
|
||||
private class DataTransfer implements Runnable {
|
||||
final DatanodeInfo[] targets;
|
||||
final StorageType[] targetStorageTypes;
|
||||
final ExtendedBlock b;
|
||||
final BlockConstructionStage stage;
|
||||
final private DatanodeRegistration bpReg;
|
||||
|
@ -1634,7 +1718,8 @@ public class DataNode extends Configured
|
|||
* Connect to the first item in the target list. Pass along the
|
||||
* entire target list, the block, and the data.
|
||||
*/
|
||||
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
|
||||
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
|
||||
ExtendedBlock b, BlockConstructionStage stage,
|
||||
final String clientname) {
|
||||
if (DataTransferProtocol.LOG.isDebugEnabled()) {
|
||||
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
|
||||
|
@ -1644,6 +1729,7 @@ public class DataNode extends Configured
|
|||
+ ", targests=" + Arrays.asList(targets));
|
||||
}
|
||||
this.targets = targets;
|
||||
this.targetStorageTypes = targetStorageTypes;
|
||||
this.b = b;
|
||||
this.stage = stage;
|
||||
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
|
||||
|
@ -1702,7 +1788,8 @@ public class DataNode extends Configured
|
|||
false, false, true, DataNode.this, null, cachingStrategy);
|
||||
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
||||
|
||||
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
|
||||
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
|
||||
clientname, targets, targetStorageTypes, srcNode,
|
||||
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
|
||||
|
||||
// send data & checksum
|
||||
|
@ -2403,7 +2490,8 @@ public class DataNode extends Configured
|
|||
* @param client client name
|
||||
*/
|
||||
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
|
||||
final DatanodeInfo[] targets, final String client) throws IOException {
|
||||
final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
|
||||
final String client) throws IOException {
|
||||
final long storedGS;
|
||||
final long visible;
|
||||
final BlockConstructionStage stage;
|
||||
|
@ -2436,7 +2524,7 @@ public class DataNode extends Configured
|
|||
b.setNumBytes(visible);
|
||||
|
||||
if (targets.length > 0) {
|
||||
new DataTransfer(targets, b, stage, client).run();
|
||||
new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Arrays;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -524,9 +525,11 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
|
||||
@Override
|
||||
public void writeBlock(final ExtendedBlock block,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientname,
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final DatanodeInfo srcDataNode,
|
||||
final BlockConstructionStage stage,
|
||||
final int pipelineSize,
|
||||
|
@ -590,12 +593,13 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
if (isDatanode ||
|
||||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
||||
// open a block receiver
|
||||
blockReceiver = new BlockReceiver(block, in,
|
||||
blockReceiver = new BlockReceiver(block, storageType, in,
|
||||
peer.getRemoteAddressString(),
|
||||
peer.getLocalAddressString(),
|
||||
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
|
||||
clientname, srcDataNode, datanode, requestedChecksum,
|
||||
cachingStrategy);
|
||||
|
||||
storageUuid = blockReceiver.getStorageUuid();
|
||||
} else {
|
||||
storageUuid = datanode.data.recoverClose(
|
||||
|
@ -636,10 +640,10 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
mirrorIn = new DataInputStream(unbufMirrorIn);
|
||||
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
|
||||
clientname, targets, srcDataNode, stage, pipelineSize,
|
||||
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
|
||||
cachingStrategy);
|
||||
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
||||
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
|
||||
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
|
||||
latestGenerationStamp, requestedChecksum, cachingStrategy);
|
||||
|
||||
mirrorOut.flush();
|
||||
|
||||
|
@ -754,7 +758,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
public void transferBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final DatanodeInfo[] targets) throws IOException {
|
||||
final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes) throws IOException {
|
||||
checkAccess(socketOut, true, blk, blockToken,
|
||||
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
||||
previousOpClientName = clientName;
|
||||
|
@ -763,7 +768,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
final DataOutputStream out = new DataOutputStream(
|
||||
getOutputStream());
|
||||
try {
|
||||
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
|
||||
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
||||
targetStorageTypes, clientName);
|
||||
writeResponse(Status.SUCCESS, null, out);
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
|
@ -941,6 +947,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
|
||||
@Override
|
||||
public void replaceBlock(final ExtendedBlock block,
|
||||
final StorageType storageType,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String delHint,
|
||||
final DatanodeInfo proxySource) throws IOException {
|
||||
|
@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
|
||||
checksumInfo.getChecksum());
|
||||
// open a block receiver and check if the block does not exist
|
||||
blockReceiver = new BlockReceiver(
|
||||
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
||||
blockReceiver = new BlockReceiver(block, storageType,
|
||||
proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
||||
proxySock.getLocalSocketAddress().toString(),
|
||||
null, 0, 0, 0, "", null, datanode, remoteChecksum,
|
||||
CachingStrategy.newDropBehind());
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
|
@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* @return the meta info of the replica which is being written to
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
|
||||
) throws IOException;
|
||||
public ReplicaInPipelineInterface createTemporary(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a RBW replica and returns the meta info of the replica
|
||||
|
@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* @return the meta info of the replica which is being written to
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public ReplicaInPipelineInterface createRbw(ExtendedBlock b
|
||||
) throws IOException;
|
||||
public ReplicaInPipelineInterface createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException;
|
||||
|
||||
/**
|
||||
* Recovers a RBW replica and returns the meta info of the replica
|
||||
|
|
|
@ -17,6 +17,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.*;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
|
@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**************************************************
|
||||
* FSDataset manages a set of data blocks. Each block
|
||||
* has a unique name and an extent on disk.
|
||||
|
@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
|
@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
" and thus cannot be created.");
|
||||
}
|
||||
// create a new block
|
||||
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
|
||||
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
// create a rbw file to hold block in the designated volume
|
||||
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
|
@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
throw new ReplicaAlreadyExistsException("Block " + b +
|
||||
|
@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
" and thus cannot be created.");
|
||||
}
|
||||
|
||||
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
|
||||
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
// create a temporary file to hold block in the designated volume
|
||||
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
||||
|
|
|
@ -18,13 +18,17 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
class FsVolumeList {
|
||||
/**
|
||||
|
@ -52,11 +56,18 @@ class FsVolumeList {
|
|||
* by a single thread and next volume is chosen with no concurrent
|
||||
* update to {@link #volumes}.
|
||||
* @param blockSize free space needed on the volume
|
||||
* @param storageType the desired {@link StorageType}
|
||||
* @return next volume to store the block in.
|
||||
*/
|
||||
// TODO should choose volume with storage type
|
||||
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
|
||||
return blockChooser.chooseVolume(volumes, blockSize);
|
||||
synchronized FsVolumeImpl getNextVolume(StorageType storageType,
|
||||
long blockSize) throws IOException {
|
||||
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
|
||||
for(FsVolumeImpl v : volumes) {
|
||||
if (v.getStorageType() == storageType) {
|
||||
list.add(v);
|
||||
}
|
||||
}
|
||||
return blockChooser.chooseVolume(list, blockSize);
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
|
|
|
@ -86,6 +86,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
|
@ -549,6 +552,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
private final FSImage fsImage;
|
||||
|
||||
private boolean randomizeBlockLocationsPerBlock;
|
||||
|
||||
/**
|
||||
* Notify that loading of this FSDirectory is complete, and
|
||||
* it is imageLoaded for use
|
||||
|
@ -862,6 +867,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
|
||||
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
|
||||
|
||||
this.randomizeBlockLocationsPerBlock = conf.getBoolean(
|
||||
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
|
||||
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
|
||||
|
||||
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
||||
this.dir = new FSDirectory(this, conf);
|
||||
this.snapshotManager = new SnapshotManager(dir);
|
||||
|
@ -1761,8 +1770,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
|
||||
true);
|
||||
if (blocks != null) {
|
||||
blockManager.getDatanodeManager().sortLocatedBlocks(
|
||||
clientMachine, blocks.getLocatedBlocks());
|
||||
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
|
||||
blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
|
||||
|
||||
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
|
||||
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
|
||||
|
@ -1770,8 +1779,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
ArrayList<LocatedBlock> lastBlockList =
|
||||
Lists.newArrayListWithCapacity(1);
|
||||
lastBlockList.add(lastBlock);
|
||||
blockManager.getDatanodeManager().sortLocatedBlocks(
|
||||
clientMachine, lastBlockList);
|
||||
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
|
||||
lastBlockList, randomizeBlockLocationsPerBlock);
|
||||
}
|
||||
}
|
||||
return blocks;
|
||||
|
@ -2580,11 +2589,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
// Path is within an EZ and we have provided encryption parameters.
|
||||
// Make sure that the generated EDEK matches the settings of the EZ.
|
||||
String ezKeyName = dir.getKeyName(iip);
|
||||
if (!ezKeyName.equals(edek.getKeyName())) {
|
||||
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
|
||||
throw new RetryStartFileException();
|
||||
}
|
||||
feInfo = new FileEncryptionInfo(suite, edek.getEncryptedKey()
|
||||
.getMaterial(), edek.getIv(), edek.getKeyVersionName());
|
||||
feInfo = new FileEncryptionInfo(suite,
|
||||
edek.getEncryptedKeyVersion().getMaterial(),
|
||||
edek.getEncryptedKeyIv(),
|
||||
edek.getEncryptionKeyVersionName());
|
||||
Preconditions.checkNotNull(feInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||
|
@ -50,6 +51,7 @@ public class BlockCommand extends DatanodeCommand {
|
|||
final String poolId;
|
||||
final Block[] blocks;
|
||||
final DatanodeInfo[][] targets;
|
||||
final StorageType[][] targetStorageTypes;
|
||||
final String[][] targetStorageIDs;
|
||||
|
||||
/**
|
||||
|
@ -62,17 +64,20 @@ public class BlockCommand extends DatanodeCommand {
|
|||
this.poolId = poolId;
|
||||
blocks = new Block[blocktargetlist.size()];
|
||||
targets = new DatanodeInfo[blocks.length][];
|
||||
targetStorageTypes = new StorageType[blocks.length][];
|
||||
targetStorageIDs = new String[blocks.length][];
|
||||
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
BlockTargetPair p = blocktargetlist.get(i);
|
||||
blocks[i] = p.block;
|
||||
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
|
||||
targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
|
||||
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
|
||||
}
|
||||
}
|
||||
|
||||
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
|
||||
private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
|
||||
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
|
||||
|
||||
/**
|
||||
|
@ -81,7 +86,7 @@ public class BlockCommand extends DatanodeCommand {
|
|||
*/
|
||||
public BlockCommand(int action, String poolId, Block blocks[]) {
|
||||
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
|
||||
EMPTY_TARGET_STORAGEIDS);
|
||||
EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,11 +94,13 @@ public class BlockCommand extends DatanodeCommand {
|
|||
* @param blocks blocks related to the action
|
||||
*/
|
||||
public BlockCommand(int action, String poolId, Block[] blocks,
|
||||
DatanodeInfo[][] targets, String[][] targetStorageIDs) {
|
||||
DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
|
||||
String[][] targetStorageIDs) {
|
||||
super(action);
|
||||
this.poolId = poolId;
|
||||
this.blocks = blocks;
|
||||
this.targets = targets;
|
||||
this.targetStorageTypes = targetStorageTypes;
|
||||
this.targetStorageIDs = targetStorageIDs;
|
||||
}
|
||||
|
||||
|
@ -109,6 +116,10 @@ public class BlockCommand extends DatanodeCommand {
|
|||
return targets;
|
||||
}
|
||||
|
||||
public StorageType[][] getTargetStorageTypes() {
|
||||
return targetStorageTypes;
|
||||
}
|
||||
|
||||
public String[][] getTargetStorageIDs() {
|
||||
return targetStorageIDs;
|
||||
}
|
||||
|
|
|
@ -122,6 +122,11 @@ public class DFSZKFailoverController extends ZKFailoverController {
|
|||
"HA is not enabled for this namenode.");
|
||||
}
|
||||
String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
|
||||
if (nnId == null) {
|
||||
String msg = "Could not get the namenode ID of this node. " +
|
||||
"You may run zkfc on the node other than namenode.";
|
||||
throw new HadoopIllegalArgumentException(msg);
|
||||
}
|
||||
NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
|
||||
DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);
|
||||
|
||||
|
|
|
@ -113,6 +113,7 @@ message BlockCommandProto {
|
|||
repeated BlockProto blocks = 3;
|
||||
repeated DatanodeInfosProto targets = 4;
|
||||
repeated StorageUuidsProto targetStorageUuids = 5;
|
||||
repeated StorageTypesProto targetStorageTypes = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -107,17 +107,21 @@ message OpWriteBlockProto {
|
|||
*/
|
||||
required ChecksumProto requestedChecksum = 9;
|
||||
optional CachingStrategyProto cachingStrategy = 10;
|
||||
optional StorageTypeProto storageType = 11 [default = DISK];
|
||||
repeated StorageTypeProto targetStorageTypes = 12;
|
||||
}
|
||||
|
||||
message OpTransferBlockProto {
|
||||
required ClientOperationHeaderProto header = 1;
|
||||
repeated DatanodeInfoProto targets = 2;
|
||||
repeated StorageTypeProto targetStorageTypes = 3;
|
||||
}
|
||||
|
||||
message OpReplaceBlockProto {
|
||||
required BaseHeaderProto header = 1;
|
||||
required string delHint = 2;
|
||||
required DatanodeInfoProto source = 3;
|
||||
optional StorageTypeProto storageType = 4 [default = DISK];
|
||||
}
|
||||
|
||||
message OpCopyBlockProto {
|
||||
|
|
|
@ -136,6 +136,13 @@ enum StorageTypeProto {
|
|||
SSD = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of storage types.
|
||||
*/
|
||||
message StorageTypesProto {
|
||||
repeated StorageTypeProto storageTypes = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of storage IDs.
|
||||
*/
|
||||
|
|
|
@ -2039,4 +2039,17 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.randomize-block-locations-per-block</name>
|
||||
<value>false</value>
|
||||
<description>When fetching replica locations of a block, the replicas
|
||||
are sorted based on network distance. This configuration parameter
|
||||
determines whether the replicas at the same network distance are randomly
|
||||
shuffled. By default, this is false, such that repeated requests for a block's
|
||||
replicas always result in the same order. This potentially improves page cache
|
||||
behavior. However, for some network topologies, it is desirable to shuffle this
|
||||
order for better load balancing.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -416,8 +416,8 @@ HDFS High Availability Using the Quorum Journal Manager
|
|||
|
||||
After all of the necessary configuration options have been set, you must
|
||||
start the JournalNode daemons on the set of machines where they will run. This
|
||||
can be done by running the command "<hdfs-daemon.sh journalnode>" and waiting
|
||||
for the daemon to start on each of the relevant machines.
|
||||
can be done by running the command "<hadoop-daemon.sh start journalnode>" and
|
||||
waiting for the daemon to start on each of the relevant machines.
|
||||
|
||||
Once the JournalNodes have been started, one must initially synchronize the
|
||||
two HA NameNodes' on-disk metadata.
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
~~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~~ you may not use this file except in compliance with the License.
|
||||
~~ You may obtain a copy of the License at
|
||||
~~
|
||||
~~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~~
|
||||
~~ Unless required by applicable law or agreed to in writing, software
|
||||
~~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~~ See the License for the specific language governing permissions and
|
||||
~~ limitations under the License. See accompanying LICENSE file.
|
||||
|
||||
---
|
||||
Hadoop Distributed File System-${project.version} - Support for Multi-Homed Networks
|
||||
---
|
||||
---
|
||||
${maven.build.timestamp}
|
||||
|
||||
HDFS Support for Multihomed Networks
|
||||
|
||||
This document is targeted to cluster administrators deploying <<<HDFS>>> in
|
||||
multihomed networks. Similar support for <<<YARN>>>/<<<MapReduce>>> is
|
||||
work in progress and will be documented when available.
|
||||
|
||||
%{toc|section=1|fromDepth=0}
|
||||
|
||||
* Multihoming Background
|
||||
|
||||
In multihomed networks the cluster nodes are connected to more than one
|
||||
network interface. There could be multiple reasons for doing so.
|
||||
|
||||
[[1]] <<Security>>: Security requirements may dictate that intra-cluster
|
||||
traffic be confined to a different network than the network used to
|
||||
transfer data in and out of the cluster.
|
||||
|
||||
[[2]] <<Performance>>: Intra-cluster traffic may use one or more high bandwidth
|
||||
interconnects like Fiber Channel, Infiniband or 10GbE.
|
||||
|
||||
[[3]] <<Failover/Redundancy>>: The nodes may have multiple network adapters
|
||||
connected to a single network to handle network adapter failure.
|
||||
|
||||
|
||||
Note that NIC Bonding (also known as NIC Teaming or Link
|
||||
Aggregation) is a related but separate topic. The following settings
|
||||
are usually not applicable to a NIC bonding configuration which handles
|
||||
multiplexing and failover transparently while presenting a single 'logical
|
||||
network' to applications.
|
||||
|
||||
* Fixing Hadoop Issues In Multihomed Environments
|
||||
|
||||
** Ensuring HDFS Daemons Bind All Interfaces
|
||||
|
||||
By default <<<HDFS>>> endpoints are specified as either hostnames or IP addresses.
|
||||
In either case <<<HDFS>>> daemons will bind to a single IP address making
|
||||
the daemons unreachable from other networks.
|
||||
|
||||
The solution is to have separate settings for server endpoints to force binding
|
||||
the wildcard IP address <<<INADDR_ANY>>> i.e. <<<0.0.0.0>>>. Do NOT supply a port
|
||||
number with any of these settings.
|
||||
|
||||
----
|
||||
<property>
|
||||
<name>dfs.namenode.rpc-bind-host</name>
|
||||
<value>0.0.0.0</value>
|
||||
<description>
|
||||
The actual address the RPC server will bind to. If this optional address is
|
||||
set, it overrides only the hostname portion of dfs.namenode.rpc-address.
|
||||
It can also be specified per name node or name service for HA/Federation.
|
||||
This is useful for making the name node listen on all interfaces by
|
||||
setting it to 0.0.0.0.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.servicerpc-bind-host</name>
|
||||
<value>0.0.0.0</value>
|
||||
<description>
|
||||
The actual address the service RPC server will bind to. If this optional address is
|
||||
set, it overrides only the hostname portion of dfs.namenode.servicerpc-address.
|
||||
It can also be specified per name node or name service for HA/Federation.
|
||||
This is useful for making the name node listen on all interfaces by
|
||||
setting it to 0.0.0.0.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.http-bind-host</name>
|
||||
<value>0.0.0.0</value>
|
||||
<description>
|
||||
The actual address the HTTP server will bind to. If this optional address
|
||||
is set, it overrides only the hostname portion of dfs.namenode.http-address.
|
||||
It can also be specified per name node or name service for HA/Federation.
|
||||
This is useful for making the name node HTTP server listen on all
|
||||
interfaces by setting it to 0.0.0.0.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.https-bind-host</name>
|
||||
<value>0.0.0.0</value>
|
||||
<description>
|
||||
The actual address the HTTPS server will bind to. If this optional address
|
||||
is set, it overrides only the hostname portion of dfs.namenode.https-address.
|
||||
It can also be specified per name node or name service for HA/Federation.
|
||||
This is useful for making the name node HTTPS server listen on all
|
||||
interfaces by setting it to 0.0.0.0.
|
||||
</description>
|
||||
</property>
|
||||
----
|
||||
|
||||
** Clients use Hostnames when connecting to DataNodes
|
||||
|
||||
By default <<<HDFS>>> clients connect to DataNodes using the IP address
|
||||
provided by the NameNode. Depending on the network configuration this
|
||||
IP address may be unreachable by the clients. The fix is letting clients perform
|
||||
their own DNS resolution of the DataNode hostname. The following setting
|
||||
enables this behavior.
|
||||
|
||||
----
|
||||
<property>
|
||||
<name>dfs.client.use.datanode.hostname</name>
|
||||
<value>true</value>
|
||||
<description>Whether clients should use datanode hostnames when
|
||||
connecting to datanodes.
|
||||
</description>
|
||||
</property>
|
||||
----
|
||||
|
||||
** DataNodes use HostNames when connecting to other DataNodes
|
||||
|
||||
Rarely, the NameNode-resolved IP address for a DataNode may be unreachable
|
||||
from other DataNodes. The fix is to force DataNodes to perform their own
|
||||
DNS resolution for inter-DataNode connections. The following setting enables
|
||||
this behavior.
|
||||
|
||||
----
|
||||
<property>
|
||||
<name>dfs.datanode.use.datanode.hostname</name>
|
||||
<value>true</value>
|
||||
<description>Whether datanodes should use datanode hostnames when
|
||||
connecting to other datanodes for data transfer.
|
||||
</description>
|
||||
</property>
|
||||
----
|
||||
|
|
@ -380,7 +380,7 @@ public class DFSTestUtil {
|
|||
*/
|
||||
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
|
||||
int racks, int replicas, int neededReplicas)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
throws TimeoutException, InterruptedException {
|
||||
int curRacks = 0;
|
||||
int curReplicas = 0;
|
||||
int curNeededReplicas = 0;
|
||||
|
@ -414,7 +414,7 @@ public class DFSTestUtil {
|
|||
*/
|
||||
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
|
||||
Path file, ExtendedBlock b, int corruptRepls)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
throws TimeoutException, InterruptedException {
|
||||
int count = 0;
|
||||
final int ATTEMPTS = 50;
|
||||
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
|
||||
|
@ -839,7 +839,8 @@ public class DFSTestUtil {
|
|||
|
||||
// send the request
|
||||
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
|
||||
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
|
||||
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
|
||||
new StorageType[]{StorageType.DEFAULT});
|
||||
out.flush();
|
||||
|
||||
return BlockOpResponseProto.parseDelimitedFrom(in);
|
||||
|
|
|
@ -125,17 +125,16 @@ public class TestDataTransferProtocol {
|
|||
throw eof;
|
||||
}
|
||||
|
||||
LOG.info("Received: " +new String(retBuf));
|
||||
LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
|
||||
String received = StringUtils.byteToHexString(retBuf);
|
||||
String expected = StringUtils.byteToHexString(recvBuf.toByteArray());
|
||||
LOG.info("Received: " + received);
|
||||
LOG.info("Expected: " + expected);
|
||||
|
||||
if (eofExpected) {
|
||||
throw new IOException("Did not recieve IOException when an exception " +
|
||||
"is expected while reading from " + datanode);
|
||||
}
|
||||
|
||||
byte[] needed = recvBuf.toByteArray();
|
||||
assertEquals(StringUtils.byteToHexString(needed),
|
||||
StringUtils.byteToHexString(retBuf));
|
||||
assertEquals(expected, received);
|
||||
} finally {
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
|
@ -184,10 +183,7 @@ public class TestDataTransferProtocol {
|
|||
String description, Boolean eofExcepted) throws IOException {
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null, stage,
|
||||
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);
|
||||
if (eofExcepted) {
|
||||
sendResponse(Status.ERROR, null, null, recvOut);
|
||||
sendRecvData(description, true);
|
||||
|
@ -343,10 +339,7 @@ public class TestDataTransferProtocol {
|
|||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
DFSClient dfsClient = new DFSClient(
|
||||
new InetSocketAddress("localhost", cluster.getNameNodePort()),
|
||||
conf);
|
||||
datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
|
||||
datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0];
|
||||
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
|
||||
FileSystem fileSys = cluster.getFileSystem();
|
||||
|
||||
|
@ -381,23 +374,14 @@ public class TestDataTransferProtocol {
|
|||
DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
|
||||
Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
|
||||
|
||||
sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE,
|
||||
0, 0L, 0L, 0L,
|
||||
badChecksum, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, newBlockId, badChecksum);
|
||||
recvBuf.reset();
|
||||
sendResponse(Status.ERROR, null, null, recvOut);
|
||||
sendRecvData("wrong bytesPerChecksum while writing", true);
|
||||
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
|
||||
|
||||
PacketHeader hdr = new PacketHeader(
|
||||
4, // size of packet
|
||||
|
@ -416,11 +400,7 @@ public class TestDataTransferProtocol {
|
|||
// test for writing a valid zero size block
|
||||
sendBuf.reset();
|
||||
recvBuf.reset();
|
||||
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
|
||||
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
|
||||
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
|
||||
|
||||
hdr = new PacketHeader(
|
||||
8, // size of packet
|
||||
|
@ -532,4 +512,18 @@ public class TestDataTransferProtocol {
|
|||
assertTrue(hdr.sanityCheck(99));
|
||||
assertFalse(hdr.sanityCheck(100));
|
||||
}
|
||||
|
||||
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
|
||||
writeBlock(new ExtendedBlock(poolId, blockId),
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
|
||||
}
|
||||
|
||||
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
|
||||
long newGS, DataChecksum checksum) throws IOException {
|
||||
sender.writeBlock(block, StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
new DatanodeInfo[1], new StorageType[1], null, stage,
|
||||
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -550,8 +550,10 @@ public class TestPBHelper {
|
|||
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
|
||||
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
|
||||
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
|
||||
StorageType[][] storageTypes = {{StorageType.DEFAULT},
|
||||
{StorageType.DEFAULT, StorageType.DEFAULT}};
|
||||
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
|
||||
blocks, dnInfos, storageIDs);
|
||||
blocks, dnInfos, storageTypes, storageIDs);
|
||||
BlockCommandProto bcProto = PBHelper.convert(bc);
|
||||
BlockCommand bc2 = PBHelper.convert(bcProto);
|
||||
assertEquals(bc.getAction(), bc2.getAction());
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -324,7 +325,7 @@ public abstract class BlockReportTestBase {
|
|||
public void blockReport_03() throws IOException {
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path filePath = new Path("/" + METHOD_NAME + ".dat");
|
||||
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
||||
writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
||||
|
||||
// all blocks belong to the same file, hence same BP
|
||||
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
||||
|
@ -363,7 +364,7 @@ public abstract class BlockReportTestBase {
|
|||
// Create a bogus new block which will not be present on the namenode.
|
||||
ExtendedBlock b = new ExtendedBlock(
|
||||
poolId, rand.nextLong(), 1024L, rand.nextLong());
|
||||
dn.getFSDataset().createRbw(b);
|
||||
dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
|
||||
|
||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
|
||||
|
|
|
@ -744,14 +744,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
|
||||
throws IOException {
|
||||
return createTemporary(b);
|
||||
public synchronized ReplicaInPipelineInterface createRbw(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
return createTemporary(storageType, b);
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
|
||||
throws IOException {
|
||||
public synchronized ReplicaInPipelineInterface createTemporary(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
if (isValidBlock(b)) {
|
||||
throw new ReplicaAlreadyExistsException("Block " + b +
|
||||
" is valid, and cannot be written to.");
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -531,7 +532,7 @@ public class TestBlockRecovery {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
dn.data.createRbw(block);
|
||||
dn.data.createRbw(StorageType.DEFAULT, block);
|
||||
try {
|
||||
dn.syncBlock(rBlock, initBlockRecords(dn));
|
||||
fail("Sync should fail");
|
||||
|
@ -554,7 +555,8 @@ public class TestBlockRecovery {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
|
||||
StorageType.DEFAULT, block);
|
||||
ReplicaOutputStreams streams = null;
|
||||
try {
|
||||
streams = replicaInfo.createStreams(true,
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
|
@ -264,7 +265,8 @@ public class TestBlockReplacement {
|
|||
sock.setKeepAlive(true);
|
||||
// sendRequest
|
||||
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
|
||||
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
|
||||
new Sender(out).replaceBlock(block, StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN,
|
||||
source.getDatanodeUuid(), sourceProxy);
|
||||
out.flush();
|
||||
// receiveResponse
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -147,9 +148,9 @@ public class TestDiskError {
|
|||
|
||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
||||
DataChecksum.Type.CRC32, 512);
|
||||
new Sender(out).writeBlock(block.getBlock(),
|
||||
new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
|
||||
BlockTokenSecretManager.DUMMY_TOKEN, "",
|
||||
new DatanodeInfo[0], null,
|
||||
new DatanodeInfo[0], new StorageType[0], null,
|
||||
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
|
||||
checksum, CachingStrategy.newDefaultStrategy());
|
||||
out.flush();
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.OutputStream;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -65,7 +66,8 @@ public class TestSimulatedFSDataset {
|
|||
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
|
||||
// we pass expected len as zero, - fsdataset should use the sizeof actual
|
||||
// data written
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
|
||||
StorageType.DEFAULT, b);
|
||||
ReplicaOutputStreams out = bInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
|
||||
try {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
|
@ -147,7 +148,7 @@ public class TestWriteToReplica {
|
|||
};
|
||||
|
||||
ReplicaMap replicasMap = dataSet.volumeMap;
|
||||
FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
|
||||
FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0);
|
||||
ReplicaInfo replicaInfo = new FinalizedReplica(
|
||||
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
||||
replicasMap.add(bpid, replicaInfo);
|
||||
|
@ -357,7 +358,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[FINALIZED]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
|
||||
Assert.fail("Should not have created a replica that's already " +
|
||||
"finalized " + blocks[FINALIZED]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -375,7 +376,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[TEMPORARY]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
|
||||
Assert.fail("Should not have created a replica that had created as " +
|
||||
"temporary " + blocks[TEMPORARY]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -385,7 +386,7 @@ public class TestWriteToReplica {
|
|||
0L, blocks[RBW].getNumBytes()); // expect to be successful
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RBW]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
|
||||
Assert.fail("Should not have created a replica that had created as RBW " +
|
||||
blocks[RBW]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -401,7 +402,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RWR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
|
||||
Assert.fail("Should not have created a replica that was waiting to be " +
|
||||
"recovered " + blocks[RWR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -417,7 +418,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(blocks[RUR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
|
||||
Assert.fail("Should not have created a replica that was under recovery " +
|
||||
blocks[RUR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -434,45 +435,45 @@ public class TestWriteToReplica {
|
|||
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
|
||||
}
|
||||
|
||||
dataSet.createRbw(blocks[NON_EXISTENT]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
|
||||
}
|
||||
|
||||
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
try {
|
||||
dataSet.createTemporary(blocks[FINALIZED]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
|
||||
Assert.fail("Should not have created a temporary replica that was " +
|
||||
"finalized " + blocks[FINALIZED]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[TEMPORARY]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
|
||||
Assert.fail("Should not have created a replica that had created as" +
|
||||
"temporary " + blocks[TEMPORARY]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RBW]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
|
||||
Assert.fail("Should not have created a replica that had created as RBW " +
|
||||
blocks[RBW]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RWR]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
|
||||
Assert.fail("Should not have created a replica that was waiting to be " +
|
||||
"recovered " + blocks[RWR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
dataSet.createTemporary(blocks[RUR]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
|
||||
Assert.fail("Should not have created a replica that was under recovery " +
|
||||
blocks[RUR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
}
|
||||
|
||||
dataSet.createTemporary(blocks[NON_EXISTENT]);
|
||||
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,32 +18,41 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.junit.Test;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestFavoredNodesEndToEnd {
|
||||
{
|
||||
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
private static MiniDFSCluster cluster;
|
||||
private static Configuration conf;
|
||||
private final static int NUM_DATA_NODES = 10;
|
||||
|
@ -79,7 +88,7 @@ public class TestFavoredNodesEndToEnd {
|
|||
InetSocketAddress datanode[] = getDatanodes(rand);
|
||||
Path p = new Path("/filename"+i);
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, datanode);
|
||||
4096, (short)3, 4096L, null, datanode);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
BlockLocation[] locations = getBlockLocations(p);
|
||||
|
@ -98,14 +107,13 @@ public class TestFavoredNodesEndToEnd {
|
|||
//get some other nodes. In other words, the write to hdfs should not fail
|
||||
//and if we do getBlockLocations on the file, we should see one blklocation
|
||||
//and three hosts for that
|
||||
Random rand = new Random(System.currentTimeMillis());
|
||||
InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
|
||||
for (int i = 0; i < 3; i++) {
|
||||
arbitraryAddrs[i] = getArbitraryLocalHostAddr();
|
||||
}
|
||||
Path p = new Path("/filename-foo-bar");
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, arbitraryAddrs);
|
||||
4096, (short)3, 4096L, null, arbitraryAddrs);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
getBlockLocations(p);
|
||||
|
@ -113,35 +121,41 @@ public class TestFavoredNodesEndToEnd {
|
|||
|
||||
@Test(timeout=180000)
|
||||
public void testWhenSomeNodesAreNotGood() throws Exception {
|
||||
// 4 favored nodes
|
||||
final InetSocketAddress addrs[] = new InetSocketAddress[4];
|
||||
final String[] hosts = new String[addrs.length];
|
||||
for (int i = 0; i < addrs.length; i++) {
|
||||
addrs[i] = datanodes.get(i).getXferAddress();
|
||||
hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort();
|
||||
}
|
||||
|
||||
//make some datanode not "good" so that even if the client prefers it,
|
||||
//the namenode would not give it as a replica to write to
|
||||
DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getDatanodeByXferAddr(
|
||||
datanodes.get(0).getXferAddress().getAddress().getHostAddress(),
|
||||
datanodes.get(0).getXferAddress().getPort());
|
||||
addrs[0].getAddress().getHostAddress(), addrs[0].getPort());
|
||||
//set the decommission status to true so that
|
||||
//BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
|
||||
d.setDecommissioned();
|
||||
InetSocketAddress addrs[] = new InetSocketAddress[3];
|
||||
for (int i = 0; i < 3; i++) {
|
||||
addrs[i] = datanodes.get(i).getXferAddress();
|
||||
}
|
||||
Path p = new Path("/filename-foo-bar-baz");
|
||||
final short replication = (short)3;
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, addrs);
|
||||
4096, replication, 4096L, null, addrs);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
//reset the state
|
||||
d.stopDecommission();
|
||||
|
||||
BlockLocation[] locations = getBlockLocations(p);
|
||||
Assert.assertEquals(replication, locations[0].getNames().length);;
|
||||
//also make sure that the datanode[0] is not in the list of hosts
|
||||
String datanode0 =
|
||||
datanodes.get(0).getXferAddress().getAddress().getHostAddress()
|
||||
+ ":" + datanodes.get(0).getXferAddress().getPort();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
if (locations[0].getNames()[i].equals(datanode0)) {
|
||||
fail(datanode0 + " not supposed to be a replica for the block");
|
||||
}
|
||||
for (int i = 0; i < replication; i++) {
|
||||
final String loc = locations[0].getNames()[i];
|
||||
int j = 0;
|
||||
for(; j < hosts.length && !loc.equals(hosts[j]); j++);
|
||||
Assert.assertTrue("j=" + j, j > 0);
|
||||
Assert.assertTrue("loc=" + loc + " not in host list "
|
||||
+ Arrays.asList(hosts) + ", j=" + j, j < hosts.length);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,14 @@ public class TestNetworkTopology {
|
|||
DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/d3/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/d3/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/d3/r2"),
|
||||
DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2")
|
||||
DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2"),
|
||||
DFSTestUtil.getDatanodeDescriptor("14.14.14.14", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("15.15.15.15", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("16.16.16.16", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("17.17.17.17", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("18.18.18.18", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("19.19.19.19", "/d4/r1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("20.20.20.20", "/d4/r1"),
|
||||
};
|
||||
for (int i = 0; i < dataNodes.length; i++) {
|
||||
cluster.add(dataNodes[i]);
|
||||
|
@ -107,7 +114,7 @@ public class TestNetworkTopology {
|
|||
|
||||
@Test
|
||||
public void testRacks() throws Exception {
|
||||
assertEquals(cluster.getNumOfRacks(), 5);
|
||||
assertEquals(cluster.getNumOfRacks(), 6);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[0], dataNodes[1]));
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[1], dataNodes[2]));
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[2], dataNodes[3]));
|
||||
|
@ -133,7 +140,7 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[2];
|
||||
testNodes[2] = dataNodes[0];
|
||||
cluster.sortByDistance(dataNodes[0], testNodes,
|
||||
testNodes.length, 0xDEADBEEF);
|
||||
testNodes.length, 0xDEADBEEF, false);
|
||||
assertTrue(testNodes[0] == dataNodes[0]);
|
||||
assertTrue(testNodes[1] == dataNodes[1]);
|
||||
assertTrue(testNodes[2] == dataNodes[2]);
|
||||
|
@ -146,7 +153,7 @@ public class TestNetworkTopology {
|
|||
dtestNodes[3] = dataNodes[9];
|
||||
dtestNodes[4] = dataNodes[10];
|
||||
cluster.sortByDistance(dataNodes[8], dtestNodes,
|
||||
dtestNodes.length - 2, 0xDEADBEEF);
|
||||
dtestNodes.length - 2, 0xDEADBEEF, false);
|
||||
assertTrue(dtestNodes[0] == dataNodes[8]);
|
||||
assertTrue(dtestNodes[1] == dataNodes[11]);
|
||||
assertTrue(dtestNodes[2] == dataNodes[12]);
|
||||
|
@ -158,7 +165,7 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[3];
|
||||
testNodes[2] = dataNodes[0];
|
||||
cluster.sortByDistance(dataNodes[0], testNodes,
|
||||
testNodes.length, 0xDEADBEEF);
|
||||
testNodes.length, 0xDEADBEEF, false);
|
||||
assertTrue(testNodes[0] == dataNodes[0]);
|
||||
assertTrue(testNodes[1] == dataNodes[1]);
|
||||
assertTrue(testNodes[2] == dataNodes[3]);
|
||||
|
@ -168,7 +175,7 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[3];
|
||||
testNodes[2] = dataNodes[1];
|
||||
cluster.sortByDistance(dataNodes[0], testNodes,
|
||||
testNodes.length, 0xDEADBEEF);
|
||||
testNodes.length, 0xDEADBEEF, false);
|
||||
assertTrue(testNodes[0] == dataNodes[1]);
|
||||
assertTrue(testNodes[1] == dataNodes[3]);
|
||||
assertTrue(testNodes[2] == dataNodes[5]);
|
||||
|
@ -178,7 +185,7 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[5];
|
||||
testNodes[2] = dataNodes[3];
|
||||
cluster.sortByDistance(dataNodes[0], testNodes,
|
||||
testNodes.length, 0xDEADBEEF);
|
||||
testNodes.length, 0xDEADBEEF, false);
|
||||
assertTrue(testNodes[0] == dataNodes[1]);
|
||||
assertTrue(testNodes[1] == dataNodes[3]);
|
||||
assertTrue(testNodes[2] == dataNodes[5]);
|
||||
|
@ -188,7 +195,7 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[5];
|
||||
testNodes[2] = dataNodes[3];
|
||||
cluster.sortByDistance(dataNodes[0], testNodes,
|
||||
testNodes.length, 0xDEAD);
|
||||
testNodes.length, 0xDEAD, false);
|
||||
// sortByDistance does not take the "data center" layer into consideration
|
||||
// and it doesn't sort by getDistance, so 1, 5, 3 is also valid here
|
||||
assertTrue(testNodes[0] == dataNodes[1]);
|
||||
|
@ -204,7 +211,27 @@ public class TestNetworkTopology {
|
|||
testNodes[1] = dataNodes[6];
|
||||
testNodes[2] = dataNodes[7];
|
||||
cluster.sortByDistance(dataNodes[i], testNodes,
|
||||
testNodes.length, 0xBEADED+i);
|
||||
testNodes.length, 0xBEADED+i, false);
|
||||
if (first == null) {
|
||||
first = testNodes[0];
|
||||
} else {
|
||||
if (first != testNodes[0]) {
|
||||
foundRandom = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertTrue("Expected to find a different first location", foundRandom);
|
||||
// Array of rack local nodes with randomizeBlockLocationsPerBlock set to
|
||||
// true
|
||||
// Expect random order of block locations for same block
|
||||
first = null;
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
testNodes[0] = dataNodes[13];
|
||||
testNodes[1] = dataNodes[14];
|
||||
testNodes[2] = dataNodes[15];
|
||||
cluster.sortByDistance(dataNodes[15 + i], testNodes, testNodes.length,
|
||||
0xBEADED, true);
|
||||
if (first == null) {
|
||||
first = testNodes[0];
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue