diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2ab56720364..ef396ed6dc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -29,6 +29,8 @@ Release 0.23-PB - Unreleased HDFS-2488. Separate datatypes for InterDatanodeProtocol. (suresh) + HDFS-2496. Separate datatypes for DatanodeProtocol. (suresh) + BUG FIXES HDFS-2481 Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol (sanjay) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 58a9c14bb2a..2b61d5aed8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -39,9 +39,6 @@ import org.apache.hadoop.security.token.TokenInfo; serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) @TokenInfo(BlockTokenSelector.class) public interface ClientDatanodeProtocol extends VersionedProtocol { - public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class); - - /** * Until version 9, this class ClientDatanodeProtocol served as both * the client interface to the DN AND the RPC protocol used to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java index 7518abaea9c..28cc2cedf40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java @@ -92,4 +92,20 @@ public class BlockWritable implements Writable { public long getGenerationStamp() { return generationStamp; } + + public static Block[] convert(BlockWritable[] blocks) { + Block[] ret = new Block[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + ret[i] = blocks[i].convert(); + } + return ret; + } + + public static BlockWritable[] convert(Block[] blocks) { + BlockWritable[] ret = new BlockWritable[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + ret[i] = BlockWritable.convert(blocks[i]); + } + return ret; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java index d236da36aec..cfbfb0a5360 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java @@ -67,7 +67,6 @@ public class BlockCommand extends DatanodeCommand { public BlockCommand(int action, String poolId, List blocktargetlist) { super(action); - this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; @@ -85,12 +84,21 @@ public class BlockCommand extends DatanodeCommand { * @param blocks blocks related to the action */ public BlockCommand(int action, String poolId, Block blocks[]) { + this(action, poolId, blocks, EMPTY_TARGET); + } + + /** + * Create BlockCommand for the given action + * @param blocks blocks related to the action + */ + public BlockCommand(int action, String poolId, Block[] blocks, + DatanodeInfo[][] targets) { super(action); this.poolId = poolId; this.blocks = blocks; - this.targets = EMPTY_TARGET; + this.targets = targets; } - + public String getBlockPoolId() { return poolId; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java index 992deb88028..0c2e55e6933 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java @@ -117,8 +117,12 @@ public class BlockRecoveryCommand extends DatanodeCommand { * the specified capacity for recovering blocks. */ public BlockRecoveryCommand(int capacity) { + this(new ArrayList(capacity)); + } + + public BlockRecoveryCommand(Collection blocks) { super(DatanodeProtocol.DNA_RECOVERBLOCK); - recoveringBlocks = new ArrayList(capacity); + recoveringBlocks = blocks; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 9fb509109ee..28f54e86eee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -22,10 +22,11 @@ import java.io.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; +import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; @@ -45,7 +46,14 @@ import org.apache.avro.reflect.Nullable; @InterfaceAudience.Private public interface DatanodeProtocol extends VersionedProtocol { /** - * 28: Add Balancer Bandwidth Command protocol. + * This class is used by both the Namenode (client) and BackupNode (server) + * to insulate from the protocol serialization. + * + * If you are adding/changing DN's interface then you need to + * change both this class and ALSO + * {@link DatanodeWireProtocol}. + * These changes need to be done in a compatible fashion as described in + * {@link ClientNamenodeWireProtocol} */ public static final long versionID = 28L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java index 5dc8825e26b..c0fea64089a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java @@ -63,9 +63,14 @@ implements Writable, NodeRegistration { * Create DatanodeRegistration */ public DatanodeRegistration(String nodeName) { + this(nodeName, new StorageInfo(), new ExportedBlockKeys()); + } + + public DatanodeRegistration(String nodeName, StorageInfo info, + ExportedBlockKeys keys) { super(nodeName); - this.storageInfo = new StorageInfo(); - this.exportedKeys = new ExportedBlockKeys(); + this.storageInfo = info; + this.exportedKeys = keys; } public void setStorageInfo(StorageInfo storage) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BalancerBandwidthCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BalancerBandwidthCommandWritable.java new file mode 100644 index 00000000000..d6a2b0648f8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BalancerBandwidthCommandWritable.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * Balancer bandwidth command instructs each datanode to change its value for + * the max amount of network bandwidth it may use during the block balancing + * operation. + * + * The Balancer Bandwidth Command contains the new bandwidth value as its + * payload. The bandwidth value is in bytes per second. + */ +public class BalancerBandwidthCommandWritable extends DatanodeCommandWritable { + private final static long BBC_DEFAULTBANDWIDTH = 0L; + + private long bandwidth; + + /** + * Balancer Bandwidth Command constructor. Sets bandwidth to 0. + */ + BalancerBandwidthCommandWritable() { + this(BBC_DEFAULTBANDWIDTH); + } + + /** + * Balancer Bandwidth Command constructor. + * @param bandwidth Blanacer bandwidth in bytes per second. + */ + public BalancerBandwidthCommandWritable(long bandwidth) { + super(DatanodeWireProtocol.DNA_BALANCERBANDWIDTHUPDATE); + this.bandwidth = bandwidth; + } + + /** + * Get current value of the max balancer bandwidth in bytes per second. + * @return bandwidth Blanacer bandwidth in bytes per second for this datanode. + */ + public long getBalancerBandwidthValue() { + return this.bandwidth; + } + + // /////////////////////////////////////////////// + // Writable + // /////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(BalancerBandwidthCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new BalancerBandwidthCommandWritable(); + } + }); + } + + /** + * Writes the bandwidth payload to the Balancer Bandwidth Command packet. + * @param out DataOutput stream used for writing commands to the datanode. + * @throws IOException + */ + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeLong(this.bandwidth); + } + + /** + * Reads the bandwidth payload from the Balancer Bandwidth Command packet. + * @param in DataInput stream used for reading commands to the datanode. + * @throws IOException + */ + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.bandwidth = in.readLong(); + } + + @Override + public DatanodeCommand convert() { + return new BalancerBandwidthCommand(bandwidth); + } + + public static DatanodeCommandWritable convert(BalancerBandwidthCommand cmd) { + return new BalancerBandwidthCommandWritable(cmd.getBalancerBandwidthValue()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockCommandWritable.java new file mode 100644 index 00000000000..990b235c9fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockCommandWritable.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeInfoWritable; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/**************************************************** + * A BlockCommand is an instruction to a datanode regarding some blocks under + * its control. It tells the DataNode to either invalidate a set of indicated + * blocks, or to copy a set of indicated blocks to another DataNode. + * + ****************************************************/ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockCommandWritable extends DatanodeCommandWritable { + + /** + * This constant is used to indicate that the block deletion does not need + * explicit ACK from the datanode. When a block is put into the list of blocks + * to be deleted, it's size is set to this constant. We assume that no block + * would actually have this size. Otherwise, we would miss ACKs for blocks + * with such size. Positive number is used for compatibility reasons. + */ + public static final long NO_ACK = Long.MAX_VALUE; + + String poolId; + BlockWritable blocks[]; + DatanodeInfoWritable targets[][]; + + public BlockCommandWritable() { + } + + /** + * Create BlockCommand for the given action + * + * @param blocks blocks related to the action + */ + public BlockCommandWritable(int action, String poolId, BlockWritable[] blocks, + DatanodeInfoWritable[][] targets) { + super(action); + this.poolId = poolId; + this.blocks = blocks; + this.targets = targets; + } + + // ///////////////////////////////////////// + // Writable + // ///////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(BlockCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new BlockCommandWritable(); + } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, poolId); + out.writeInt(blocks.length); + for (int i = 0; i < blocks.length; i++) { + blocks[i].write(out); + } + out.writeInt(targets.length); + for (int i = 0; i < targets.length; i++) { + out.writeInt(targets[i].length); + for (int j = 0; j < targets[i].length; j++) { + targets[i][j].write(out); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.poolId = Text.readString(in); + this.blocks = new BlockWritable[in.readInt()]; + for (int i = 0; i < blocks.length; i++) { + blocks[i] = new BlockWritable(); + blocks[i].readFields(in); + } + + this.targets = new DatanodeInfoWritable[in.readInt()][]; + for (int i = 0; i < targets.length; i++) { + this.targets[i] = new DatanodeInfoWritable[in.readInt()]; + for (int j = 0; j < targets[i].length; j++) { + targets[i][j] = new DatanodeInfoWritable(); + targets[i][j].readFields(in); + } + } + } + + @Override + public BlockCommand convert() { + DatanodeInfo[][] dinfo = new DatanodeInfo[targets.length][]; + for (int i = 0; i < targets.length; i++) { + dinfo[i] = DatanodeInfoWritable.convertDatanodeInfo(targets[i]); + } + return new BlockCommand(getAction(), poolId, BlockWritable.convert(blocks), + dinfo); + } + + public static BlockCommandWritable convert(BlockCommand cmd) { + if (cmd == null) return null; + DatanodeInfo[][] targets = cmd.getTargets(); + DatanodeInfoWritable[][] dinfo = new DatanodeInfoWritable[targets.length][]; + for (int i = 0; i < targets.length; i++) { + dinfo[i] = DatanodeInfoWritable.convertDatanodeInfo(targets[i]); + } + return new BlockCommandWritable(cmd.getAction(), cmd.getBlockPoolId(), + BlockWritable.convert(cmd.getBlocks()), dinfo); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockRecoveryCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockRecoveryCommandWritable.java new file mode 100644 index 00000000000..ef7a6dbb23c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/BlockRecoveryCommandWritable.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * BlockRecoveryCommand is an instruction to a data-node to recover the + * specified blocks. + * + * The data-node that receives this command treats itself as a primary data-node + * in the recover process. + * + * Block recovery is identified by a recoveryId, which is also the new + * generation stamp, which the block will have after the recovery succeeds. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockRecoveryCommandWritable extends DatanodeCommandWritable { + Collection recoveringBlocks; + + /** + * Create empty BlockRecoveryCommand. + */ + public BlockRecoveryCommandWritable() { } + + /** + * Create BlockRecoveryCommand with the specified capacity for recovering + * blocks. + */ + public BlockRecoveryCommandWritable(int capacity) { + this(new ArrayList(capacity)); + } + + public BlockRecoveryCommandWritable(Collection blocks) { + super(DatanodeWireProtocol.DNA_RECOVERBLOCK); + recoveringBlocks = blocks; + } + + // ///////////////////////////////////////// + // Writable + // ///////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(BlockRecoveryCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new BlockRecoveryCommandWritable(); + } + }); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(recoveringBlocks.size()); + for (RecoveringBlockWritable block : recoveringBlocks) { + block.write(out); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + int numBlocks = in.readInt(); + recoveringBlocks = new ArrayList(numBlocks); + for (int i = 0; i < numBlocks; i++) { + RecoveringBlockWritable b = new RecoveringBlockWritable(); + b.readFields(in); + recoveringBlocks.add(b); + } + } + + @Override + public DatanodeCommand convert() { + Collection blks = + new ArrayList(recoveringBlocks.size()); + for (RecoveringBlockWritable b : recoveringBlocks) { + blks.add(b.convert()); + } + return new BlockRecoveryCommand(blks); + } + + public static BlockRecoveryCommandWritable convert(BlockRecoveryCommand cmd) { + if (cmd == null) return null; + Collection blks = + new ArrayList(cmd.getRecoveringBlocks().size()); + for (RecoveringBlock b : cmd.getRecoveringBlocks()) { + blks.add(RecoveringBlockWritable.convert(b)); + } + return new BlockRecoveryCommandWritable(blks); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandHelper.java new file mode 100644 index 00000000000..b2e585b99ab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandHelper.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; +import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; + +/** + * Class for translating DatanodeCommandWritable to and from DatanodeCommand. + */ +class DatanodeCommandHelper { + public static final Log LOG = LogFactory.getLog(DatanodeCommandHelper.class); + + private DatanodeCommandHelper() { + /* Private constructor to prevent instantiation */ + } + + static DatanodeCommand convert(DatanodeCommandWritable cmd) { + return cmd.convert(); + } + + /** + * Given a subclass of {@link DatanodeCommand} return the corresponding + * writable type. + */ + static DatanodeCommandWritable convert(DatanodeCommand cmd) { + switch (cmd.getAction()) { + case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: + return BalancerBandwidthCommandWritable + .convert((BalancerBandwidthCommand) cmd); + + case DatanodeProtocol.DNA_FINALIZE: + return FinalizeCommandWritable.convert((FinalizeCommand)cmd); + case DatanodeProtocol.DNA_ACCESSKEYUPDATE: + return KeyUpdateCommandWritable.convert((KeyUpdateCommand)cmd); + case DatanodeProtocol.DNA_REGISTER: + return RegisterCommandWritable.REGISTER; + case DatanodeProtocol.DNA_TRANSFER: + case DatanodeProtocol.DNA_INVALIDATE: + return BlockCommandWritable.convert((BlockCommand)cmd); + case UpgradeCommand.UC_ACTION_START_UPGRADE: + return UpgradeCommandWritable.convert((UpgradeCommand)cmd); + case DatanodeProtocol.DNA_RECOVERBLOCK: + return BlockRecoveryCommandWritable.convert((BlockRecoveryCommand)cmd); + default: + LOG.warn("Unknown DatanodeCommand action - " + cmd.getAction()); + return null; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandWritable.java new file mode 100644 index 00000000000..b3686402aec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeCommandWritable.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; + +/** + * Base class for data-node command. + * Issued by the name-node to notify data-nodes what should be done. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class DatanodeCommandWritable extends ServerCommandWritable { + public DatanodeCommandWritable() { + super(); + } + + DatanodeCommandWritable(int action) { + super(action); + } + + /** Method to convert from writable type to internal type */ + public abstract DatanodeCommand convert(); + + public static DatanodeCommandWritable[] convert(DatanodeCommand[] cmds) { + DatanodeCommandWritable[] ret = new DatanodeCommandWritable[cmds.length]; + for (int i = 0; i < cmds.length; i++) { + ret[i] = DatanodeCommandHelper.convert(cmds[i]); + } + return ret; + } + + public static DatanodeCommand[] convert(DatanodeCommandWritable[] cmds) { + if (cmds == null) return null; + DatanodeCommand[] ret = new DatanodeCommand[cmds.length]; + for (int i = 0; i < cmds.length; i++) { + ret[i] = cmds[i].convert(); + } + return ret; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolServerSideTranslatorR23.java new file mode 100644 index 00000000000..2c806afd449 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolServerSideTranslatorR23.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.NamespaceInfoWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +/** + * This class is used on the server side. Calls come across the wire for the + * protocol family of Release 23 onwards. This class translates the R23 data + * types to the native data types used inside the NN as specified in the generic + * DatanodeProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DatanodeProtocolServerSideTranslatorR23 implements + DatanodeWireProtocol { + final private DatanodeProtocol server; + + /** + * Constructor + * @param server - the NN server + * @throws IOException + */ + public DatanodeProtocolServerSideTranslatorR23(DatanodeProtocol server) + throws IOException { + this.server = server; + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client's call will never reach here. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link DatanodeProtocol} + * + */ + if (!protocol.equals(RPC.getProtocolName(DatanodeWireProtocol.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(DatanodeWireProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + DatanodeWireProtocol.versionID, DatanodeWireProtocol.class); + } + + @Override + public ProtocolSignatureWritable + getProtocolSignature2( + String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link DatanodeProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(RPC.getProtocolName(DatanodeWireProtocol.class))) { + return DatanodeWireProtocol.versionID; + } + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(DatanodeWireProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + @Override + public DatanodeRegistrationWritable registerDatanode( + DatanodeRegistrationWritable registration) throws IOException { + return DatanodeRegistrationWritable.convert(server + .registerDatanode(registration.convert())); + } + + @Override + public DatanodeCommandWritable[] sendHeartbeat( + DatanodeRegistrationWritable registration, long capacity, long dfsUsed, + long remaining, long blockPoolUsed, int xmitsInProgress, + int xceiverCount, int failedVolumes) throws IOException { + return DatanodeCommandWritable.convert(server.sendHeartbeat( + registration.convert(), capacity, dfsUsed, remaining, blockPoolUsed, + xmitsInProgress, xceiverCount, failedVolumes)); + } + + @Override + public DatanodeCommandWritable blockReport( + DatanodeRegistrationWritable registration, String poolId, long[] blocks) + throws IOException { + return DatanodeCommandHelper.convert(server.blockReport( + registration.convert(), poolId, blocks)); + } + + @Override + public void blockReceivedAndDeleted( + DatanodeRegistrationWritable registration, String poolId, + ReceivedDeletedBlockInfoWritable[] receivedAndDeletedBlocks) + throws IOException { + server.blockReceivedAndDeleted(registration.convert(), poolId, + ReceivedDeletedBlockInfoWritable.convert(receivedAndDeletedBlocks)); + } + + @Override + public void errorReport(DatanodeRegistrationWritable registration, + int errorCode, String msg) throws IOException { + server.errorReport(registration.convert(), errorCode, msg); + } + + @Override + public NamespaceInfoWritable versionRequest() throws IOException { + return NamespaceInfoWritable.convert(server.versionRequest()); + } + + @Override + public UpgradeCommandWritable processUpgradeCommand( + UpgradeCommandWritable comm) throws IOException { + return UpgradeCommandWritable.convert(server.processUpgradeCommand(comm.convert())); + } + + @Override + public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException { + server.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks)); + } + + @Override + public void commitBlockSynchronization(ExtendedBlockWritable block, + long newgenerationstamp, long newlength, boolean closeFile, + boolean deleteblock, DatanodeIDWritable[] newtargets) throws IOException { + server.commitBlockSynchronization( + ExtendedBlockWritable.convertExtendedBlock(block), newgenerationstamp, + newlength, closeFile, deleteblock, + DatanodeIDWritable.convertDatanodeID(newtargets)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolTranslatorR23.java new file mode 100644 index 00000000000..1664940474b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeProtocolTranslatorR23.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * This class forwards NN's ClientProtocol calls as RPC calls to the NN server + * while translating from the parameter types used in ClientProtocol to those + * used in protocolR23Compatile.*. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DatanodeProtocolTranslatorR23 implements + DatanodeProtocol, Closeable { + final private DatanodeWireProtocol rpcProxy; + + private static DatanodeWireProtocol createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf, + UserGroupInformation ugi) throws IOException { + return RPC.getProxy(DatanodeWireProtocol.class, + DatanodeWireProtocol.versionID, nameNodeAddr, ugi, conf, + NetUtils.getSocketFactory(conf, DatanodeWireProtocol.class)); + } + + /** Create a {@link NameNode} proxy */ + static DatanodeWireProtocol createNamenodeWithRetry( + DatanodeWireProtocol rpcNamenode) { + RetryPolicy createPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(5, + HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); + + Map, RetryPolicy> remoteExceptionToPolicyMap = + new HashMap, RetryPolicy>(); + remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, + createPolicy); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(RemoteException.class, RetryPolicies + .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, + remoteExceptionToPolicyMap)); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + + methodNameToPolicyMap.put("create", methodPolicy); + + return (DatanodeWireProtocol) RetryProxy.create( + DatanodeWireProtocol.class, rpcNamenode, methodNameToPolicyMap); + } + + public DatanodeProtocolTranslatorR23(InetSocketAddress nameNodeAddr, + Configuration conf) throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi)); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocol, clientVersion, clientMethodsHash)); + } + + @Override + public DatanodeRegistration registerDatanode(DatanodeRegistration registration) + throws IOException { + return rpcProxy.registerDatanode( + DatanodeRegistrationWritable.convert(registration)).convert(); + } + + @Override + public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, + long capacity, long dfsUsed, long remaining, long blockPoolUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes) + throws IOException { + return DatanodeCommandWritable.convert(rpcProxy.sendHeartbeat( + DatanodeRegistrationWritable.convert(registration), capacity, + dfsUsed, remaining, blockPoolUsed, xmitsInProgress, xceiverCount, + failedVolumes)); + } + + @Override + public DatanodeCommand blockReport(DatanodeRegistration registration, + String poolId, long[] blocks) throws IOException { + return rpcProxy.blockReport( + DatanodeRegistrationWritable.convert(registration), poolId, blocks) + .convert(); + } + + @Override + public void blockReceivedAndDeleted(DatanodeRegistration registration, + String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) + throws IOException { + rpcProxy.blockReceivedAndDeleted( + DatanodeRegistrationWritable.convert(registration), poolId, + ReceivedDeletedBlockInfoWritable.convert(receivedAndDeletedBlocks)); + } + + @Override + public void errorReport(DatanodeRegistration registration, int errorCode, + String msg) throws IOException { + rpcProxy.errorReport(DatanodeRegistrationWritable.convert(registration), + errorCode, msg); + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + return rpcProxy.versionRequest().convert(); + } + + @Override + public UpgradeCommand processUpgradeCommand(UpgradeCommand cmd) + throws IOException { + return rpcProxy.processUpgradeCommand(UpgradeCommandWritable.convert(cmd)) + .convert(); + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + rpcProxy.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks)); + } + + @Override + public void commitBlockSynchronization(ExtendedBlock block, + long newgenerationstamp, long newlength, boolean closeFile, + boolean deleteblock, DatanodeID[] newtargets) throws IOException { + rpcProxy.commitBlockSynchronization( + ExtendedBlockWritable.convertExtendedBlock(block), newgenerationstamp, + newlength, closeFile, deleteblock, + DatanodeIDWritable.convertDatanodeID(newtargets)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeRegistrationWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeRegistrationWritable.java new file mode 100644 index 00000000000..e2bc2d82a16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeRegistrationWritable.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ExportedBlockKeysWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.StorageInfoWritable; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * DatanodeRegistration class contains all information the name-node needs + * to identify and verify a data-node when it contacts the name-node. + * This information is sent by data-node with each communication request. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DatanodeRegistrationWritable implements Writable { + static { // register a ctor + WritableFactories.setFactory + (DatanodeRegistrationWritable.class, + new WritableFactory() { + public Writable newInstance() { return new DatanodeRegistrationWritable(); } + }); + } + + private DatanodeIDWritable datanodeId; + private StorageInfoWritable storageInfo; + private ExportedBlockKeysWritable exportedKeys; + + /** + * Default constructor. + */ + public DatanodeRegistrationWritable() { + this("", new StorageInfo(), new ExportedBlockKeys()); + } + + /** + * Create DatanodeRegistration + */ + public DatanodeRegistrationWritable(String nodeName, StorageInfo info, + ExportedBlockKeys keys) { + this.datanodeId = new DatanodeIDWritable(nodeName); + this.storageInfo = StorageInfoWritable.convert(info); + this.exportedKeys = ExportedBlockKeysWritable.convert(keys); + } + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + datanodeId.write(out); + + //TODO: move it to DatanodeID once HADOOP-2797 has been committed + out.writeShort(datanodeId.ipcPort); + + storageInfo.write(out); + exportedKeys.write(out); + } + + /** {@inheritDoc} */ + public void readFields(DataInput in) throws IOException { + datanodeId.readFields(in); + + //TODO: move it to DatanodeID once HADOOP-2797 has been committed + datanodeId.ipcPort = in.readShort() & 0x0000ffff; + + storageInfo.readFields(in); + exportedKeys.readFields(in); + } + + public DatanodeRegistration convert() { + DatanodeRegistration dnReg = new DatanodeRegistration(datanodeId.name, + storageInfo.convert(), exportedKeys.convert()); + dnReg.setIpcPort(datanodeId.ipcPort); + return dnReg; + } + + public static DatanodeRegistrationWritable convert(DatanodeRegistration dnReg) { + if (dnReg == null) return null; + DatanodeRegistrationWritable ret = new DatanodeRegistrationWritable( + dnReg.getName(), dnReg.storageInfo, dnReg.exportedKeys); + ret.datanodeId.ipcPort = dnReg.ipcPort; + return ret; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeWireProtocol.java new file mode 100644 index 00000000000..f630053bf9a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/DatanodeWireProtocol.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.IOException; + +import org.apache.avro.reflect.Nullable; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; +import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.NamespaceInfoWritable; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; + +/********************************************************************** + * Protocol that a DFS datanode uses to communicate with the NameNode. + * It's used to upload current load information and block reports. + * + * The only way a NameNode can communicate with a DataNode is by + * returning values from these functions. + * + **********************************************************************/ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, + clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) +@InterfaceAudience.Private +public interface DatanodeWireProtocol extends VersionedProtocol { + /** + * The rules for changing this protocol are the same as that for + * {@link ClientNamenodeWireProtocol} - see that java file for details. + */ + public static final long versionID = 28L; + + // error code + final static int NOTIFY = 0; + final static int DISK_ERROR = 1; // there are still valid volumes on DN + final static int INVALID_BLOCK = 2; + final static int FATAL_DISK_ERROR = 3; // no valid volumes left on DN + + /** + * Determines actions that data node should perform + * when receiving a datanode command. + */ + final static int DNA_UNKNOWN = 0; // unknown action + final static int DNA_TRANSFER = 1; // transfer blocks to another datanode + final static int DNA_INVALIDATE = 2; // invalidate blocks + final static int DNA_SHUTDOWN = 3; // shutdown node + final static int DNA_REGISTER = 4; // re-register + final static int DNA_FINALIZE = 5; // finalize previous upgrade + final static int DNA_RECOVERBLOCK = 6; // request a block recovery + final static int DNA_ACCESSKEYUPDATE = 7; // update access key + final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth + + /** + * Register Datanode. + * @return updated {@link DatanodeRegistrationWritable}, which contains + * new storageID if the datanode did not have one and + * registration ID for further communication. + */ + public DatanodeRegistrationWritable registerDatanode( + DatanodeRegistrationWritable registration) throws IOException; + /** + * sendHeartbeat() tells the NameNode that the DataNode is still + * alive and well. Includes some status info, too. + * It also gives the NameNode a chance to return + * an array of "DatanodeCommand" objects. + * A DatanodeCommand tells the DataNode to invalidate local block(s), + * or to copy them to other DataNodes, etc. + * @param registration datanode registration information + * @param capacity total storage capacity available at the datanode + * @param dfsUsed storage used by HDFS + * @param remaining remaining storage available for HDFS + * @param blockPoolUsed storage used by the block pool + * @param xmitsInProgress number of transfers from this datanode to others + * @param xceiverCount number of active transceiver threads + * @param failedVolumes number of failed volumes + * @throws IOException on error + */ + @Nullable + public DatanodeCommandWritable[] sendHeartbeat( + DatanodeRegistrationWritable registration, long capacity, long dfsUsed, + long remaining, long blockPoolUsed, int xmitsInProgress, + int xceiverCount, int failedVolumes) throws IOException; + + /** + * blockReport() tells the NameNode about all the locally-stored blocks. + * The NameNode returns an array of Blocks that have become obsolete + * and should be deleted. This function is meant to upload *all* + * the locally-stored blocks. It's invoked upon startup and then + * infrequently afterwards. + * @param registration + * @param poolId - the block pool ID for the blocks + * @param blocks - the block list as an array of longs. + * Each block is represented as 2 longs. + * This is done instead of Block[] to reduce memory used by block reports. + * + * @return - the next command for DN to process. + * @throws IOException + */ + public DatanodeCommandWritable blockReport( + DatanodeRegistrationWritable registration, String poolId, long[] blocks) + throws IOException; + + /** + * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about + * recently-received and -deleted block data. + * + * For the case of received blocks, a hint for preferred replica to be + * deleted when there is any excessive blocks is provided. + * For example, whenever client code + * writes a new Block here, or another DataNode copies a Block to + * this DataNode, it will call blockReceived(). + */ + public void blockReceivedAndDeleted( + DatanodeRegistrationWritable registration, String poolId, + ReceivedDeletedBlockInfoWritable[] receivedAndDeletedBlocks) + throws IOException; + + /** + * errorReport() tells the NameNode about something that has gone + * awry. Useful for debugging. + */ + public void errorReport(DatanodeRegistrationWritable registration, + int errorCode, String msg) throws IOException; + + public NamespaceInfoWritable versionRequest() throws IOException; + + /** + * This is a very general way to send a command to the name-node during + * distributed upgrade process. + * + * The generosity is because the variety of upgrade commands is unpredictable. + * The reply from the name-node is also received in the form of an upgrade + * command. + * + * @return a reply in the form of an upgrade command + */ + UpgradeCommandWritable processUpgradeCommand(UpgradeCommandWritable comm) + throws IOException; + + /** + * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[])} + * } + */ + public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException; + + /** + * Commit block synchronization in lease recovery + */ + public void commitBlockSynchronization(ExtendedBlockWritable block, + long newgenerationstamp, long newlength, boolean closeFile, + boolean deleteblock, DatanodeIDWritable[] newtargets) throws IOException; + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/FinalizeCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/FinalizeCommandWritable.java new file mode 100644 index 00000000000..2de91ad9aea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/FinalizeCommandWritable.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; +import org.apache.hadoop.io.WritableUtils; + +/** + * A FinalizeCommand is an instruction from namenode to finalize the previous + * upgrade to a datanode + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FinalizeCommandWritable extends DatanodeCommandWritable { + // ///////////////////////////////////////// + // Writable + // ///////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(FinalizeCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new FinalizeCommandWritable(); + } + }); + } + + String blockPoolId; + + private FinalizeCommandWritable() { + this(null); + } + + public FinalizeCommandWritable(String bpid) { + super(DatanodeWireProtocol.DNA_FINALIZE); + blockPoolId = bpid; + } + + public String getBlockPoolId() { + return blockPoolId; + } + + @Override + public void readFields(DataInput in) throws IOException { + blockPoolId = WritableUtils.readString(in); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeString(out, blockPoolId); + } + + @Override + public DatanodeCommand convert() { + return new FinalizeCommand(blockPoolId); + } + + public static FinalizeCommandWritable convert(FinalizeCommand cmd) { + if (cmd == null) { + return null; + } + return new FinalizeCommandWritable(cmd.getBlockPoolId()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/KeyUpdateCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/KeyUpdateCommandWritable.java new file mode 100644 index 00000000000..2de6b21f8e7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/KeyUpdateCommandWritable.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocolR23Compatible.ExportedBlockKeysWritable; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class KeyUpdateCommandWritable extends DatanodeCommandWritable { + private ExportedBlockKeysWritable keys; + + KeyUpdateCommandWritable() { + this(new ExportedBlockKeysWritable()); + } + + public KeyUpdateCommandWritable(ExportedBlockKeysWritable keys) { + super(DatanodeWireProtocol.DNA_ACCESSKEYUPDATE); + this.keys = keys; + } + + public ExportedBlockKeysWritable getExportedKeys() { + return this.keys; + } + + // /////////////////////////////////////////////// + // Writable + // /////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(KeyUpdateCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new KeyUpdateCommandWritable(); + } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + keys.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + keys.readFields(in); + } + + @Override + public DatanodeCommand convert() { + return new KeyUpdateCommand(keys.convert()); + } + + public static KeyUpdateCommandWritable convert(KeyUpdateCommand cmd) { + if (cmd == null) { + return null; + } + return new KeyUpdateCommandWritable(ExportedBlockKeysWritable.convert(cmd + .getExportedKeys())); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java new file mode 100644 index 00000000000..5d37890c7fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * A data structure to store Block and delHints together, used to send + * received/deleted ACKs. + */ +public class ReceivedDeletedBlockInfoWritable implements Writable { + BlockWritable block; + String delHints; + + public final static String TODELETE_HINT = "-"; + + public ReceivedDeletedBlockInfoWritable() { + } + + public ReceivedDeletedBlockInfoWritable(BlockWritable blk, String delHints) { + this.block = blk; + this.delHints = delHints; + } + + @Override + public void write(DataOutput out) throws IOException { + this.block.write(out); + Text.writeString(out, this.delHints); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.block = new BlockWritable(); + this.block.readFields(in); + this.delHints = Text.readString(in); + } + + public String toString() { + return block.toString() + ", delHint: " + delHints; + } + + public static ReceivedDeletedBlockInfo[] convert( + ReceivedDeletedBlockInfoWritable[] rdBlocks) { + ReceivedDeletedBlockInfo[] ret = + new ReceivedDeletedBlockInfo[rdBlocks.length]; + for (int i = 0; i < rdBlocks.length; i++) { + ret[i] = rdBlocks[i].convert(); + } + return ret; + } + + public static ReceivedDeletedBlockInfoWritable[] convert( + ReceivedDeletedBlockInfo[] blocks) { + ReceivedDeletedBlockInfoWritable[] ret = + new ReceivedDeletedBlockInfoWritable[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + ret[i] = convert(blocks[i]); + } + return ret; + } + + public ReceivedDeletedBlockInfo convert() { + return new ReceivedDeletedBlockInfo(block.convert(), delHints); + } + + public static ReceivedDeletedBlockInfoWritable convert( + ReceivedDeletedBlockInfo b) { + if (b == null) return null; + return new ReceivedDeletedBlockInfoWritable(BlockWritable.convert(b + .getBlock()), b.getDelHints()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RegisterCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RegisterCommandWritable.java new file mode 100644 index 00000000000..ae828d8f183 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RegisterCommandWritable.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * A RegisterCommand is an instruction to a datanode to register with the + * namenode. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RegisterCommandWritable extends DatanodeCommandWritable { + public static final RegisterCommandWritable REGISTER = + new RegisterCommandWritable(); + + // ///////////////////////////////////////// + // Writable + // ///////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory(RegisterCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { + return new RegisterCommandWritable(); + } + }); + } + + public RegisterCommandWritable() { + super(DatanodeWireProtocol.DNA_REGISTER); + } + + @Override + public void readFields(DataInput in) { /* Nothing to read */ + } + + @Override + public void write(DataOutput out) { /* Nothing to write */ + } + + @Override + public DatanodeCommand convert() { + return RegisterCommand.REGISTER; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ServerCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ServerCommandWritable.java new file mode 100644 index 00000000000..e4dcfc10c9e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ServerCommandWritable.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; + +/** + * Base class for a server command. + * Issued by the name-node to notify other servers what should be done. + * Commands are defined by actions defined in respective protocols. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ServerCommandWritable implements Writable { + private int action; + + /** + * Unknown server command constructor. + * Creates a command with action 0. + */ + public ServerCommandWritable() { + this(0); + } + + /** + * Create a command for the specified action. + * Actions are protocol specific. + * @param action + */ + public ServerCommandWritable(int action) { + this.action = action; + } + + /** + * Get server command action. + * @return action code. + */ + public int getAction() { + return this.action; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.action); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.action = in.readInt(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/UpgradeCommandWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/UpgradeCommandWritable.java new file mode 100644 index 00000000000..ed3a70f0773 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/UpgradeCommandWritable.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * This as a generic distributed upgrade command. + * + * During the upgrade cluster components send upgrade commands to each other + * in order to obtain or share information with them. + * It is supposed that each upgrade defines specific upgrade command by + * deriving them from this class. + * The upgrade command contains version of the upgrade, which is verified + * on the receiving side and current status of the upgrade. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class UpgradeCommandWritable extends DatanodeCommandWritable { + final static int UC_ACTION_UNKNOWN = DatanodeWireProtocol.DNA_UNKNOWN; + public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status + public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade + + private int version; + private short upgradeStatus; + + public UpgradeCommandWritable() { + super(UC_ACTION_UNKNOWN); + this.version = 0; + this.upgradeStatus = 0; + } + + public UpgradeCommandWritable(int action, int version, short status) { + super(action); + this.version = version; + this.upgradeStatus = status; + } + + public int getVersion() { + return this.version; + } + + public short getCurrentStatus() { + return this.upgradeStatus; + } + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (UpgradeCommandWritable.class, + new WritableFactory() { + public Writable newInstance() { return new UpgradeCommandWritable(); } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(this.version); + out.writeShort(this.upgradeStatus); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.version = in.readInt(); + this.upgradeStatus = in.readShort(); + } + + @Override + public UpgradeCommand convert() { + return new UpgradeCommand(getAction(), version, upgradeStatus); + } + + public static UpgradeCommandWritable convert(UpgradeCommand cmd) { + if (cmd == null) return null; + return new UpgradeCommandWritable(cmd.getAction(), cmd.getVersion(), + cmd.getCurrentStatus()); + } +}