From 25b0e8471ed744578b2d8e3f0debe5477b268e54 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Tue, 22 Jul 2014 07:41:24 +0000
Subject: [PATCH] HDFS-6702. Change DFSClient to pass the StorageType from the namenode to datanodes and change datanode to write block replicas using the specified storage type.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612493 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 +
 .../apache/hadoop/hdfs/DFSOutputStream.java | 50 +++---
 .../datatransfer/DataTransferProtocol.java | 21 ++-
 .../hdfs/protocol/datatransfer/Receiver.java | 11 +-
 .../hdfs/protocol/datatransfer/Sender.java | 11 +-
 .../hadoop/hdfs/protocolPB/PBHelper.java | 67 +++++---
 .../hadoop/hdfs/server/balancer/Balancer.java | 6 +-
 .../hdfs/server/datanode/BPOfferService.java | 3 +-
 .../hdfs/server/datanode/BlockReceiver.java | 10 +-
 .../hadoop/hdfs/server/datanode/DataNode.java | 150 ++++++++++++++----
 .../hdfs/server/datanode/DataXceiver.java | 25 +--
 .../datanode/fsdataset/FsDatasetSpi.java | 9 +-
 .../fsdataset/impl/FsDatasetImpl.java | 74 ++++++---
 .../datanode/fsdataset/impl/FsVolumeList.java | 21 ++-
 .../hdfs/server/protocol/BlockCommand.java | 15 +-
 .../src/main/proto/DatanodeProtocol.proto | 1 +
 .../src/main/proto/datatransfer.proto | 4 +
 .../hadoop-hdfs/src/main/proto/hdfs.proto | 7 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java | 7 +-
 .../hadoop/hdfs/TestDataTransferProtocol.java | 54 +++----
 .../hadoop/hdfs/protocolPB/TestPBHelper.java | 4 +-
 .../server/datanode/BlockReportTestBase.java | 5 +-
 .../server/datanode/SimulatedFSDataset.java | 10 +-
 .../server/datanode/TestBlockRecovery.java | 6 +-
 .../server/datanode/TestBlockReplacement.java | 4 +-
 .../hdfs/server/datanode/TestDiskError.java | 5 +-
 .../datanode/TestSimulatedFSDataset.java | 4 +-
 .../fsdataset/impl/TestWriteToReplica.java | 27 ++--
 28 files changed, 436 insertions(+), 179 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ec75643e539..acea2a3b1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -307,6 +307,10 @@ Release 2.6.0 - UNRELEASED
     HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
    will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)

+    HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
+    datanodes and change datanode to write block replicas using the specified
+    storage type. (szetszwo)
+
   OPTIMIZATIONS

    HDFS-6690. Deduplicate xattr names in memory.
(wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index e78ee160bab..a7cb92fa269 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -313,6 +313,7 @@ public class DFSOutputStream extends FSOutputSummer private DataInputStream blockReplyStream; private ResponseProcessor response = null; private volatile DatanodeInfo[] nodes = null; // list of targets for current block + private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; private final LoadingCache excludedNodes = CacheBuilder.newBuilder() @@ -417,10 +418,12 @@ public class DFSOutputStream extends FSOutputSummer } private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageIDs()); + setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } - private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + String[] storageIDs) { this.nodes = nodes; + this.storageTypes = storageTypes; this.storageIDs = storageIDs; } @@ -446,7 +449,7 @@ public class DFSOutputStream extends FSOutputSummer this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - setPipeline(null, null); + setPipeline(null, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -1031,10 +1034,12 @@ public class DFSOutputStream extends FSOutputSummer //transfer replica final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; final DatanodeInfo[] targets = {nodes[d]}; - transfer(src, targets, lb.getBlockToken()); + final StorageType[] targetStorageTypes = {storageTypes[d]}; + transfer(src, targets, targetStorageTypes, lb.getBlockToken()); } private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final Token blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; @@ -1056,7 +1061,7 @@ public class DFSOutputStream extends FSOutputSummer //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets); + targets, targetStorageTypes); out.flush(); //ack @@ -1135,16 +1140,15 @@ public class DFSOutputStream extends FSOutputSummer failed.add(nodes[errorIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - System.arraycopy(nodes, 0, newnodes, 0, errorIndex); - System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, - newnodes.length-errorIndex); + arraycopy(nodes, newnodes, errorIndex); + + final StorageType[] newStorageTypes = new StorageType[newnodes.length]; + arraycopy(storageTypes, newStorageTypes, errorIndex); final String[] newStorageIDs = new String[newnodes.length]; - System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); - System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, - newStorageIDs.length-errorIndex); + arraycopy(storageIDs, newStorageIDs, errorIndex); - setPipeline(newnodes, newStorageIDs); + setPipeline(newnodes, newStorageTypes, newStorageIDs); // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { @@ -1181,7 +1185,7 @@ public class DFSOutputStream extends FSOutputSummer // set up the pipeline again with the remaining nodes 
if (failPacket) { // for testing - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); failPacket = false; try { // Give DNs time to send in bad reports. In real situations, @@ -1190,7 +1194,7 @@ public class DFSOutputStream extends FSOutputSummer Thread.sleep(2000); } catch (InterruptedException ie) {} } else { - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); } if (restartingNodeIndex >= 0) { @@ -1242,6 +1246,7 @@ public class DFSOutputStream extends FSOutputSummer private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; + StorageType[] storageTypes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; ExtendedBlock oldBlock = block; @@ -1264,11 +1269,12 @@ public class DFSOutputStream extends FSOutputSummer bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); + storageTypes = lb.getStorageTypes(); // // Connect to first DataNode in the list. // - success = createBlockOutputStream(nodes, 0L, false); + success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { DFSClient.LOG.info("Abandoning " + block); @@ -1289,8 +1295,8 @@ public class DFSOutputStream extends FSOutputSummer // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, - boolean recoveryFlag) { + private boolean createBlockOutputStream(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { DFSClient.LOG.info("nodes are empty for write pipeline of block " + block); @@ -1332,9 +1338,10 @@ public class DFSOutputStream extends FSOutputSummer // Xmit header info to datanode // + BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? 
stage.getRecoveryStage() : stage, + new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken, + dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, cachingStrategy.get()); @@ -2197,4 +2204,9 @@ public class DFSOutputStream extends FSOutputSummer public long getFileId() { return fileId; } + + private static void arraycopy(T[] srcs, T[] dsts, int skipIndex) { + System.arraycopy(srcs, 0, dsts, 0, skipIndex); + System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index d620c6b502b..d54d5bed002 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -71,11 +72,20 @@ public interface DataTransferProtocol { /** * Write a block to a datanode pipeline. - * + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. * @param blockToken security token for accessing the block. * @param clientName client's name. - * @param targets target datanodes in the pipeline. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. * @param source source datanode. * @param stage pipeline stage. * @param pipelineSize the size of the pipeline. @@ -84,9 +94,11 @@ public interface DataTransferProtocol { * @param latestGenerationStamp the latest generation stamp of the block. */ public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -110,7 +122,8 @@ public interface DataTransferProtocol { public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException; + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -148,11 +161,13 @@ public interface DataTransferProtocol { * It is used for balancing purpose. 
* * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 5a80f689da3..a09437c0b0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; @@ -121,10 +122,13 @@ public abstract class Receiver implements DataTransferProtocol { /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList()), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), @@ -140,10 +144,12 @@ public abstract class Receiver implements DataTransferProtocol { private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList())); + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -176,6 +182,7 @@ public abstract class Receiver implements DataTransferProtocol { private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index c03f33fb276..68da52399c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -111,9 +112,11 @@ public class Sender implements DataTransferProtocol { @Override public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -130,7 +133,9 @@ public class Sender implements DataTransferProtocol { OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) + .setStorageType(PBHelper.convertStorageType(storageType)) .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -150,12 +155,14 @@ public class Sender implements DataTransferProtocol { public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -196,11 +203,13 @@ public class Sender implements DataTransferProtocol { @Override public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) .setSource(PBHelper.convertDatanodeInfo(source)) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e9435024bed..bd7e5f13918 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryLi import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; @@ -674,14 +675,8 @@ public class PBHelper { targets[i] = PBHelper.convert(locs.get(i)); } - final int storageTypesCount = proto.getStorageTypesCount(); - final StorageType[] storageTypes; - if (storageTypesCount == 0) { - storageTypes = null; - } else { - Preconditions.checkState(storageTypesCount == locs.size()); - storageTypes = convertStorageTypeProtos(proto.getStorageTypesList()); - } + final StorageType[] storageTypes = convertStorageTypes( + proto.getStorageTypesList(), locs.size()); final int storageIDsCount = proto.getStorageIDsCount(); final String[] storageIDs; @@ -969,6 +964,20 @@ public class PBHelper { targets[i] = PBHelper.convert(targetList.get(i)); } + StorageType[][] targetStorageTypes = new StorageType[targetList.size()][]; + List targetStorageTypesList = blkCmd.getTargetStorageTypesList(); + if (targetStorageTypesList.isEmpty()) { // missing storage types + for(int i = 0; i < targetStorageTypes.length; i++) { + targetStorageTypes[i] = new StorageType[targets[i].length]; + Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT); + } + } else { + for(int i = 0; i < targetStorageTypes.length; i++) { + List p = targetStorageTypesList.get(i).getStorageTypesList(); + targetStorageTypes[i] = p.toArray(new StorageType[p.size()]); + } + } + List targetStorageUuidsList = blkCmd.getTargetStorageUuidsList(); String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][]; for(int i = 0; i < targetStorageIDs.length; i++) { @@ -991,7 +1000,7 @@ public class PBHelper { throw new AssertionError("Unknown action type: " + blkCmd.getAction()); } return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets, - targetStorageIDs); + targetStorageTypes, targetStorageIDs); } public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { @@ -1605,8 +1614,25 @@ public class PBHelper { } } - private static StorageTypeProto convertStorageType( - StorageType type) { + public static List convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List protos = new ArrayList( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { switch(type) { case DISK: return StorageTypeProto.DISK; @@ -1621,7 +1647,7 @@ public class PBHelper { public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelper.convertType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1634,7 +1660,7 @@ public class PBHelper { } } - private static StorageType convertType(StorageTypeProto type) { + public static StorageType convertStorageType(StorageTypeProto type) { switch(type) { case DISK: return StorageType.DISK; @@ -1646,11 +1672,16 @@ public class PBHelper { } } - private static StorageType[] convertStorageTypeProtos( - List storageTypesList) { - final StorageType[] storageTypes = new 
StorageType[storageTypesList.size()]; - for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelper.convertType(storageTypesList.get(i)); + public static StorageType[] convertStorageTypes( + List storageTypesList, int expectedSize) { + final StorageType[] storageTypes = new StorageType[expectedSize]; + if (storageTypesList.size() != expectedSize) { // missing storage types + Preconditions.checkState(storageTypesList.isEmpty()); + Arrays.fill(storageTypes, StorageType.DEFAULT); + } else { + for (int i = 0; i < storageTypes.length; ++i) { + storageTypes[i] = convertStorageType(storageTypesList.get(i)); + } } return storageTypes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 7482b2fcdce..5dbdd643cdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -368,7 +369,7 @@ public class Balancer { in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - sendRequest(out, eb, accessToken); + sendRequest(out, eb, StorageType.DEFAULT, accessToken); receiveResponse(in); bytesMoved.addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); @@ -400,8 +401,9 @@ public class Balancer { /* Send a block replace request to the output stream*/ private void sendRequest(DataOutputStream out, ExtendedBlock eb, + StorageType storageType, Token accessToken) throws IOException { - new Sender(out).replaceBlock(eb, accessToken, + new Sender(out).replaceBlock(eb, storageType, accessToken, source.getStorageID(), proxySource.getDatanode()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index cab61364ed2..0a6549de8f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -575,7 +575,8 @@ class BPOfferService { switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), + bcmd.getTargets(), bcmd.getTargetStorageTypes()); dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3d9ccdcca90..2f3909ba564 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -37,6 +37,7 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -122,7 +123,8 @@ class BlockReceiver implements Closeable { private boolean syncOnClose; private long restartBudget; - BlockReceiver(final ExtendedBlock block, final DataInputStream in, + BlockReceiver(final ExtendedBlock block, final StorageType storageType, + final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -162,11 +164,11 @@ class BlockReceiver implements Closeable { // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block); + replicaInfo = datanode.data.createRbw(storageType, block); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; @@ -198,7 +200,7 @@ class BlockReceiver implements Closeable { case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 5f32e29cd29..b55abed7e46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import 
org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; @@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Client import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.*; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; @@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import 
org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.*; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import javax.management.ObjectName; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -1475,8 +1557,8 @@ public class DataNode extends Configured return xmitsInProgress.get(); } - private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) - throws IOException { + private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, + StorageType[] xferTargetStorageTypes) throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -1512,16 +1594,17 @@ public class DataNode extends Configured LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][]) { + DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { for (int i = 0; i < blocks.length; i++) { try { - transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], + xferTargetStorageTypes[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1624,6 +1707,7 @@ public class DataNode extends Configured */ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; + final StorageType[] targetStorageTypes; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -1634,7 +1718,8 @@ public class DataNode extends Configured * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. 
*/ - DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage, + DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, + ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " @@ -1644,6 +1729,7 @@ public class DataNode extends Configured + ", targests=" + Arrays.asList(targets)); } this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -1702,7 +1788,8 @@ public class DataNode extends Configured false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); - new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); // send data & checksum @@ -2403,7 +2490,8 @@ public class DataNode extends Configured * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, - final DatanodeInfo[] targets, final String client) throws IOException { + final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2436,7 +2524,7 @@ public class DataNode extends Configured b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 7b730905156..5ef6cc7ee22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -45,6 +45,7 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -524,9 +525,11 @@ class DataXceiver extends Receiver implements Runnable { @Override public void writeBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String clientname, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -590,12 +593,13 @@ class DataXceiver extends Receiver implements Runnable { if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = new BlockReceiver(block, in, + blockReceiver = new BlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -636,10 +640,10 @@ class DataXceiver extends 
Receiver implements Runnable { HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); - new Sender(mirrorOut).writeBlock(originalBlock, blockToken, - clientname, targets, srcDataNode, stage, pipelineSize, - minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, - cachingStrategy); + new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], + blockToken, clientname, targets, targetStorageTypes, srcDataNode, + stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy); mirrorOut.flush(); @@ -754,7 +758,8 @@ class DataXceiver extends Receiver implements Runnable { public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; @@ -763,7 +768,8 @@ class DataXceiver extends Receiver implements Runnable { final DataOutputStream out = new DataOutputStream( getOutputStream()); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); + datanode.transferReplicaForPipelineRecovery(blk, targets, + targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); @@ -941,6 +947,7 @@ class DataXceiver extends Receiver implements Runnable { @Override public void replaceBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { @@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver implements Runnable { DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver( - block, proxyReply, proxySock.getRemoteSocketAddress().toString(), + blockReceiver = new BlockReceiver(block, storageType, + proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, CachingStrategy.newDropBehind()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 8eb083a93f8..5e4f55e7339 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -176,8 +177,8 @@ public interface FsDatasetSpi extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface 
createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica @@ -186,8 +187,8 @@ public interface FsDatasetSpi extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createRbw(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 40d3e81ad62..b068c664fe3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -17,6 +17,28 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.datanode.*; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import 
org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.*; -import java.util.concurrent.Executor; - /************************************************** * FSDataset manages a set of data blocks. Each block * has a unique name and an extent on disk. @@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createRbw(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDatasetSpi { " and thus cannot be created."); } // create a new block - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), @@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDatasetSpi { " and thus cannot be created."); } - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 76b3ee8ba9a..59a5c9021cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; class FsVolumeList { /** @@ -52,11 +56,18 @@ class FsVolumeList { * by a single thread and next volume is chosen with no concurrent * update to {@link #volumes}. * @param blockSize free space needed on the volume + * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ - // TODO should choose volume with storage type - synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException { - return blockChooser.chooseVolume(volumes, blockSize); + synchronized FsVolumeImpl getNextVolume(StorageType storageType, + long blockSize) throws IOException { + final List list = new ArrayList(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.getStorageType() == storageType) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java index 1798d664f93..f17d702f483 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; @@ -50,6 +51,7 @@ public class BlockCommand extends DatanodeCommand { final String poolId; final Block[] blocks; final DatanodeInfo[][] targets; + final StorageType[][] targetStorageTypes; final String[][] targetStorageIDs; /** @@ -62,17 +64,20 @@ public class BlockCommand extends DatanodeCommand { this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; + targetStorageTypes = new StorageType[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); + 
targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } } private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {}; + private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {}; private static final String[][] EMPTY_TARGET_STORAGEIDS = {}; /** @@ -81,7 +86,7 @@ public class BlockCommand extends DatanodeCommand { */ public BlockCommand(int action, String poolId, Block blocks[]) { this(action, poolId, blocks, EMPTY_TARGET_DATANODES, - EMPTY_TARGET_STORAGEIDS); + EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS); } /** @@ -89,11 +94,13 @@ public class BlockCommand extends DatanodeCommand { * @param blocks blocks related to the action */ public BlockCommand(int action, String poolId, Block[] blocks, - DatanodeInfo[][] targets, String[][] targetStorageIDs) { + DatanodeInfo[][] targets, StorageType[][] targetStorageTypes, + String[][] targetStorageIDs) { super(action); this.poolId = poolId; this.blocks = blocks; this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.targetStorageIDs = targetStorageIDs; } @@ -109,6 +116,10 @@ public class BlockCommand extends DatanodeCommand { return targets; } + public StorageType[][] getTargetStorageTypes() { + return targetStorageTypes; + } + public String[][] getTargetStorageIDs() { return targetStorageIDs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index d6cc3d37777..2afcf057f70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -113,6 +113,7 @@ message BlockCommandProto { repeated BlockProto blocks = 3; repeated DatanodeInfosProto targets = 4; repeated StorageUuidsProto targetStorageUuids = 5; + repeated StorageTypesProto targetStorageTypes = 6; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index d9147138948..9b4ba339d23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -107,17 +107,21 @@ message OpWriteBlockProto { */ required ChecksumProto requestedChecksum = 9; optional CachingStrategyProto cachingStrategy = 10; + optional StorageTypeProto storageType = 11 [default = DISK]; + repeated StorageTypeProto targetStorageTypes = 12; } message OpTransferBlockProto { required ClientOperationHeaderProto header = 1; repeated DatanodeInfoProto targets = 2; + repeated StorageTypeProto targetStorageTypes = 3; } message OpReplaceBlockProto { required BaseHeaderProto header = 1; required string delHint = 2; required DatanodeInfoProto source = 3; + optional StorageTypeProto storageType = 4 [default = DISK]; } message OpCopyBlockProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 6fbb5b34205..8f15be6e92e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -136,6 +136,13 @@ enum StorageTypeProto { SSD = 2; } +/** + * A list of storage types. + */ +message StorageTypesProto { + repeated StorageTypeProto storageTypes = 1; +} + /** * A list of storage IDs. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index bebb946d27c..949713e6f44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -380,7 +380,7 @@ public class DFSTestUtil { */ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, int racks, int replicas, int neededReplicas) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int curRacks = 0; int curReplicas = 0; int curNeededReplicas = 0; @@ -414,7 +414,7 @@ public class DFSTestUtil { */ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns, Path file, ExtendedBlock b, int corruptRepls) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int count = 0; final int ATTEMPTS = 50; int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock()); @@ -839,7 +839,8 @@ public class DFSTestUtil { // send the request new Sender(out).transferBlock(b, new Token(), - dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); + dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, + new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 244603eac62..bcb68e9ce71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -125,17 +125,16 @@ public class TestDataTransferProtocol { throw eof; } - LOG.info("Received: " +new String(retBuf)); - LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray())); + String received = StringUtils.byteToHexString(retBuf); + String expected = StringUtils.byteToHexString(recvBuf.toByteArray()); + LOG.info("Received: " + received); + LOG.info("Expected: " + expected); if (eofExpected) { throw new IOException("Did not recieve IOException when an exception " + "is expected while reading from " + datanode); } - - byte[] needed = recvBuf.toByteArray(); - assertEquals(StringUtils.byteToHexString(needed), - StringUtils.byteToHexString(retBuf)); + assertEquals(expected, received); } finally { IOUtils.closeSocket(sock); } @@ -184,10 +183,7 @@ public class TestDataTransferProtocol { String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, stage, - 0, block.getNumBytes(), block.getNumBytes(), newGS, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(block, stage, newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); @@ -343,10 +339,7 @@ public class TestDataTransferProtocol { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); try { cluster.waitActive(); - DFSClient dfsClient = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), - conf); - datanode = 
dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]; + datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0]; dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); FileSystem fileSys = cluster.getFileSystem(); @@ -381,23 +374,14 @@ public class TestDataTransferProtocol { DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM); Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum(); - sender.writeBlock(new ExtendedBlock(poolId, newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, - 0, 0L, 0L, 0L, - badChecksum, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, newBlockId, badChecksum); recvBuf.reset(); sendResponse(Status.ERROR, null, null, recvOut); sendRecvData("wrong bytesPerChecksum while writing", true); sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); PacketHeader hdr = new PacketHeader( 4, // size of packet @@ -416,11 +400,7 @@ public class TestDataTransferProtocol { // test for writing a valid zero size block sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); hdr = new PacketHeader( 8, // size of packet @@ -532,4 +512,18 @@ public class TestDataTransferProtocol { assertTrue(hdr.sanityCheck(99)); assertFalse(hdr.sanityCheck(100)); } + + void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException { + writeBlock(new ExtendedBlock(poolId, blockId), + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum); + } + + void writeBlock(ExtendedBlock block, BlockConstructionStage stage, + long newGS, DataChecksum checksum) throws IOException { + sender.writeBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], new StorageType[1], null, stage, + 0, block.getNumBytes(), block.getNumBytes(), newGS, + checksum, CachingStrategy.newDefaultStrategy()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index e527439bc28..6c8547ebf8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -550,8 +550,10 @@ public class TestPBHelper { dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo(); dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo(); String[][] storageIDs = {{"s00"}, {"s10", "s11"}}; + StorageType[][] storageTypes = {{StorageType.DEFAULT}, + {StorageType.DEFAULT, StorageType.DEFAULT}}; BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1", - blocks, dnInfos, storageIDs); + blocks, dnInfos, storageTypes, storageIDs); BlockCommandProto bcProto = PBHelper.convert(bc); BlockCommand bc2 = PBHelper.convert(bcProto); assertEquals(bc.getAction(), bc2.getAction()); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index d2f58a877eb..9ea6c5186a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -324,7 +325,7 @@ public abstract class BlockReportTestBase { public void blockReport_03() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); - ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); + writeFile(METHOD_NAME, FILE_SIZE, filePath); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); @@ -363,7 +364,7 @@ public abstract class BlockReportTestBase { // Create a bogus new block which will not be present on the namenode. ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(b); + dn.getFSDataset().createRbw(StorageType.DEFAULT, b); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index baf046af847..e3db5350029 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -744,14 +744,14 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) - throws IOException { - return createTemporary(b); + public synchronized ReplicaInPipelineInterface createRbw( + StorageType storageType, ExtendedBlock b) throws IOException { + return createTemporary(storageType, b); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipelineInterface createTemporary( + StorageType storageType, ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 4ff942b1f8e..a3622a465ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -531,7 +532,7 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(block); + dn.data.createRbw(StorageType.DEFAULT, block); try { dn.syncBlock(rBlock, initBlockRecords(dn)); fail("Sync should fail"); @@ -554,7 +555,8 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block); + ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( + StorageType.DEFAULT, block); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index 02faa595e4c..478b6d1546b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -264,7 +265,8 @@ public class TestBlockReplacement { sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, + new Sender(out).replaceBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index f75bf2465ff..798b7b7c705 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -147,9 +148,9 @@ public class TestDiskError { DataChecksum checksum = DataChecksum.newDataChecksum( 
DataChecksum.Type.CRC32, 512); - new Sender(out).writeBlock(block.getBlock(), + new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT, BlockTokenSecretManager.DUMMY_TOKEN, "", - new DatanodeInfo[0], null, + new DatanodeInfo[0], new StorageType[0], null, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, checksum, CachingStrategy.newDefaultStrategy()); out.flush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index d03e5ea0252..bd6c3de2266 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -65,7 +66,8 @@ public class TestSimulatedFSDataset { ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual // data written - ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b); + ReplicaInPipelineInterface bInfo = fsdataset.createRbw( + StorageType.DEFAULT, b); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index dfefc1e8437..b8246c31913 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; @@ -147,7 +148,7 @@ public class TestWriteToReplica { }; ReplicaMap replicasMap = dataSet.volumeMap; - FsVolumeImpl vol = dataSet.volumes.getNextVolume(0); + FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0); ReplicaInfo replicaInfo = new FinalizedReplica( blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); replicasMap.add(bpid, replicaInfo); @@ -357,7 +358,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(blocks[FINALIZED]); + dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]); Assert.fail("Should not have created a replica that's already " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { @@ -375,7 +376,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(blocks[TEMPORARY]); + dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had 
created as " + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { @@ -385,7 +386,7 @@ public class TestWriteToReplica { 0L, blocks[RBW].getNumBytes()); // expect to be successful try { - dataSet.createRbw(blocks[RBW]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { @@ -401,7 +402,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(blocks[RWR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { @@ -417,7 +418,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(blocks[RUR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { @@ -434,45 +435,45 @@ public class TestWriteToReplica { e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA)); } - dataSet.createRbw(blocks[NON_EXISTENT]); + dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]); } private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { try { - dataSet.createTemporary(blocks[FINALIZED]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]); Assert.fail("Should not have created a temporary replica that was " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[TEMPORARY]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had created as" + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RBW]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RWR]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RUR]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { } - dataSet.createTemporary(blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); } }