HDFS-4363. Combine PBHelper and HdfsProtoUtil and remove redundant methods. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431088 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 106e2e27ff
commit 3cd17b614e

@@ -466,6 +466,9 @@ Release 2.0.3-alpha - Unreleased

    HDFS-4035. LightWeightGSet and LightWeightHashSet increment a
    volatile without synchronization. (eli)

+    HDFS-4363. Combine PBHelper and HdfsProtoUtil and remove redundant
+    methods. (suresh)
+
  OPTIMIZATIONS

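A note for readers before the per-file diffs: the patch folds the converters and stream helpers that used to live in HdfsProtoUtil into PBHelper, so call sites move from HdfsProtoUtil.toProto/fromProto/vintPrefixed to the PBHelper.convert/vintPrefixed overloads. A minimal before/after sketch of a typical call site follows; the wrapper class and method name are illustrative only and are not part of the patch.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

class OpResponseReader {
  // Before this patch the same read was written as:
  //   BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
  // After it, the helper lives on PBHelper:
  static BlockOpResponseProto readResponse(DataInputStream in) throws IOException {
    return BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
  }
}
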
@@ -79,7 +79,6 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;

@@ -115,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;

@@ -128,6 +126,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;

@@ -363,7 +362,7 @@ public class DFSClient implements java.io.Closeable {

  /**
   * Same as this(nameNodeUri, conf, null);
-  * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
+  * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
   */
  public DFSClient(URI nameNodeUri, Configuration conf
      ) throws IOException {

@@ -372,7 +371,7 @@ public class DFSClient implements java.io.Closeable {

  /**
   * Same as this(nameNodeUri, null, conf, stats);
-  * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
+  * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
   */
  public DFSClient(URI nameNodeUri, Configuration conf,
      FileSystem.Statistics stats)

@@ -1157,7 +1156,7 @@ public class DFSClient implements java.io.Closeable {

  /**
   * Call {@link #create(String, FsPermission, EnumSet, short, long,
-  * Progressable, int)} with default <code>permission</code>
+  * Progressable, int, ChecksumOpt)} with default <code>permission</code>
   * {@link FsPermission#getDefault()}.
   *
   * @param src File name

@@ -1268,7 +1267,7 @@ public class DFSClient implements java.io.Closeable {

  /**
   * Same as {{@link #create(String, FsPermission, EnumSet, short, long,
-  * Progressable, int)} except that the permission
+  * Progressable, int, ChecksumOpt)} except that the permission
   * is absolute (ie has already been masked with umask.
   */
  public DFSOutputStream primitiveCreate(String src,

@@ -1453,7 +1452,7 @@ public class DFSClient implements java.io.Closeable {
  }
  /**
   * Delete file or directory.
-  * See {@link ClientProtocol#delete(String)}.
+  * See {@link ClientProtocol#delete(String, boolean)}.
   */
  @Deprecated
  public boolean delete(String src) throws IOException {

@@ -1678,7 +1677,7 @@ public class DFSClient implements java.io.Closeable {
      new Sender(out).blockChecksum(block, lb.getBlockToken());

      final BlockOpResponseProto reply =
-       BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+       BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));

      if (reply.getStatus() != Status.SUCCESS) {
        if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN

@@ -1725,8 +1724,8 @@ public class DFSClient implements java.io.Closeable {
      md5.write(md5out);

      // read crc-type
-     final DataChecksum.Type ct = HdfsProtoUtil.
-         fromProto(checksumData.getCrcType());
+     final DataChecksum.Type ct = PBHelper.convert(checksumData
+         .getCrcType());
      if (i == 0) { // first block
        crcType = ct;
      } else if (crcType != DataChecksum.Type.MIXED

@@ -1888,7 +1887,7 @@ public class DFSClient implements java.io.Closeable {
   * @param isChecked
   *          If true, then check only active namenode's safemode status, else
   *          check first namenode's status.
-  * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeActio,boolean)
+  * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
   */
  public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{
    return namenode.setSafeMode(action, isChecked);

@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;

@@ -66,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;

@@ -883,7 +883,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {

      //ack
      BlockOpResponseProto response =
-       BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+       BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
      if (SUCCESS != response.getStatus()) {
        throw new IOException("Failed to add a datanode");
      }

@@ -1073,7 +1073,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {

        // receive ack for connect
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-           HdfsProtoUtil.vintPrefixed(blockReplyStream));
+           PBHelper.vintPrefixed(blockReplyStream));
        pipelineStatus = resp.getStatus();
        firstBadLink = resp.getFirstBadLink();

@@ -17,8 +17,6 @@
 */
package org.apache.hadoop.hdfs;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;

@@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.io.IOUtils;

@@ -392,7 +391,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
        bufferSize));

    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-       vintPrefixed(in));
+       PBHelper.vintPrefixed(in));
    RemoteBlockReader2.checkSuccess(status, sock, block, file);
    ReadOpChecksumInfoProto checksumInfo =
      status.getReadOpChecksumInfo();

@@ -17,8 +17,6 @@
 */
package org.apache.hadoop.hdfs;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

@@ -43,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.net.SocketInputWrapper;

@@ -401,7 +400,7 @@ public class RemoteBlockReader2 implements BlockReader {
    DataInputStream in = new DataInputStream(ioStreams.in);

    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-       vintPrefixed(in));
+       PBHelper.vintPrefixed(in));
    checkSuccess(status, sock, block, file);
    ReadOpChecksumInfoProto checksumInfo =
      status.getReadOpChecksumInfo();

@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
-import org.apache.hadoop.security.token.Token;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedInputStream;
-
-/**
- * Utilities for converting to and from protocol buffers used in the
- * HDFS wire protocol, as well as some generic utilities useful
- * for dealing with protocol buffers.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public abstract class HdfsProtoUtil {
-
-  //// Block Token ////
-
-  public static TokenProto toProto(Token<?> blockToken) {
-    return TokenProto.newBuilder()
-      .setIdentifier(ByteString.copyFrom(blockToken.getIdentifier()))
-      .setPassword(ByteString.copyFrom(blockToken.getPassword()))
-      .setKind(blockToken.getKind().toString())
-      .setService(blockToken.getService().toString())
-      .build();
-  }
-
-  public static Token<BlockTokenIdentifier> fromProto(TokenProto proto) {
-    return new Token<BlockTokenIdentifier>(proto.getIdentifier().toByteArray(),
-        proto.getPassword().toByteArray(),
-        new Text(proto.getKind()),
-        new Text(proto.getService()));
-  }
-
-  //// Extended Block ////
-
-  public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) {
-    return HdfsProtos.ExtendedBlockProto.newBuilder()
-      .setBlockId(block.getBlockId())
-      .setPoolId(block.getBlockPoolId())
-      .setNumBytes(block.getNumBytes())
-      .setGenerationStamp(block.getGenerationStamp())
-      .build();
-  }
-
-  public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) {
-    return new ExtendedBlock(
-        proto.getPoolId(), proto.getBlockId(),
-        proto.getNumBytes(), proto.getGenerationStamp());
-  }
-
-  //// DatanodeID ////
-
-  private static HdfsProtos.DatanodeIDProto toProto(
-      DatanodeID dni) {
-    return HdfsProtos.DatanodeIDProto.newBuilder()
-      .setIpAddr(dni.getIpAddr())
-      .setHostName(dni.getHostName())
-      .setStorageID(dni.getStorageID())
-      .setXferPort(dni.getXferPort())
-      .setInfoPort(dni.getInfoPort())
-      .setIpcPort(dni.getIpcPort())
-      .build();
-  }
-
-  private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
-    return new DatanodeID(
-        idProto.getIpAddr(),
-        idProto.getHostName(),
-        idProto.getStorageID(),
-        idProto.getXferPort(),
-        idProto.getInfoPort(),
-        idProto.getIpcPort());
-  }
-
-  //// DatanodeInfo ////
-
-  public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) {
-    return HdfsProtos.DatanodeInfoProto.newBuilder()
-      .setId(toProto((DatanodeID)dni))
-      .setCapacity(dni.getCapacity())
-      .setDfsUsed(dni.getDfsUsed())
-      .setRemaining(dni.getRemaining())
-      .setBlockPoolUsed(dni.getBlockPoolUsed())
-      .setLastUpdate(dni.getLastUpdate())
-      .setXceiverCount(dni.getXceiverCount())
-      .setLocation(dni.getNetworkLocation())
-      .setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf(
-          dni.getAdminState().name()))
-      .build();
-  }
-
-  public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) {
-    DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()),
-        dniProto.getLocation());
-
-    dniObj.setCapacity(dniProto.getCapacity());
-    dniObj.setDfsUsed(dniProto.getDfsUsed());
-    dniObj.setRemaining(dniProto.getRemaining());
-    dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed());
-    dniObj.setLastUpdate(dniProto.getLastUpdate());
-    dniObj.setXceiverCount(dniProto.getXceiverCount());
-    dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf(
-        dniProto.getAdminState().name()));
-    return dniObj;
-  }
-
-  public static ArrayList<? extends HdfsProtos.DatanodeInfoProto> toProtos(
-      DatanodeInfo[] dnInfos, int startIdx) {
-    ArrayList<HdfsProtos.DatanodeInfoProto> protos =
-      Lists.newArrayListWithCapacity(dnInfos.length);
-    for (int i = startIdx; i < dnInfos.length; i++) {
-      protos.add(toProto(dnInfos[i]));
-    }
-    return protos;
-  }
-
-  public static DatanodeInfo[] fromProtos(
-      List<HdfsProtos.DatanodeInfoProto> targetsList) {
-    DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()];
-    int i = 0;
-    for (HdfsProtos.DatanodeInfoProto proto : targetsList) {
-      ret[i++] = fromProto(proto);
-    }
-    return ret;
-  }
-
-  public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
-    return DataChecksum.Type.valueOf(type.getNumber());
-  }
-
-  public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
-    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
-  }
-
-  public static InputStream vintPrefixed(final InputStream input)
-      throws IOException {
-    final int firstByte = input.read();
-    if (firstByte == -1) {
-      throw new EOFException("Premature EOF: no length prefix available");
-    }
-
-    int size = CodedInputStream.readRawVarint32(firstByte, input);
-    assert size >= 0;
-
-    return new ExactSizeInputStream(input, size);
-  }
-}

@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;

import java.io.DataInputStream;
import java.io.DataOutputStream;

@@ -21,12 +21,12 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

@@ -41,18 +41,16 @@ import org.apache.hadoop.util.DataChecksum;
public abstract class DataTransferProtoUtil {
  static BlockConstructionStage fromProto(
      OpWriteBlockProto.BlockConstructionStage stage) {
-   return BlockConstructionStage.valueOf(BlockConstructionStage.class,
-       stage.name());
+   return BlockConstructionStage.valueOf(stage.name());
  }

  static OpWriteBlockProto.BlockConstructionStage toProto(
      BlockConstructionStage stage) {
-   return OpWriteBlockProto.BlockConstructionStage.valueOf(
-       stage.name());
+   return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
  }

  public static ChecksumProto toProto(DataChecksum checksum) {
-   ChecksumTypeProto type = HdfsProtoUtil.toProto(checksum.getChecksumType());
+   ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
    // ChecksumType#valueOf never returns null
    return ChecksumProto.newBuilder()
      .setBytesPerChecksum(checksum.getBytesPerChecksum())

@@ -64,8 +62,7 @@ public abstract class DataTransferProtoUtil {
    if (proto == null) return null;

    int bytesPerChecksum = proto.getBytesPerChecksum();
-   DataChecksum.Type type = HdfsProtoUtil.fromProto(proto.getType());
-
+   DataChecksum.Type type = PBHelper.convert(proto.getType());
    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
  }

@@ -82,8 +79,8 @@ public abstract class DataTransferProtoUtil {
  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
      Token<BlockTokenIdentifier> blockToken) {
    return BaseHeaderProto.newBuilder()
-     .setBlock(HdfsProtoUtil.toProto(blk))
-     .setToken(HdfsProtoUtil.toProto(blockToken))
+     .setBlock(PBHelper.convert(blk))
+     .setToken(PBHelper.convert(blockToken))
      .build();
  }
}

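The checksum-type conversions used above now also go through PBHelper in both directions. A small round-trip sketch, assuming only the PBHelper.convert overloads added elsewhere in this patch (the wrapper class name is illustrative):

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.util.DataChecksum;

class ChecksumTypeRoundTrip {
  // DataChecksum.Type <-> ChecksumTypeProto, replacing the old
  // HdfsProtoUtil.toProto(...) / HdfsProtoUtil.fromProto(...) pair.
  static DataChecksum.Type roundTrip(DataChecksum.Type type) {
    ChecksumTypeProto proto = PBHelper.convert(type);
    return PBHelper.convert(proto);
  }
}
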
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;

import java.io.IOException;
import java.io.InputStream;

@@ -17,9 +17,7 @@
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;

import java.io.DataInputStream;

@@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;

/** Receiver */
@InterfaceAudience.Private

@@ -85,8 +84,8 @@ public abstract class Receiver implements DataTransferProtocol {
  /** Receive OP_READ_BLOCK */
  private void opReadBlock() throws IOException {
    OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-   readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-       fromProto(proto.getHeader().getBaseHeader().getToken()),
+   readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen());

@@ -95,11 +94,11 @@ public abstract class Receiver implements DataTransferProtocol {
  /** Receive OP_WRITE_BLOCK */
  private void opWriteBlock(DataInputStream in) throws IOException {
    final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
-   writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-       fromProto(proto.getHeader().getBaseHeader().getToken()),
+   writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
-       fromProtos(proto.getTargetsList()),
-       fromProto(proto.getSource()),
+       PBHelper.convert(proto.getTargetsList()),
+       PBHelper.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),

@@ -111,33 +110,33 @@ public abstract class Receiver implements DataTransferProtocol {
  private void opTransferBlock(DataInputStream in) throws IOException {
    final OpTransferBlockProto proto =
      OpTransferBlockProto.parseFrom(vintPrefixed(in));
-   transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-       fromProto(proto.getHeader().getBaseHeader().getToken()),
+   transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
-       fromProtos(proto.getTargetsList()));
+       PBHelper.convert(proto.getTargetsList()));
  }

  /** Receive OP_REPLACE_BLOCK */
  private void opReplaceBlock(DataInputStream in) throws IOException {
    OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-   replaceBlock(fromProto(proto.getHeader().getBlock()),
-       fromProto(proto.getHeader().getToken()),
+   replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getToken()),
        proto.getDelHint(),
-       fromProto(proto.getSource()));
+       PBHelper.convert(proto.getSource()));
  }

  /** Receive OP_COPY_BLOCK */
  private void opCopyBlock(DataInputStream in) throws IOException {
    OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-   copyBlock(fromProto(proto.getHeader().getBlock()),
-       fromProto(proto.getHeader().getToken()));
+   copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getToken()));
  }

  /** Receive OP_BLOCK_CHECKSUM */
  private void opBlockChecksum(DataInputStream in) throws IOException {
    OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));

-   blockChecksum(fromProto(proto.getHeader().getBlock()),
-       fromProto(proto.getHeader().getToken()));
+   blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
+       PBHelper.convert(proto.getHeader().getToken()));
  }
}

@@ -17,8 +17,6 @@
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;

import java.io.DataOutput;

@@ -37,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

@@ -105,7 +104,7 @@ public class Sender implements DataTransferProtocol {

    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
      .setHeader(header)
-     .addAllTargets(toProtos(targets, 1))
+     .addAllTargets(PBHelper.convert(targets, 1))
      .setStage(toProto(stage))
      .setPipelineSize(pipelineSize)
      .setMinBytesRcvd(minBytesRcvd)

@@ -114,7 +113,7 @@ public class Sender implements DataTransferProtocol {
      .setRequestedChecksum(checksumProto);

    if (source != null) {
-     proto.setSource(toProto(source));
+     proto.setSource(PBHelper.convertDatanodeInfo(source));
    }

    send(out, Op.WRITE_BLOCK, proto.build());

@@ -129,7 +128,7 @@ public class Sender implements DataTransferProtocol {
    OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildClientHeader(
          blk, clientName, blockToken))
-     .addAllTargets(toProtos(targets, 0))
+     .addAllTargets(PBHelper.convert(targets))
      .build();

    send(out, Op.TRANSFER_BLOCK, proto);

@@ -143,7 +142,7 @@ public class Sender implements DataTransferProtocol {
    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
      .setDelHint(delHint)
-     .setSource(toProto(source))
+     .setSource(PBHelper.convertDatanodeInfo(source))
      .build();

    send(out, Op.REPLACE_BLOCK, proto);

@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.protocolPB;

import java.io.IOException;
-import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;

@@ -131,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;

import com.google.protobuf.RpcController;

@@ -494,10 +492,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
      RpcController controller, GetDatanodeReportRequestProto req)
      throws ServiceException {
    try {
-     DatanodeInfoProto[] result = PBHelper.convert(server
+     List<? extends DatanodeInfoProto> result = PBHelper.convert(server
          .getDatanodeReport(PBHelper.convert(req.getType())));
      return GetDatanodeReportResponseProto.newBuilder()
-         .addAllDi(Arrays.asList(result)).build();
+         .addAllDi(result).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }

@@ -282,7 +282,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
    if (previous != null)
      req.setPrevious(PBHelper.convert(previous));
    if (excludeNodes != null)
-     req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
+     req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
    try {
      return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
    } catch (ServiceException e) {

@@ -300,8 +300,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
        .newBuilder()
        .setSrc(src)
        .setBlk(PBHelper.convert(blk))
-       .addAllExistings(Arrays.asList(PBHelper.convert(existings)))
-       .addAllExcludes(Arrays.asList(PBHelper.convert(excludes)))
+       .addAllExistings(PBHelper.convert(existings))
+       .addAllExcludes(PBHelper.convert(excludes))
        .setNumAdditionalNodes(numAdditionalNodes)
        .setClientName(clientName)
        .build();

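The translators above drop the Arrays.asList wrapping because the new PBHelper.convert(DatanodeInfo[]) overload (added in PBHelper below) returns a List directly. A sketch of the resulting call shape, with an illustrative helper class that is not part of the patch:

import java.util.List;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

class ExcludeNodesExample {
  // Previously: Arrays.asList(PBHelper.convert(excludeNodes)) around an array;
  // now the list-returning overload feeds the request builders as-is.
  static List<? extends DatanodeInfoProto> toProtos(DatanodeInfo[] excludeNodes) {
    return PBHelper.convert(excludeNodes);
  }
}
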
@@ -17,6 +17,9 @@
 */
package org.apache.hadoop.hdfs.protocolPB;

+import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;

@@ -40,10 +43,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;

@@ -127,15 +130,20 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;

/**
- * Utilities for converting protobuf classes to and from implementation classes.
+ * Utilities for converting protobuf classes to and from implementation classes
+ * and other helper utilities to help in dealing with protobuf.
 *
 * Note that when converting from an internal type to protobuf type, the
 * converter never return null for protobuf type. The check for internal type

@@ -219,7 +227,8 @@ public class PBHelper {

  // Arrays of DatanodeId
  public static DatanodeIDProto[] convert(DatanodeID[] did) {
-   if (did == null) return null;
+   if (did == null)
+     return null;
    final int len = did.length;
    DatanodeIDProto[] result = new DatanodeIDProto[len];
    for (int i = 0; i < len; ++i) {

@@ -482,14 +491,26 @@ public class PBHelper {
    }
    return result;
  }

- static public DatanodeInfoProto[] convert(DatanodeInfo[] di) {
-   if (di == null) return null;
-   DatanodeInfoProto[] result = new DatanodeInfoProto[di.length];
-   for (int i = 0; i < di.length; i++) {
-     result[i] = PBHelper.convertDatanodeInfo(di[i]);
-   }
-   return result;
- }
+ public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+     DatanodeInfo[] dnInfos) {
+   return convert(dnInfos, 0);
+ }
+
+ /**
+  * Copy from {@code dnInfos} to a target of list of same size starting at
+  * {@code startIdx}.
+  */
+ public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+     DatanodeInfo[] dnInfos, int startIdx) {
+   if (dnInfos == null)
+     return null;
+   ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
+       .newArrayListWithCapacity(dnInfos.length);
+   for (int i = startIdx; i < dnInfos.length; i++) {
+     protos.add(convert(dnInfos[i]));
+   }
+   return protos;
+ }

  public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {

@@ -694,7 +715,7 @@ public class PBHelper {
    DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
    for (int i = 0; i < targets.length; i++) {
      ret[i] = DatanodeInfosProto.newBuilder()
-       .addAllDatanodes(Arrays.asList(PBHelper.convert(targets[i]))).build();
+       .addAllDatanodes(PBHelper.convert(targets[i])).build();
    }
    return Arrays.asList(ret);
  }

@@ -963,7 +984,7 @@ public class PBHelper {
        fs.getFileBufferSize(),
        fs.getEncryptDataTransfer(),
        fs.getTrashInterval(),
-       HdfsProtoUtil.fromProto(fs.getChecksumType()));
+       PBHelper.convert(fs.getChecksumType()));
  }

  public static FsServerDefaultsProto convert(FsServerDefaults fs) {

@@ -976,7 +997,7 @@ public class PBHelper {
        .setFileBufferSize(fs.getFileBufferSize())
        .setEncryptDataTransfer(fs.getEncryptDataTransfer())
        .setTrashInterval(fs.getTrashInterval())
-       .setChecksumType(HdfsProtoUtil.toProto(fs.getChecksumType()))
+       .setChecksumType(PBHelper.convert(fs.getChecksumType()))
        .build();
  }

@@ -1314,4 +1335,24 @@ public class PBHelper {
        .setLayoutVersion(j.getLayoutVersion())
        .setNamespaceID(j.getNamespaceId()).build();
  }
+
+ public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
+   return DataChecksum.Type.valueOf(type.getNumber());
+ }
+
+ public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
+   return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
+ }
+
+ public static InputStream vintPrefixed(final InputStream input)
+     throws IOException {
+   final int firstByte = input.read();
+   if (firstByte == -1) {
+     throw new EOFException("Premature EOF: no length prefix available");
+   }
+
+   int size = CodedInputStream.readRawVarint32(firstByte, input);
+   assert size >= 0;
+   return new ExactSizeInputStream(input, size);
+ }
}

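PBHelper.vintPrefixed, moved here from HdfsProtoUtil, reads the varint length prefix that protobuf's writeDelimitedTo emits and returns a stream bounded to exactly that many bytes, which parseFrom then consumes. A self-contained sketch of that pairing, assuming only the methods shown above (the demo class is illustrative, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

class VintPrefixedDemo {
  static BlockOpResponseProto roundTrip() throws IOException {
    // Write a response with a varint length prefix...
    BlockOpResponseProto reply =
        BlockOpResponseProto.newBuilder().setStatus(Status.SUCCESS).build();
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    reply.writeDelimitedTo(buf);
    // ...and read it back: vintPrefixed strips the prefix and bounds the stream.
    InputStream in = new ByteArrayInputStream(buf.toByteArray());
    return BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
  }
}
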
@@ -18,7 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer;

import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

@@ -98,7 +98,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;

@@ -115,6 +114,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

@@ -1468,7 +1468,7 @@ public class DataNode extends Configured
        // read ack
        if (isClient) {
          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-             HdfsProtoUtil.vintPrefixed(in));
+             PBHelper.vintPrefixed(in));
          if (LOG.isDebugEnabled()) {
            LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
          }

@@ -42,7 +42,6 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;

@@ -56,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

@@ -144,7 +144,7 @@ class DataXceiver extends Receiver implements Runnable {
  /** Return the datanode object. */
  DataNode getDataNode() {return datanode;}

- private OutputStream getOutputStream() throws IOException {
+ private OutputStream getOutputStream() {
    return socketOut;
  }

@@ -284,7 +284,7 @@ class DataXceiver extends Receiver implements Runnable {
      // to respond with a Status enum.
      try {
        ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
-           HdfsProtoUtil.vintPrefixed(in));
+           PBHelper.vintPrefixed(in));
        if (!stat.hasStatus()) {
          LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
              "code after reading. Will close connection.");

@@ -445,7 +445,7 @@ class DataXceiver extends Receiver implements Runnable {
      // read connect ack (only for clients, not for replication req)
      if (isClient) {
        BlockOpResponseProto connectAck =
-         BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+         BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
        mirrorInStatus = connectAck.getStatus();
        firstBadLink = connectAck.getFirstBadLink();
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {

@@ -606,7 +606,7 @@ class DataXceiver extends Receiver implements Runnable {
        .setBytesPerCrc(bytesPerCRC)
        .setCrcPerBlock(crcPerBlock)
        .setMd5(ByteString.copyFrom(md5.getDigest()))
-       .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
+       .setCrcType(PBHelper.convert(checksum.getChecksumType()))
        )
        .build()
        .writeDelimitedTo(out);

@@ -765,7 +765,7 @@ class DataXceiver extends Receiver implements Runnable {
      // receive the response from the proxy

      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-         HdfsProtoUtil.vintPrefixed(proxyReply));
+         PBHelper.vintPrefixed(proxyReply));

      if (copyResponse.getStatus() != SUCCESS) {
        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {

@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.util.DataChecksum;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHdfsProtoUtil {
-  @Test
-  public void testChecksumTypeProto() {
-    assertEquals(DataChecksum.Type.NULL,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
-    assertEquals(DataChecksum.Type.CRC32,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
-    assertEquals(DataChecksum.Type.CRC32C,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.NULL),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
-  }
-}

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;

@@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
import org.junit.Test;

import com.google.common.base.Joiner;

@@ -471,4 +473,20 @@ public class TestPBHelper {
      }
    }
  }
+
+  @Test
+  public void testChecksumTypeProto() {
+    assertEquals(DataChecksum.Type.NULL,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
+    assertEquals(DataChecksum.Type.CRC32,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
+    assertEquals(DataChecksum.Type.CRC32C,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
+    assertEquals(PBHelper.convert(DataChecksum.Type.NULL),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
+    assertEquals(PBHelper.convert(DataChecksum.Type.CRC32),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
+    assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
+  }
}