diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 75e97ff11b1..23fb55b3de4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -176,6 +176,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4035. LightWeightGSet and LightWeightHashSet increment a volatile without synchronization. (eli) + + HDFS-4363. Combine PBHelper and HdfsProtoUtil and remove redundant + methods. (suresh) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 9a34263ad86..2b6bb264ed5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -116,7 +116,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; @@ -130,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -364,7 +364,7 @@ public class DFSClient implements java.io.Closeable { /** * Same as this(nameNodeUri, conf, null); - * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) + * @see #DFSClient(URI, Configuration, FileSystem.Statistics) */ public DFSClient(URI nameNodeUri, Configuration conf ) throws IOException { @@ -373,7 +373,7 @@ public class DFSClient implements java.io.Closeable { /** * Same as this(nameNodeUri, null, conf, stats); - * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) + * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) */ public DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats) @@ -1183,7 +1183,7 @@ public class DFSClient implements java.io.Closeable { /** * Call {@link #create(String, FsPermission, EnumSet, short, long, - * Progressable, int)} with default permission + * Progressable, int, ChecksumOpt)} with default permission * {@link FsPermission#getDefault()}. * * @param src File name @@ -1294,7 +1294,7 @@ public class DFSClient implements java.io.Closeable { /** * Same as {{@link #create(String, FsPermission, EnumSet, short, long, - * Progressable, int)} except that the permission + * Progressable, int, ChecksumOpt)} except that the permission * is absolute (ie has already been masked with umask. */ public DFSOutputStream primitiveCreate(String src, @@ -1479,7 +1479,7 @@ public class DFSClient implements java.io.Closeable { } /** * Delete file or directory. - * See {@link ClientProtocol#delete(String)}. + * See {@link ClientProtocol#delete(String, boolean)}. */ @Deprecated public boolean delete(String src) throws IOException { @@ -1704,7 +1704,7 @@ public class DFSClient implements java.io.Closeable { new Sender(out).blockChecksum(block, lb.getBlockToken()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN @@ -1751,8 +1751,8 @@ public class DFSClient implements java.io.Closeable { md5.write(md5out); // read crc-type - final DataChecksum.Type ct = HdfsProtoUtil. - fromProto(checksumData.getCrcType()); + final DataChecksum.Type ct = PBHelper.convert(checksumData + .getCrcType()); if (i == 0) { // first block crcType = ct; } else if (crcType != DataChecksum.Type.MIXED @@ -1914,7 +1914,7 @@ public class DFSClient implements java.io.Closeable { * @param isChecked * If true, then check only active namenode's safemode status, else * check first namenode's status. - * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeActio,boolean) + * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) */ public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ return namenode.setSafeMode(action, isChecked); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f4c13272bbb..e1c1f3da4b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; @@ -66,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; @@ -888,7 +888,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { //ack BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } @@ -1078,7 +1078,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(blockReplyStream)); + PBHelper.vintPrefixed(blockReplyStream)); pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 2bcd96e7644..dc449ee2f24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.io.IOUtils; @@ -392,7 +391,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); + PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, sock, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index b9a5c76ec31..3450cd1524d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; - import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -43,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.net.SocketInputWrapper; @@ -401,7 +400,7 @@ public class RemoteBlockReader2 implements BlockReader { DataInputStream in = new DataInputStream(ioStreams.in); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); + PBHelper.vintPrefixed(in)); checkSuccess(status, sock, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java deleted file mode 100644 index ab8b95534b9..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.util.ExactSizeInputStream; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; -import org.apache.hadoop.security.token.Token; - -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedInputStream; - -/** - * Utilities for converting to and from protocol buffers used in the - * HDFS wire protocol, as well as some generic utilities useful - * for dealing with protocol buffers. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class HdfsProtoUtil { - - //// Block Token //// - - public static TokenProto toProto(Token blockToken) { - return TokenProto.newBuilder() - .setIdentifier(ByteString.copyFrom(blockToken.getIdentifier())) - .setPassword(ByteString.copyFrom(blockToken.getPassword())) - .setKind(blockToken.getKind().toString()) - .setService(blockToken.getService().toString()) - .build(); - } - - public static Token fromProto(TokenProto proto) { - return new Token(proto.getIdentifier().toByteArray(), - proto.getPassword().toByteArray(), - new Text(proto.getKind()), - new Text(proto.getService())); - } - - //// Extended Block //// - - public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) { - return HdfsProtos.ExtendedBlockProto.newBuilder() - .setBlockId(block.getBlockId()) - .setPoolId(block.getBlockPoolId()) - .setNumBytes(block.getNumBytes()) - .setGenerationStamp(block.getGenerationStamp()) - .build(); - } - - public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) { - return new ExtendedBlock( - proto.getPoolId(), proto.getBlockId(), - proto.getNumBytes(), proto.getGenerationStamp()); - } - - //// DatanodeID //// - - private static HdfsProtos.DatanodeIDProto toProto( - DatanodeID dni) { - return HdfsProtos.DatanodeIDProto.newBuilder() - .setIpAddr(dni.getIpAddr()) - .setHostName(dni.getHostName()) - .setStorageID(dni.getStorageID()) - .setXferPort(dni.getXferPort()) - .setInfoPort(dni.getInfoPort()) - .setIpcPort(dni.getIpcPort()) - .build(); - } - - private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) { - return new DatanodeID( - idProto.getIpAddr(), - idProto.getHostName(), - idProto.getStorageID(), - idProto.getXferPort(), - idProto.getInfoPort(), - idProto.getIpcPort()); - } - - //// DatanodeInfo //// - - public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) { - return HdfsProtos.DatanodeInfoProto.newBuilder() - .setId(toProto((DatanodeID)dni)) - .setCapacity(dni.getCapacity()) - .setDfsUsed(dni.getDfsUsed()) - .setRemaining(dni.getRemaining()) - .setBlockPoolUsed(dni.getBlockPoolUsed()) - .setLastUpdate(dni.getLastUpdate()) - .setXceiverCount(dni.getXceiverCount()) - .setLocation(dni.getNetworkLocation()) - .setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf( - dni.getAdminState().name())) - .build(); - } - - public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) { - DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()), - dniProto.getLocation()); - - dniObj.setCapacity(dniProto.getCapacity()); - dniObj.setDfsUsed(dniProto.getDfsUsed()); - dniObj.setRemaining(dniProto.getRemaining()); - dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed()); - dniObj.setLastUpdate(dniProto.getLastUpdate()); - dniObj.setXceiverCount(dniProto.getXceiverCount()); - dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf( - dniProto.getAdminState().name())); - return dniObj; - } - - public static ArrayList toProtos( - DatanodeInfo[] dnInfos, int startIdx) { - ArrayList protos = - Lists.newArrayListWithCapacity(dnInfos.length); - for (int i = startIdx; i < dnInfos.length; i++) { - protos.add(toProto(dnInfos[i])); - } - return protos; - } - - public static DatanodeInfo[] fromProtos( - List targetsList) { - DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()]; - int i = 0; - for (HdfsProtos.DatanodeInfoProto proto : targetsList) { - ret[i++] = fromProto(proto); - } - return ret; - } - - public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) { - return DataChecksum.Type.valueOf(type.getNumber()); - } - - public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) { - return HdfsProtos.ChecksumTypeProto.valueOf(type.id); - } - - public static InputStream vintPrefixed(final InputStream input) - throws IOException { - final int firstByte = input.read(); - if (firstByte == -1) { - throw new EOFException("Premature EOF: no length prefix available"); - } - - int size = CodedInputStream.readRawVarint32(firstByte, input); - assert size >= 0; - - return new ExactSizeInputStream(input, size); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java index ce81135a40c..229480b927b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java index bdd4df06843..6be3810c918 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -21,12 +21,12 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -41,18 +41,16 @@ import org.apache.hadoop.util.DataChecksum; public abstract class DataTransferProtoUtil { static BlockConstructionStage fromProto( OpWriteBlockProto.BlockConstructionStage stage) { - return BlockConstructionStage.valueOf(BlockConstructionStage.class, - stage.name()); + return BlockConstructionStage.valueOf(stage.name()); } static OpWriteBlockProto.BlockConstructionStage toProto( BlockConstructionStage stage) { - return OpWriteBlockProto.BlockConstructionStage.valueOf( - stage.name()); + return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()); } public static ChecksumProto toProto(DataChecksum checksum) { - ChecksumTypeProto type = HdfsProtoUtil.toProto(checksum.getChecksumType()); + ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType()); // ChecksumType#valueOf never returns null return ChecksumProto.newBuilder() .setBytesPerChecksum(checksum.getBytesPerChecksum()) @@ -64,8 +62,7 @@ public abstract class DataTransferProtoUtil { if (proto == null) return null; int bytesPerChecksum = proto.getBytesPerChecksum(); - DataChecksum.Type type = HdfsProtoUtil.fromProto(proto.getType()); - + DataChecksum.Type type = PBHelper.convert(proto.getType()); return DataChecksum.newDataChecksum(type, bytesPerChecksum); } @@ -82,8 +79,8 @@ public abstract class DataTransferProtoUtil { static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token blockToken) { return BaseHeaderProto.newBuilder() - .setBlock(HdfsProtoUtil.toProto(blk)) - .setToken(HdfsProtoUtil.toProto(blockToken)) + .setBlock(PBHelper.convert(blk)) + .setToken(PBHelper.convert(blockToken)) .build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 94d9724b8c6..b743e29f217 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; import java.io.InputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index ff7a81babd7..b1edc20e3a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; import java.io.DataInputStream; @@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; /** Receiver */ @InterfaceAudience.Private @@ -85,8 +84,8 @@ public abstract class Receiver implements DataTransferProtocol { /** Receive OP_READ_BLOCK */ private void opReadBlock() throws IOException { OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in)); - readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), - fromProto(proto.getHeader().getBaseHeader().getToken()), + readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), proto.getOffset(), proto.getLen()); @@ -95,11 +94,11 @@ public abstract class Receiver implements DataTransferProtocol { /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); - writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), - fromProto(proto.getHeader().getBaseHeader().getToken()), + writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - fromProtos(proto.getTargetsList()), - fromProto(proto.getSource()), + PBHelper.convert(proto.getTargetsList()), + PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(), @@ -111,33 +110,33 @@ public abstract class Receiver implements DataTransferProtocol { private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); - transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), - fromProto(proto.getHeader().getBaseHeader().getToken()), + transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - fromProtos(proto.getTargetsList())); + PBHelper.convert(proto.getTargetsList())); } /** Receive OP_REPLACE_BLOCK */ private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); - replaceBlock(fromProto(proto.getHeader().getBlock()), - fromProto(proto.getHeader().getToken()), + replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), - fromProto(proto.getSource())); + PBHelper.convert(proto.getSource())); } /** Receive OP_COPY_BLOCK */ private void opCopyBlock(DataInputStream in) throws IOException { OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in)); - copyBlock(fromProto(proto.getHeader().getBlock()), - fromProto(proto.getHeader().getToken())); + copyBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convert(proto.getHeader().getToken())); } /** Receive OP_BLOCK_CHECKSUM */ private void opBlockChecksum(DataInputStream in) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in)); - blockChecksum(fromProto(proto.getHeader().getBlock()), - fromProto(proto.getHeader().getToken())); + blockChecksum(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convert(proto.getHeader().getToken())); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 03e13080612..8184c500f8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; import java.io.DataOutput; @@ -37,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -105,7 +104,7 @@ public class Sender implements DataTransferProtocol { OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) - .addAllTargets(toProtos(targets, 1)) + .addAllTargets(PBHelper.convert(targets, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -114,7 +113,7 @@ public class Sender implements DataTransferProtocol { .setRequestedChecksum(checksumProto); if (source != null) { - proto.setSource(toProto(source)); + proto.setSource(PBHelper.convertDatanodeInfo(source)); } send(out, Op.WRITE_BLOCK, proto.build()); @@ -129,7 +128,7 @@ public class Sender implements DataTransferProtocol { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) - .addAllTargets(toProtos(targets, 0)) + .addAllTargets(PBHelper.convert(targets)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -143,7 +142,7 @@ public class Sender implements DataTransferProtocol { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) .setDelHint(delHint) - .setSource(toProto(source)) + .setSource(PBHelper.convertDatanodeInfo(source)) .build(); send(out, Op.REPLACE_BLOCK, proto); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 86bf98b17a3..82eb8169cec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; -import java.util.Arrays; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -131,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; import com.google.protobuf.RpcController; @@ -494,10 +492,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetDatanodeReportRequestProto req) throws ServiceException { try { - DatanodeInfoProto[] result = PBHelper.convert(server + List result = PBHelper.convert(server .getDatanodeReport(PBHelper.convert(req.getType()))); return GetDatanodeReportResponseProto.newBuilder() - .addAllDi(Arrays.asList(result)).build(); + .addAllDi(result).build(); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 3a89dd9bf54..74927ea4ad8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -282,7 +282,7 @@ public class ClientNamenodeProtocolTranslatorPB implements if (previous != null) req.setPrevious(PBHelper.convert(previous)); if (excludeNodes != null) - req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes))); + req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { @@ -300,8 +300,8 @@ public class ClientNamenodeProtocolTranslatorPB implements .newBuilder() .setSrc(src) .setBlk(PBHelper.convert(blk)) - .addAllExistings(Arrays.asList(PBHelper.convert(existings))) - .addAllExcludes(Arrays.asList(PBHelper.convert(excludes))) + .addAllExistings(PBHelper.convert(existings)) + .addAllExcludes(PBHelper.convert(excludes)) .setNumAdditionalNodes(numAdditionalNodes) .setClientName(clientName) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 38710b98ddc..1ef79b52c37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.protocolPB; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -40,10 +43,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; @@ -127,15 +130,20 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.util.ExactSizeInputStream; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; /** - * Utilities for converting protobuf classes to and from implementation classes. + * Utilities for converting protobuf classes to and from implementation classes + * and other helper utilities to help in dealing with protobuf. * * Note that when converting from an internal type to protobuf type, the * converter never return null for protobuf type. The check for internal type @@ -219,7 +227,8 @@ public class PBHelper { // Arrays of DatanodeId public static DatanodeIDProto[] convert(DatanodeID[] did) { - if (did == null) return null; + if (did == null) + return null; final int len = did.length; DatanodeIDProto[] result = new DatanodeIDProto[len]; for (int i = 0; i < len; ++i) { @@ -482,14 +491,26 @@ public class PBHelper { } return result; } + + public static List convert( + DatanodeInfo[] dnInfos) { + return convert(dnInfos, 0); + } - static public DatanodeInfoProto[] convert(DatanodeInfo[] di) { - if (di == null) return null; - DatanodeInfoProto[] result = new DatanodeInfoProto[di.length]; - for (int i = 0; i < di.length; i++) { - result[i] = PBHelper.convertDatanodeInfo(di[i]); + /** + * Copy from {@code dnInfos} to a target of list of same size starting at + * {@code startIdx}. + */ + public static List convert( + DatanodeInfo[] dnInfos, int startIdx) { + if (dnInfos == null) + return null; + ArrayList protos = Lists + .newArrayListWithCapacity(dnInfos.length); + for (int i = startIdx; i < dnInfos.length; i++) { + protos.add(convert(dnInfos[i])); } - return result; + return protos; } public static DatanodeInfo[] convert(List list) { @@ -694,7 +715,7 @@ public class PBHelper { DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; for (int i = 0; i < targets.length; i++) { ret[i] = DatanodeInfosProto.newBuilder() - .addAllDatanodes(Arrays.asList(PBHelper.convert(targets[i]))).build(); + .addAllDatanodes(PBHelper.convert(targets[i])).build(); } return Arrays.asList(ret); } @@ -963,7 +984,7 @@ public class PBHelper { fs.getFileBufferSize(), fs.getEncryptDataTransfer(), fs.getTrashInterval(), - HdfsProtoUtil.fromProto(fs.getChecksumType())); + PBHelper.convert(fs.getChecksumType())); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -976,7 +997,7 @@ public class PBHelper { .setFileBufferSize(fs.getFileBufferSize()) .setEncryptDataTransfer(fs.getEncryptDataTransfer()) .setTrashInterval(fs.getTrashInterval()) - .setChecksumType(HdfsProtoUtil.toProto(fs.getChecksumType())) + .setChecksumType(PBHelper.convert(fs.getChecksumType())) .build(); } @@ -1314,4 +1335,24 @@ public class PBHelper { .setLayoutVersion(j.getLayoutVersion()) .setNamespaceID(j.getNamespaceId()).build(); } + + public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { + return DataChecksum.Type.valueOf(type.getNumber()); + } + + public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { + return HdfsProtos.ChecksumTypeProto.valueOf(type.id); + } + + public static InputStream vintPrefixed(final InputStream input) + throws IOException { + final int firstByte = input.read(); + if (firstByte == -1) { + throw new EOFException("Premature EOF: no length prefix available"); + } + + int size = CodedInputStream.readRawVarint32(firstByte, input); + assert size >= 0; + return new ExactSizeInputStream(input, size); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 4b961d72744..c211fbff990 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; + +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 92c58eb32c3..77747ab9deb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; @@ -117,6 +116,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -1468,7 +1468,7 @@ public class DataNode extends Configured // read ack if (isClient) { DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - HdfsProtoUtil.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 31b896caf93..255fd35ff35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -42,7 +42,6 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; @@ -56,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -144,7 +144,7 @@ class DataXceiver extends Receiver implements Runnable { /** Return the datanode object. */ DataNode getDataNode() {return datanode;} - private OutputStream getOutputStream() throws IOException { + private OutputStream getOutputStream() { return socketOut; } @@ -284,7 +284,7 @@ class DataXceiver extends Receiver implements Runnable { // to respond with a Status enum. try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( - HdfsProtoUtil.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + "code after reading. Will close connection."); @@ -445,7 +445,7 @@ class DataXceiver extends Receiver implements Runnable { // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = - BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); mirrorInStatus = connectAck.getStatus(); firstBadLink = connectAck.getFirstBadLink(); if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { @@ -606,7 +606,7 @@ class DataXceiver extends Receiver implements Runnable { .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType())) + .setCrcType(PBHelper.convert(checksum.getChecksumType())) ) .build() .writeDelimitedTo(out); @@ -765,7 +765,7 @@ class DataXceiver extends Receiver implements Runnable { // receive the response from the proxy BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(proxyReply)); + PBHelper.vintPrefixed(proxyReply)); if (copyResponse.getStatus() != SUCCESS) { if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java deleted file mode 100644 index 0a04e3c521f..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; -import org.apache.hadoop.util.DataChecksum; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class TestHdfsProtoUtil { - @Test - public void testChecksumTypeProto() { - assertEquals(DataChecksum.Type.NULL, - HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); - assertEquals(DataChecksum.Type.CRC32, - HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); - assertEquals(DataChecksum.Type.CRC32C, - HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); - assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.NULL), - HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL); - assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32), - HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32); - assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C), - HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 65a6ed0fe2d..23cd46e7323 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; @@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; import org.junit.Test; import com.google.common.base.Joiner; @@ -471,4 +473,20 @@ public class TestPBHelper { } } } + + @Test + public void testChecksumTypeProto() { + assertEquals(DataChecksum.Type.NULL, + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); + assertEquals(DataChecksum.Type.CRC32, + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); + assertEquals(DataChecksum.Type.CRC32C, + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); + assertEquals(PBHelper.convert(DataChecksum.Type.NULL), + HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL); + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32), + HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32); + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C), + HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C); + } }