diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java deleted file mode 100644 index edf658ab4c0..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocolPB; - -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedInputStream; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.hdfs.util.ExactSizeInputStream; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -/** - * Utilities for converting protobuf classes to and from implementation classes - * and other helper utilities to help in dealing with protobuf. - * - * Note that when converting from an internal type to protobuf type, the - * converter never return null for protobuf type. The check for internal type - * being null must be done before calling the convert() method. - */ -public class PBHelperClient { - private PBHelperClient() { - /** Hidden constructor */ - } - - public static ByteString getByteString(byte[] bytes) { - return ByteString.copyFrom(bytes); - } - - public static ShmId convert(ShortCircuitShmIdProto shmId) { - return new ShmId(shmId.getHi(), shmId.getLo()); - } - - public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { - return DataChecksum.Type.valueOf(type.getNumber()); - } - - public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { - return HdfsProtos.ChecksumTypeProto.valueOf(type.id); - } - - public static ExtendedBlockProto convert(final ExtendedBlock b) { - if (b == null) return null; - return ExtendedBlockProto.newBuilder(). - setPoolId(b.getBlockPoolId()). - setBlockId(b.getBlockId()). - setNumBytes(b.getNumBytes()). - setGenerationStamp(b.getGenerationStamp()). - build(); - } - - public static TokenProto convert(Token tok) { - return TokenProto.newBuilder(). - setIdentifier(ByteString.copyFrom(tok.getIdentifier())). - setPassword(ByteString.copyFrom(tok.getPassword())). - setKind(tok.getKind().toString()). - setService(tok.getService().toString()).build(); - } - - public static ShortCircuitShmIdProto convert(ShmId shmId) { - return ShortCircuitShmIdProto.newBuilder(). - setHi(shmId.getHi()). - setLo(shmId.getLo()). - build(); - - } - - public static ShortCircuitShmSlotProto convert(SlotId slotId) { - return ShortCircuitShmSlotProto.newBuilder(). - setShmId(convert(slotId.getShmId())). - setSlotIdx(slotId.getSlotIdx()). - build(); - } - - public static DatanodeIDProto convert(DatanodeID dn) { - // For wire compatibility with older versions we transmit the StorageID - // which is the same as the DatanodeUuid. Since StorageID is a required - // field we pass the empty string if the DatanodeUuid is not yet known. - return DatanodeIDProto.newBuilder() - .setIpAddr(dn.getIpAddr()) - .setHostName(dn.getHostName()) - .setXferPort(dn.getXferPort()) - .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") - .setInfoPort(dn.getInfoPort()) - .setInfoSecurePort(dn.getInfoSecurePort()) - .setIpcPort(dn.getIpcPort()).build(); - } - - public static DatanodeInfoProto.AdminState convert( - final DatanodeInfo.AdminStates inAs) { - switch (inAs) { - case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; - case DECOMMISSION_INPROGRESS: - return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; - case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; - default: return DatanodeInfoProto.AdminState.NORMAL; - } - } - - public static DatanodeInfoProto convert(DatanodeInfo info) { - DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); - if (info.getNetworkLocation() != null) { - builder.setLocation(info.getNetworkLocation()); - } - builder - .setId(convert((DatanodeID) info)) - .setCapacity(info.getCapacity()) - .setDfsUsed(info.getDfsUsed()) - .setRemaining(info.getRemaining()) - .setBlockPoolUsed(info.getBlockPoolUsed()) - .setCacheCapacity(info.getCacheCapacity()) - .setCacheUsed(info.getCacheUsed()) - .setLastUpdate(info.getLastUpdate()) - .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) - .setXceiverCount(info.getXceiverCount()) - .setAdminState(convert(info.getAdminState())) - .build(); - return builder.build(); - } - - public static List convert( - DatanodeInfo[] dnInfos) { - return convert(dnInfos, 0); - } - - /** - * Copy from {@code dnInfos} to a target of list of same size starting at - * {@code startIdx}. - */ - public static List convert( - DatanodeInfo[] dnInfos, int startIdx) { - if (dnInfos == null) - return null; - ArrayList protos = Lists - .newArrayListWithCapacity(dnInfos.length); - for (int i = startIdx; i < dnInfos.length; i++) { - protos.add(convert(dnInfos[i])); - } - return protos; - } - - public static List convert(boolean[] targetPinnings, int idx) { - List pinnings = new ArrayList<>(); - if (targetPinnings == null) { - pinnings.add(Boolean.FALSE); - } else { - for (; idx < targetPinnings.length; ++idx) { - pinnings.add(targetPinnings[idx]); - } - } - return pinnings; - } - - static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { - if (di == null) return null; - return convert(di); - } - - public static StorageTypeProto convertStorageType(StorageType type) { - switch(type) { - case DISK: - return StorageTypeProto.DISK; - case SSD: - return StorageTypeProto.SSD; - case ARCHIVE: - return StorageTypeProto.ARCHIVE; - case RAM_DISK: - return StorageTypeProto.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageType not found, type=" + type); - } - } - - public static StorageType convertStorageType(StorageTypeProto type) { - switch(type) { - case DISK: - return StorageType.DISK; - case SSD: - return StorageType.SSD; - case ARCHIVE: - return StorageType.ARCHIVE; - case RAM_DISK: - return StorageType.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageTypeProto not found, type=" + type); - } - } - - public static List convertStorageTypes( - StorageType[] types) { - return convertStorageTypes(types, 0); - } - - public static List convertStorageTypes( - StorageType[] types, int startIdx) { - if (types == null) { - return null; - } - final List protos = new ArrayList<>( - types.length); - for (int i = startIdx; i < types.length; ++i) { - protos.add(PBHelperClient.convertStorageType(types[i])); - } - return protos; - } - - public static InputStream vintPrefixed(final InputStream input) - throws IOException { - final int firstByte = input.read(); - if (firstByte == -1) { - throw new EOFException("Premature EOF: no length prefix available"); - } - - int size = CodedInputStream.readRawVarint32(firstByte, input); - assert size >= 0; - return new ExactSizeInputStream(input, size); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a52367bc093..9c538746a05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -482,8 +482,6 @@ Release 2.8.0 - UNRELEASED HDFS-8823. Move replication factor into individual blocks. (wheat9) - HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9) - OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index fec6b85ad27..8517173ded0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { failureInjector.getSupportsReceiptVerification()); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); DomainSocket sock = peer.getDomainSocket(); failureInjector.injectRequestFileDescriptorsFailure(); switch (resp.getStatus()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 12646b5037a..a7b518ee07c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -149,7 +149,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; @@ -1928,7 +1928,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new Sender(out).blockChecksum(block, lb.getBlockToken()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "for block " + block + " from datanode " + datanodes[j]; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); @@ -1960,7 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData + ct = PBHelper.convert(checksumData .getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + @@ -2088,11 +2088,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); + return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index a9753128604..8dd85b72375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -1245,7 +1245,7 @@ class DataStreamer extends Daemon { //ack BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } @@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon { // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(blockReplyStream)); + PBHelper.vintPrefixed(blockReplyStream)); pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 05a9f2caa07..d70f41904bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 4c23d363d6b..c368d6515f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -417,7 +417,7 @@ public class RemoteBlockReader2 implements BlockReader { DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java similarity index 93% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java index 28097ab7e0f..284281a762b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -21,6 +21,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -30,7 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.security.token.Token; @@ -58,7 +60,7 @@ public abstract class DataTransferProtoUtil { } public static ChecksumProto toProto(DataChecksum checksum) { - ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType()); + ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType()); // ChecksumType#valueOf never returns null return ChecksumProto.newBuilder() .setBytesPerChecksum(checksum.getBytesPerChecksum()) @@ -70,7 +72,7 @@ public abstract class DataTransferProtoUtil { if (proto == null) return null; int bytesPerChecksum = proto.getBytesPerChecksum(); - DataChecksum.Type type = PBHelperClient.convert(proto.getType()); + DataChecksum.Type type = PBHelper.convert(proto.getType()); return DataChecksum.newDataChecksum(type, bytesPerChecksum); } @@ -87,8 +89,8 @@ public abstract class DataTransferProtoUtil { static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token blockToken) { BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() - .setBlock(PBHelperClient.convert(blk)) - .setToken(PBHelperClient.convert(blockToken)); + .setBlock(PBHelper.convert(blk)) + .setToken(PBHelper.convert(blockToken)); if (Trace.isTracing()) { Span s = Trace.currentSpan(); builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 1f7e378d870..48e931d741a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; @@ -30,16 +32,13 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Transfer data to/from datanode using a streaming protocol. */ @InterfaceAudience.Private @InterfaceStability.Evolving public interface DataTransferProtocol { - public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class); + public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class); /** Version for data transfers between clients and datanodes * This should change when serialization of DatanodeInfo, not just diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 44f38c66736..a811f39ece0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; import java.io.InputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 694f5212b99..d435543f650 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.DataInputStream; import java.io.IOException; @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.htrace.TraceScope; @@ -137,7 +136,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), - PBHelperClient.convertStorageType(proto.getStorageType()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), targets, @@ -229,7 +228,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), - PBHelperClient.convertStorageType(proto.getStorageType()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java similarity index 92% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 2d11dc26c3a..df69125882b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; @@ -140,9 +140,9 @@ public class Sender implements DataTransferProtocol { OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) - .setStorageType(PBHelperClient.convertStorageType(storageType)) - .addAllTargets(PBHelperClient.convert(targets, 1)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1)) + .setStorageType(PBHelper.convertStorageType(storageType)) + .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -152,10 +152,10 @@ public class Sender implements DataTransferProtocol { .setCachingStrategy(getCachingStrategy(cachingStrategy)) .setAllowLazyPersist(allowLazyPersist) .setPinning(pinning) - .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); + .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1)); if (source != null) { - proto.setSource(PBHelperClient.convertDatanodeInfo(source)); + proto.setSource(PBHelper.convertDatanodeInfo(source)); } send(out, Op.WRITE_BLOCK, proto.build()); @@ -171,8 +171,8 @@ public class Sender implements DataTransferProtocol { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) - .addAllTargets(PBHelperClient.convert(targets)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes)) + .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -188,7 +188,7 @@ public class Sender implements DataTransferProtocol { .setHeader(DataTransferProtoUtil.buildBaseHeader( blk, blockToken)).setMaxVersion(maxVersion); if (slotId != null) { - builder.setSlotId(PBHelperClient.convert(slotId)); + builder.setSlotId(PBHelper.convert(slotId)); } builder.setSupportsReceiptVerification(supportsReceiptVerification); OpRequestShortCircuitAccessProto proto = builder.build(); @@ -199,7 +199,7 @@ public class Sender implements DataTransferProtocol { public void releaseShortCircuitFds(SlotId slotId) throws IOException { ReleaseShortCircuitAccessRequestProto.Builder builder = ReleaseShortCircuitAccessRequestProto.newBuilder(). - setSlotId(PBHelperClient.convert(slotId)); + setSlotId(PBHelper.convert(slotId)); if (Trace.isTracing()) { Span s = Trace.currentSpan(); builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() @@ -231,9 +231,9 @@ public class Sender implements DataTransferProtocol { final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .setStorageType(PBHelperClient.convertStorageType(storageType)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) - .setSource(PBHelperClient.convertDatanodeInfo(source)) + .setSource(PBHelper.convertDatanodeInfo(source)) .build(); send(out, Op.REPLACE_BLOCK, proto); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index 852819f1b1a..398d44cc094 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; import java.io.InputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index a628287debd..2bc6a180ffb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -137,7 +137,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } return GetBlockLocalPathInfoResponseProto.newBuilder() - .setBlock(PBHelperClient.convert(resp.getBlock())) + .setBlock(PBHelper.convert(resp.getBlock())) .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 53ca1470e7b..9d6375b29a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -185,7 +185,7 @@ public class ClientDatanodeProtocolTranslatorPB implements @Override public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto - .newBuilder().setBlock(PBHelperClient.convert(b)).build(); + .newBuilder().setBlock(PBHelper.convert(b)).build(); try { return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength(); } catch (ServiceException e) { @@ -218,8 +218,8 @@ public class ClientDatanodeProtocolTranslatorPB implements Token token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(block)) - .setToken(PBHelperClient.convert(token)).build(); + .setBlock(PBHelper.convert(block)) + .setToken(PBHelper.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index a95f3978144..02b20d62804 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -698,7 +698,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetDatanodeReportRequestProto req) throws ServiceException { try { - List result = PBHelperClient.convert(server + List result = PBHelper.convert(server .getDatanodeReport(PBHelper.convert(req.getType()))); return GetDatanodeReportResponseProto.newBuilder() .addAllDi(result).build(); @@ -892,7 +892,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements server.setQuota(req.getPath(), req.getNamespaceQuota(), req.getStoragespaceQuota(), req.hasStorageType() ? - PBHelperClient.convertStorageType(req.getStorageType()): null); + PBHelper.convertStorageType(req.getStorageType()): null); return VOID_SETQUOTA_RESPONSE; } catch (IOException e) { throw new ServiceException(e); @@ -992,7 +992,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements GetDelegationTokenResponseProto.Builder rspBuilder = GetDelegationTokenResponseProto.newBuilder(); if (token != null) { - rspBuilder.setToken(PBHelperClient.convert(token)); + rspBuilder.setToken(PBHelper.convert(token)); } return rspBuilder.build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index a0431b1d721..7e57b9731ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String holder) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder() - .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder) + .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder) .setFileId(fileId).build(); try { rpcProxy.abandonBlock(null, req); @@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) - req.setPrevious(PBHelperClient.convert(previous)); - if (excludeNodes != null) - req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes)); + req.setPrevious(PBHelper.convert(previous)); + if (excludeNodes != null) + req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } @@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements .newBuilder() .setSrc(src) .setFileId(fileId) - .setBlk(PBHelperClient.convert(blk)) - .addAllExistings(PBHelperClient.convert(existings)) + .setBlk(PBHelper.convert(blk)) + .addAllExistings(PBHelper.convert(existings)) .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs)) - .addAllExcludes(PBHelperClient.convert(excludes)) + .addAllExcludes(PBHelper.convert(excludes)) .setNumAdditionalNodes(numAdditionalNodes) .setClientName(clientName) .build(); @@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setClientName(clientName) .setFileId(fileId); if (last != null) - req.setLast(PBHelperClient.convert(last)); + req.setLast(PBHelper.convert(last)); try { return rpcProxy.complete(null, req.build()).getResult(); } catch (ServiceException e) { @@ -817,7 +817,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setNamespaceQuota(namespaceQuota) .setStoragespaceQuota(storagespaceQuota); if (type != null) { - builder.setStorageType(PBHelperClient.convertStorageType(type)); + builder.setStorageType(PBHelper.convertStorageType(type)); } final SetQuotaRequestProto req = builder.build(); try { @@ -895,7 +895,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String clientName) throws IOException { UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto .newBuilder() - .setBlock(PBHelperClient.convert(block)) + .setBlock(PBHelper.convert(block)) .setClientName(clientName) .build(); try { @@ -911,8 +911,8 @@ public class ClientNamenodeProtocolTranslatorPB implements ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException { UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() .setClientName(clientName) - .setOldBlock(PBHelperClient.convert(oldBlock)) - .setNewBlock(PBHelperClient.convert(newBlock)) + .setOldBlock(PBHelper.convert(oldBlock)) + .setNewBlock(PBHelper.convert(newBlock)) .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes))) .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs)) .build(); @@ -943,7 +943,7 @@ public class ClientNamenodeProtocolTranslatorPB implements public long renewDelegationToken(Token token) throws IOException { RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder(). - setToken(PBHelperClient.convert(token)). + setToken(PBHelper.convert(token)). build(); try { return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime(); @@ -957,7 +957,7 @@ public class ClientNamenodeProtocolTranslatorPB implements throws IOException { CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto .newBuilder() - .setToken(PBHelperClient.convert(token)) + .setToken(PBHelper.convert(token)) .build(); try { rpcProxy.cancelDelegationToken(null, req); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 0b46927af29..94028a2523f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements ) throws IOException { CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp) + .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) .setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { - builder.addNewTaragets(PBHelperClient.convert(newtargets[i])); + builder.addNewTaragets(PBHelper.convert(newtargets[i])); builder.addNewTargetStorages(newtargetstorages[i]); } CommitBlockSynchronizationRequestProto req = builder.build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 17ba1967f1c..fee62a4e994 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements long recoveryId, long newBlockId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(oldBlock)) + .setBlock(PBHelper.convert(oldBlock)) .setNewLength(newLength).setNewBlockId(newBlockId) .setRecoveryId(recoveryId).build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index bcb96ba4d3a..82c5c4c8170 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() - .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) + .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size) .build(); try { return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index dbb1861858d..a2522622879 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -347,7 +347,7 @@ public class PBHelper { if (types == null || types.length == 0) { return null; } - List list = PBHelperClient.convertStorageTypes(types); + List list = convertStorageTypes(types); return StorageTypesProto.newBuilder().addAllStorageTypes(list).build(); } @@ -382,6 +382,20 @@ public class PBHelper { .getInfoSecurePort() : 0, dn.getIpcPort()); } + public static DatanodeIDProto convert(DatanodeID dn) { + // For wire compatibility with older versions we transmit the StorageID + // which is the same as the DatanodeUuid. Since StorageID is a required + // field we pass the empty string if the DatanodeUuid is not yet known. + return DatanodeIDProto.newBuilder() + .setIpAddr(dn.getIpAddr()) + .setHostName(dn.getHostName()) + .setXferPort(dn.getXferPort()) + .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") + .setInfoPort(dn.getInfoPort()) + .setInfoSecurePort(dn.getInfoSecurePort()) + .setIpcPort(dn.getIpcPort()).build(); + } + // Arrays of DatanodeId public static DatanodeIDProto[] convert(DatanodeID[] did) { if (did == null) @@ -389,7 +403,7 @@ public class PBHelper { final int len = did.length; DatanodeIDProto[] result = new DatanodeIDProto[len]; for (int i = 0; i < len; ++i) { - result[i] = PBHelperClient.convert(did[i]); + result[i] = convert(did[i]); } return result; } @@ -420,7 +434,7 @@ public class PBHelper { .setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())) + .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())) .build(); } @@ -582,6 +596,16 @@ public class PBHelper { eb.getGenerationStamp()); } + public static ExtendedBlockProto convert(final ExtendedBlock b) { + if (b == null) return null; + return ExtendedBlockProto.newBuilder(). + setPoolId(b.getBlockPoolId()). + setBlockId(b.getBlockId()). + setNumBytes(b.getNumBytes()). + setGenerationStamp(b.getGenerationStamp()). + build(); + } + public static RecoveringBlockProto convert(RecoveringBlock b) { if (b == null) { return null; @@ -602,6 +626,17 @@ public class PBHelper { new RecoveringBlock(block, locs, b.getNewGenStamp()); } + public static DatanodeInfoProto.AdminState convert( + final DatanodeInfo.AdminStates inAs) { + switch (inAs) { + case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; + case DECOMMISSION_INPROGRESS: + return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; + default: return DatanodeInfoProto.AdminState.NORMAL; + } + } + static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -613,6 +648,12 @@ public class PBHelper { di.getXceiverCount(), PBHelper.convert(di.getAdminState())); } + static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { + if (di == null) return null; + return convert(di); + } + + static public DatanodeInfo[] convert(DatanodeInfoProto di[]) { if (di == null) return null; DatanodeInfo[] result = new DatanodeInfo[di.length]; @@ -622,6 +663,27 @@ public class PBHelper { return result; } + public static List convert( + DatanodeInfo[] dnInfos) { + return convert(dnInfos, 0); + } + + /** + * Copy from {@code dnInfos} to a target of list of same size starting at + * {@code startIdx}. + */ + public static List convert( + DatanodeInfo[] dnInfos, int startIdx) { + if (dnInfos == null) + return null; + ArrayList protos = Lists + .newArrayListWithCapacity(dnInfos.length); + for (int i = startIdx; i < dnInfos.length; i++) { + protos.add(convert(dnInfos[i])); + } + return protos; + } + public static DatanodeInfo[] convert(List list) { DatanodeInfo[] info = new DatanodeInfo[list.size()]; for (int i = 0; i < info.length; i++) { @@ -629,11 +691,32 @@ public class PBHelper { } return info; } + + public static DatanodeInfoProto convert(DatanodeInfo info) { + DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); + if (info.getNetworkLocation() != null) { + builder.setLocation(info.getNetworkLocation()); + } + builder + .setId(PBHelper.convert((DatanodeID)info)) + .setCapacity(info.getCapacity()) + .setDfsUsed(info.getDfsUsed()) + .setRemaining(info.getRemaining()) + .setBlockPoolUsed(info.getBlockPoolUsed()) + .setCacheCapacity(info.getCacheCapacity()) + .setCacheUsed(info.getCacheUsed()) + .setLastUpdate(info.getLastUpdate()) + .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) + .setXceiverCount(info.getXceiverCount()) + .setAdminState(PBHelper.convert(info.getAdminState())) + .build(); + return builder.build(); + } public static DatanodeStorageReportProto convertDatanodeStorageReport( DatanodeStorageReport report) { return DatanodeStorageReportProto.newBuilder() - .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo())) + .setDatanodeInfo(convert(report.getDatanodeInfo())) .addAllStorageReports(convertStorageReports(report.getStorageReports())) .build(); } @@ -685,7 +768,7 @@ public class PBHelper { Lists.newLinkedList(Arrays.asList(b.getCachedLocations())); for (int i = 0; i < locs.length; i++) { DatanodeInfo loc = locs[i]; - builder.addLocs(i, PBHelperClient.convert(loc)); + builder.addLocs(i, PBHelper.convert(loc)); boolean locIsCached = cachedLocs.contains(loc); builder.addIsCached(locIsCached); if (locIsCached) { @@ -699,7 +782,7 @@ public class PBHelper { StorageType[] storageTypes = b.getStorageTypes(); if (storageTypes != null) { for (int i = 0; i < storageTypes.length; ++i) { - builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i])); + builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); } } final String[] storageIDs = b.getStorageIDs(); @@ -707,8 +790,8 @@ public class PBHelper { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } - return builder.setB(PBHelperClient.convert(b.getBlock())) - .setBlockToken(PBHelperClient.convert(b.getBlockToken())) + return builder.setB(PBHelper.convert(b.getBlock())) + .setBlockToken(PBHelper.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } @@ -749,6 +832,14 @@ public class PBHelper { return lb; } + public static TokenProto convert(Token tok) { + return TokenProto.newBuilder(). + setIdentifier(ByteString.copyFrom(tok.getIdentifier())). + setPassword(ByteString.copyFrom(tok.getPassword())). + setKind(tok.getKind().toString()). + setService(tok.getService().toString()).build(); + } + public static Token convert( TokenProto blockToken) { return new Token(blockToken.getIdentifier() @@ -800,7 +891,7 @@ public class PBHelper { DatanodeRegistration registration) { DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto .newBuilder(); - return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration)) + return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration)) .setStorageInfo(PBHelper.convert(registration.getStorageInfo())) .setKeys(PBHelper.convert(registration.getExportedKeys())) .setSoftwareVersion(registration.getSoftwareVersion()).build(); @@ -892,7 +983,7 @@ public class PBHelper { if (types != null) { for (StorageType[] ts : types) { StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); - builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts)); + builder.addAllStorageTypes(convertStorageTypes(ts)); list.add(builder.build()); } } @@ -923,7 +1014,7 @@ public class PBHelper { DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; for (int i = 0; i < targets.length; i++) { ret[i] = DatanodeInfosProto.newBuilder() - .addAllDatanodes(PBHelperClient.convert(targets[i])).build(); + .addAllDatanodes(PBHelper.convert(targets[i])).build(); } return Arrays.asList(ret); } @@ -1247,7 +1338,7 @@ public class PBHelper { fs.getFileBufferSize(), fs.getEncryptDataTransfer(), fs.getTrashInterval(), - PBHelperClient.convert(fs.getChecksumType())); + PBHelper.convert(fs.getChecksumType())); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -1260,7 +1351,7 @@ public class PBHelper { .setFileBufferSize(fs.getFileBufferSize()) .setEncryptDataTransfer(fs.getEncryptDataTransfer()) .setTrashInterval(fs.getTrashInterval()) - .setChecksumType(PBHelperClient.convert(fs.getChecksumType())) + .setChecksumType(PBHelper.convert(fs.getChecksumType())) .build(); } @@ -1648,7 +1739,7 @@ public class PBHelper { if (cs.hasTypeQuotaInfos()) { for (HdfsProtos.StorageTypeQuotaInfoProto info : cs.getTypeQuotaInfos().getTypeQuotaInfoList()) { - StorageType type = PBHelperClient.convertStorageType(info.getType()); + StorageType type = PBHelper.convertStorageType(info.getType()); builder.typeConsumed(type, info.getConsumed()); builder.typeQuota(type, info.getQuota()); } @@ -1672,7 +1763,7 @@ public class PBHelper { for (StorageType t: StorageType.getTypesSupportingQuota()) { HdfsProtos.StorageTypeQuotaInfoProto info = HdfsProtos.StorageTypeQuotaInfoProto.newBuilder(). - setType(PBHelperClient.convertStorageType(t)). + setType(convertStorageType(t)). setConsumed(cs.getTypeConsumed(t)). setQuota(cs.getTypeQuota(t)). build(); @@ -1717,7 +1808,7 @@ public class PBHelper { public static DatanodeStorageProto convert(DatanodeStorage s) { return DatanodeStorageProto.newBuilder() .setState(PBHelper.convertState(s.getState())) - .setStorageType(PBHelperClient.convertStorageType(s.getStorageType())) + .setStorageType(PBHelper.convertStorageType(s.getStorageType())) .setStorageUuid(s.getStorageID()).build(); } @@ -1731,10 +1822,44 @@ public class PBHelper { } } + public static List convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List protos = new ArrayList( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { + switch(type) { + case DISK: + return StorageTypeProto.DISK; + case SSD: + return StorageTypeProto.SSD; + case ARCHIVE: + return StorageTypeProto.ARCHIVE; + case RAM_DISK: + return StorageTypeProto.RAM_DISK; + default: + throw new IllegalStateException( + "BUG: StorageType not found, type=" + type); + } + } + public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelperClient.convertStorageType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1747,6 +1872,22 @@ public class PBHelper { } } + public static StorageType convertStorageType(StorageTypeProto type) { + switch(type) { + case DISK: + return StorageType.DISK; + case SSD: + return StorageType.SSD; + case ARCHIVE: + return StorageType.ARCHIVE; + case RAM_DISK: + return StorageType.RAM_DISK; + default: + throw new IllegalStateException( + "BUG: StorageTypeProto not found, type=" + type); + } + } + public static StorageType[] convertStorageTypes( List storageTypesList, int expectedSize) { final StorageType[] storageTypes = new StorageType[expectedSize]; @@ -1755,7 +1896,7 @@ public class PBHelper { Arrays.fill(storageTypes, StorageType.DEFAULT); } else { for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i)); + storageTypes[i] = convertStorageType(storageTypesList.get(i)); } } return storageTypes; @@ -1939,6 +2080,10 @@ public class PBHelper { return reportProto; } + public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { + return DataChecksum.Type.valueOf(type.getNumber()); + } + public static CacheDirectiveInfoProto convert (CacheDirectiveInfo info) { CacheDirectiveInfoProto.Builder builder = @@ -2111,6 +2256,9 @@ public class PBHelper { return new CachePoolEntry(info, stats); } + public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { + return HdfsProtos.ChecksumTypeProto.valueOf(type.id); + } public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) { DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder(); @@ -2125,6 +2273,17 @@ public class PBHelper { proto.getConfigVersion(), proto.getUptime()); } + public static InputStream vintPrefixed(final InputStream input) + throws IOException { + final int firstByte = input.read(); + if (firstByte == -1) { + throw new EOFException("Premature EOF: no length prefix available"); + } + + int size = CodedInputStream.readRawVarint32(firstByte, input); + assert size >= 0; + return new ExactSizeInputStream(input, size); + } private static AclEntryScopeProto convert(AclEntryScope v) { return AclEntryScopeProto.valueOf(v.ordinal()); @@ -2348,11 +2507,30 @@ public class PBHelper { proto.getKeyName()); } + public static ShortCircuitShmSlotProto convert(SlotId slotId) { + return ShortCircuitShmSlotProto.newBuilder(). + setShmId(convert(slotId.getShmId())). + setSlotIdx(slotId.getSlotIdx()). + build(); + } + + public static ShortCircuitShmIdProto convert(ShmId shmId) { + return ShortCircuitShmIdProto.newBuilder(). + setHi(shmId.getHi()). + setLo(shmId.getLo()). + build(); + + } + public static SlotId convert(ShortCircuitShmSlotProto slotId) { - return new SlotId(PBHelperClient.convert(slotId.getShmId()), + return new SlotId(PBHelper.convert(slotId.getShmId()), slotId.getSlotIdx()); } + public static ShmId convert(ShortCircuitShmIdProto shmId) { + return new ShmId(shmId.getHi(), shmId.getLo()); + } + private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType type) { switch (type) { @@ -2859,6 +3037,18 @@ public class PBHelper { ezKeyVersionName); } + public static List convert(boolean[] targetPinnings, int idx) { + List pinnings = new ArrayList(); + if (targetPinnings == null) { + pinnings.add(Boolean.FALSE); + } else { + for (; idx < targetPinnings.length; ++idx) { + pinnings.add(Boolean.valueOf(targetPinnings[idx])); + } + } + return pinnings; + } + public static boolean[] convertBooleanList( List targetPinningsList) { final boolean[] targetPinnings = new boolean[targetPinningsList.size()]; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index dc967fff95d..40564dfab6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.balancer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index df038081078..fa3b78c508f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -137,7 +137,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode; @@ -2172,7 +2172,7 @@ public class DataNode extends ReconfigurableBase // read ack if (isClient) { DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index dfaa5252819..e9cf4362942 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; @@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable { throws IOException { DataNodeFaultInjector.get().sendShortCircuitShmResponse(); ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). - setId(PBHelperClient.convert(shmInfo.shmId)).build(). + setId(PBHelper.convert(shmInfo.shmId)).build(). writeDelimitedTo(socketOut); // Send the file descriptor for the shared memory segment. byte buf[] = new byte[] { (byte)0 }; @@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable { // to respond with a Status enum. try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + peer.getRemoteAddressString() + " did not send a valid status code after reading. " + @@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable { // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); mirrorInStatus = connectAck.getStatus(); firstBadLink = connectAck.getFirstBadLink(); if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { @@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable { .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelperClient.convert(checksum.getChecksumType()))) + .setCrcType(PBHelper.convert(checksum.getChecksumType()))) .build() .writeDelimitedTo(out); out.flush(); @@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable { // receive the response from the proxy BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(proxyReply)); - + PBHelper.vintPrefixed(proxyReply)); + String logInfo = "copy block " + block + " from " + proxySock.getRemoteSocketAddress(); DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 0afb06c0862..9df1713023c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -155,7 +154,7 @@ public final class FSImageFormatPBINode { QuotaByStorageTypeFeatureProto proto) { ImmutableList.Builder b = ImmutableList.builder(); for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) { - StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType()); + StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType()); long quota = quotaEntry.getQuota(); b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type) .setQuota(quota).build()); @@ -460,7 +459,7 @@ public final class FSImageFormatPBINode { if (q.getTypeSpace(t) >= 0) { QuotaByStorageTypeEntryProto.Builder eb = QuotaByStorageTypeEntryProto.newBuilder(). - setStorageType(PBHelperClient.convertStorageType(t)). + setStorageType(PBHelper.convertStorageType(t)). setQuota(q.getTypeSpace(t)); b.addQuotas(eb); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java index f70398aecda..062539a0721 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java @@ -30,6 +30,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.net.DomainPeer; @@ -37,18 +39,17 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocketWatcher; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Manages short-circuit memory segments for an HDFS client. * @@ -62,8 +63,7 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private public class DfsClientShmManager implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger( - DfsClientShmManager.class); + private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class); /** * Manages short-circuit memory segments that pertain to a given DataNode. @@ -168,7 +168,7 @@ public class DfsClientShmManager implements Closeable { new Sender(out).requestShortCircuitShm(clientName); ShortCircuitShmResponseProto resp = ShortCircuitShmResponseProto.parseFrom( - PBHelperClient.vintPrefixed(peer.getInputStream())); + PBHelper.vintPrefixed(peer.getInputStream())); String error = resp.hasError() ? resp.getError() : "(unknown)"; switch (resp.getStatus()) { case SUCCESS: @@ -185,18 +185,14 @@ public class DfsClientShmManager implements Closeable { } try { DfsClientShm shm = - new DfsClientShm(PBHelperClient.convert(resp.getId()), + new DfsClientShm(PBHelper.convert(resp.getId()), fis[0], this, peer); if (LOG.isTraceEnabled()) { LOG.trace(this + ": createNewShm: created " + shm); } return shm; } finally { - try { - fis[0].close(); - } catch (Throwable e) { - LOG.debug("Exception in closing " + fis[0], e); - } + IOUtils.cleanup(LOG, fis[0]); } case ERROR_UNSUPPORTED: // The DataNode just does not support short-circuit shared memory @@ -501,11 +497,7 @@ public class DfsClientShmManager implements Closeable { } // When closed, the domainSocketWatcher will issue callbacks that mark // all the outstanding DfsClientShm segments as stale. - try { - domainSocketWatcher.close(); - } catch (Throwable e) { - LOG.debug("Exception in closing " + domainSocketWatcher, e); - } + IOUtils.cleanup(LOG, domainSocketWatcher); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index 15b8dea8e0e..db4cbe29df5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; @@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable { DataInputStream in = new DataInputStream(sock.getInputStream()); ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (resp.getStatus() != Status.SUCCESS) { String error = resp.hasError() ? resp.getError() : "(unknown)"; throw new IOException(resp.getStatus().toString() + ": " + error); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java similarity index 99% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java index 78325a389b4..7b89d0a978d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java @@ -27,6 +27,8 @@ import java.util.Random; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.io.nativeio.NativeIO; @@ -34,9 +36,6 @@ import org.apache.hadoop.io.nativeio.NativeIO.POSIX; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import sun.misc.Unsafe; import com.google.common.base.Preconditions; @@ -47,7 +46,7 @@ import com.google.common.primitives.Ints; * A shared memory segment used to implement short-circuit reads. */ public class ShortCircuitShm { - private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class); + private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class); protected static final int BYTES_PER_SLOT = 64; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java similarity index 100% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index f25fb1b0aab..c7233bd5da9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -153,7 +153,7 @@ public class TestPBHelper { @Test public void testConvertDatanodeID() { DatanodeID dn = DFSTestUtil.getLocalDatanodeID(); - DatanodeIDProto dnProto = PBHelperClient.convert(dn); + DatanodeIDProto dnProto = PBHelper.convert(dn); DatanodeID dn2 = PBHelper.convert(dnProto); compare(dn, dn2); } @@ -332,12 +332,12 @@ public class TestPBHelper { @Test public void testConvertExtendedBlock() { ExtendedBlock b = getExtendedBlock(); - ExtendedBlockProto bProto = PBHelperClient.convert(b); + ExtendedBlockProto bProto = PBHelper.convert(b); ExtendedBlock b1 = PBHelper.convert(bProto); assertEquals(b, b1); b.setBlockId(-1); - bProto = PBHelperClient.convert(b); + bProto = PBHelper.convert(b); b1 = PBHelper.convert(bProto); assertEquals(b, b1); } @@ -398,7 +398,7 @@ public class TestPBHelper { Token token = new Token( "identifier".getBytes(), "password".getBytes(), new Text("kind"), new Text("service")); - TokenProto tokenProto = PBHelperClient.convert(token); + TokenProto tokenProto = PBHelper.convert(token); Token token2 = PBHelper.convert(tokenProto); compare(token, token2); } @@ -592,16 +592,16 @@ public class TestPBHelper { @Test public void testChecksumTypeProto() { assertEquals(DataChecksum.Type.NULL, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); assertEquals(DataChecksum.Type.CRC32, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); assertEquals(DataChecksum.Type.CRC32C, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); - assertEquals(PBHelperClient.convert(DataChecksum.Type.NULL), + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); + assertEquals(PBHelper.convert(DataChecksum.Type.NULL), HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL); - assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32), + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32), HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32); - assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32C), + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C), HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C); }