Revert "HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu."

This reverts commit 8e4afa3a67.
This commit is contained in:
Andrew Wang 2015-08-24 11:51:46 -07:00
parent 87d0133703
commit a727c6db05
38 changed files with 312 additions and 388 deletions

View File

@ -1,254 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Utilities for converting protobuf classes to and from implementation classes
* and other helper utilities to help in dealing with protobuf.
*
* Note that when converting from an internal type to protobuf type, the
* converter never return null for protobuf type. The check for internal type
* being null must be done before calling the convert() method.
*/
public class PBHelperClient {
private PBHelperClient() {
/** Hidden constructor */
}
public static ByteString getByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
public static ShmId convert(ShortCircuitShmIdProto shmId) {
return new ShmId(shmId.getHi(), shmId.getLo());
}
public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static ExtendedBlockProto convert(final ExtendedBlock b) {
if (b == null) return null;
return ExtendedBlockProto.newBuilder().
setPoolId(b.getBlockPoolId()).
setBlockId(b.getBlockId()).
setNumBytes(b.getNumBytes()).
setGenerationStamp(b.getGenerationStamp()).
build();
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
setPassword(ByteString.copyFrom(tok.getPassword())).
setKind(tok.getKind().toString()).
setService(tok.getService().toString()).build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
return ShortCircuitShmIdProto.newBuilder().
setHi(shmId.getHi()).
setLo(shmId.getLo()).
build();
}
public static ShortCircuitShmSlotProto convert(SlotId slotId) {
return ShortCircuitShmSlotProto.newBuilder().
setShmId(convert(slotId.getShmId())).
setSlotIdx(slotId.getSlotIdx()).
build();
}
public static DatanodeIDProto convert(DatanodeID dn) {
// For wire compatibility with older versions we transmit the StorageID
// which is the same as the DatanodeUuid. Since StorageID is a required
// field we pass the empty string if the DatanodeUuid is not yet known.
return DatanodeIDProto.newBuilder()
.setIpAddr(dn.getIpAddr())
.setHostName(dn.getHostName())
.setXferPort(dn.getXferPort())
.setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
}
public static DatanodeInfoProto.AdminState convert(
final DatanodeInfo.AdminStates inAs) {
switch (inAs) {
case NORMAL: return DatanodeInfoProto.AdminState.NORMAL;
case DECOMMISSION_INPROGRESS:
return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
default: return DatanodeInfoProto.AdminState.NORMAL;
}
}
public static DatanodeInfoProto convert(DatanodeInfo info) {
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (info.getNetworkLocation() != null) {
builder.setLocation(info.getNetworkLocation());
}
builder
.setId(convert((DatanodeID) info))
.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setRemaining(info.getRemaining())
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCacheCapacity(info.getCacheCapacity())
.setCacheUsed(info.getCacheUsed())
.setLastUpdate(info.getLastUpdate())
.setLastUpdateMonotonic(info.getLastUpdateMonotonic())
.setXceiverCount(info.getXceiverCount())
.setAdminState(convert(info.getAdminState()))
.build();
return builder.build();
}
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos) {
return convert(dnInfos, 0);
}
/**
* Copy from {@code dnInfos} to a target of list of same size starting at
* {@code startIdx}.
*/
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos, int startIdx) {
if (dnInfos == null)
return null;
ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(convert(dnInfos[i]));
}
return protos;
}
public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
List<Boolean> pinnings = new ArrayList<>();
if (targetPinnings == null) {
pinnings.add(Boolean.FALSE);
} else {
for (; idx < targetPinnings.length; ++idx) {
pinnings.add(targetPinnings[idx]);
}
}
return pinnings;
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return convert(di);
}
public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
case SSD:
return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
}
}
public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
case SSD:
return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
}
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types) {
return convertStorageTypes(types, 0);
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types, int startIdx) {
if (types == null) {
return null;
}
final List<StorageTypeProto> protos = new ArrayList<>(
types.length);
for (int i = startIdx; i < types.length; ++i) {
protos.add(PBHelperClient.convertStorageType(types[i]));
}
return protos;
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
}

View File

@ -482,8 +482,6 @@ Release 2.8.0 - UNRELEASED
HDFS-8823. Move replication factor into individual blocks. (wheat9)
HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -45,7 +45,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -592,7 +592,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
failureInjector.getSupportsReceiptVerification());
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {

View File

@ -149,7 +149,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -1928,7 +1928,7 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
String logInfo = "for block " + block + " from datanode " + datanodes[j];
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
@ -1960,7 +1960,7 @@ else if (bpc != bytesPerCRC) {
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData
ct = PBHelper.convert(checksumData
.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
@ -2088,11 +2088,11 @@ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
} finally {
IOUtils.cleanup(null, pair.in, pair.out);
}

View File

@ -67,7 +67,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -1245,7 +1245,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
//ack
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
@ -1524,7 +1524,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(blockReplyStream));
PBHelper.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@ -414,7 +414,7 @@ public static RemoteBlockReader newBlockReader(String file,
new BufferedInputStream(peer.getInputStream(), bufferSize));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
RemoteBlockReader2.checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@ -417,7 +417,7 @@ public static BlockReader newBlockReader(String file,
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@ -30,7 +32,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.security.token.Token;
@ -58,7 +60,7 @@ static OpWriteBlockProto.BlockConstructionStage toProto(
}
public static ChecksumProto toProto(DataChecksum checksum) {
ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType());
ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
// ChecksumType#valueOf never returns null
return ChecksumProto.newBuilder()
.setBytesPerChecksum(checksum.getBytesPerChecksum())
@ -70,7 +72,7 @@ public static DataChecksum fromProto(ChecksumProto proto) {
if (proto == null) return null;
int bytesPerChecksum = proto.getBytesPerChecksum();
DataChecksum.Type type = PBHelperClient.convert(proto.getType());
DataChecksum.Type type = PBHelper.convert(proto.getType());
return DataChecksum.newDataChecksum(type, bytesPerChecksum);
}
@ -87,8 +89,8 @@ static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken) {
BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
.setBlock(PBHelperClient.convert(blk))
.setToken(PBHelperClient.convert(blockToken));
.setBlock(PBHelper.convert(blk))
.setToken(PBHelper.convert(blockToken));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()

View File

@ -19,6 +19,8 @@
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@ -30,16 +32,13 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Transfer data to/from datanode using a streaming protocol.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface DataTransferProtocol {
public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);
public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
/** Version for data transfers between clients and datanodes
* This should change when serialization of DatanodeInfo, not just

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;

View File

@ -19,7 +19,7 @@
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.DataInputStream;
import java.io.IOException;
@ -39,7 +39,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.htrace.TraceScope;
@ -137,7 +136,7 @@ private void opWriteBlock(DataInputStream in) throws IOException {
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
@ -229,7 +228,7 @@ private void opReplaceBlock(DataInputStream in) throws IOException {
proto.getClass().getSimpleName());
try {
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));

View File

@ -41,7 +41,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@ -140,9 +140,9 @@ public void writeBlock(final ExtendedBlock blk,
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.setStorageType(PBHelperClient.convertStorageType(storageType))
.addAllTargets(PBHelperClient.convert(targets, 1))
.addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1))
.setStorageType(PBHelper.convertStorageType(storageType))
.addAllTargets(PBHelper.convert(targets, 1))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@ -152,10 +152,10 @@ public void writeBlock(final ExtendedBlock blk,
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist)
.setPinning(pinning)
.addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
.addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
if (source != null) {
proto.setSource(PBHelperClient.convertDatanodeInfo(source));
proto.setSource(PBHelper.convertDatanodeInfo(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
@ -171,8 +171,8 @@ public void transferBlock(final ExtendedBlock blk,
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(PBHelperClient.convert(targets))
.addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes))
.addAllTargets(PBHelper.convert(targets))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@ -188,7 +188,7 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
.setHeader(DataTransferProtoUtil.buildBaseHeader(
blk, blockToken)).setMaxVersion(maxVersion);
if (slotId != null) {
builder.setSlotId(PBHelperClient.convert(slotId));
builder.setSlotId(PBHelper.convert(slotId));
}
builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build();
@ -199,7 +199,7 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
public void releaseShortCircuitFds(SlotId slotId) throws IOException {
ReleaseShortCircuitAccessRequestProto.Builder builder =
ReleaseShortCircuitAccessRequestProto.newBuilder().
setSlotId(PBHelperClient.convert(slotId));
setSlotId(PBHelper.convert(slotId));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
@ -231,9 +231,9 @@ public void replaceBlock(final ExtendedBlock blk,
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setStorageType(PBHelperClient.convertStorageType(storageType))
.setStorageType(PBHelper.convertStorageType(storageType))
.setDelHint(delHint)
.setSource(PBHelperClient.convertDatanodeInfo(source))
.setSource(PBHelper.convertDatanodeInfo(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);

View File

@ -24,7 +24,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;

View File

@ -137,7 +137,7 @@ public GetBlockLocalPathInfoResponseProto getBlockLocalPathInfo(
throw new ServiceException(e);
}
return GetBlockLocalPathInfoResponseProto.newBuilder()
.setBlock(PBHelperClient.convert(resp.getBlock()))
.setBlock(PBHelper.convert(resp.getBlock()))
.setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
.build();
}

View File

@ -185,7 +185,7 @@ public void close() {
@Override
public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
.newBuilder().setBlock(PBHelperClient.convert(b)).build();
.newBuilder().setBlock(PBHelper.convert(b)).build();
try {
return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
} catch (ServiceException e) {
@ -218,8 +218,8 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException {
GetBlockLocalPathInfoRequestProto req =
GetBlockLocalPathInfoRequestProto.newBuilder()
.setBlock(PBHelperClient.convert(block))
.setToken(PBHelperClient.convert(token)).build();
.setBlock(PBHelper.convert(block))
.setToken(PBHelper.convert(token)).build();
GetBlockLocalPathInfoResponseProto resp;
try {
resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);

View File

@ -698,7 +698,7 @@ public GetDatanodeReportResponseProto getDatanodeReport(
RpcController controller, GetDatanodeReportRequestProto req)
throws ServiceException {
try {
List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server
List<? extends DatanodeInfoProto> result = PBHelper.convert(server
.getDatanodeReport(PBHelper.convert(req.getType())));
return GetDatanodeReportResponseProto.newBuilder()
.addAllDi(result).build();
@ -892,7 +892,7 @@ public SetQuotaResponseProto setQuota(RpcController controller,
server.setQuota(req.getPath(), req.getNamespaceQuota(),
req.getStoragespaceQuota(),
req.hasStorageType() ?
PBHelperClient.convertStorageType(req.getStorageType()): null);
PBHelper.convertStorageType(req.getStorageType()): null);
return VOID_SETQUOTA_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
@ -992,7 +992,7 @@ public GetDelegationTokenResponseProto getDelegationToken(
GetDelegationTokenResponseProto.Builder rspBuilder =
GetDelegationTokenResponseProto.newBuilder();
if (token != null) {
rspBuilder.setToken(PBHelperClient.convert(token));
rspBuilder.setToken(PBHelper.convert(token));
}
return rspBuilder.build();
} catch (IOException e) {

View File

@ -390,7 +390,7 @@ public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
.setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder)
.setB(PBHelper.convert(b)).setSrc(src).setHolder(holder)
.setFileId(fileId).build();
try {
rpcProxy.abandonBlock(null, req);
@ -409,9 +409,9 @@ public LocatedBlock addBlock(String src, String clientName,
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
req.setPrevious(PBHelperClient.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes));
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
if (favoredNodes != null) {
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
@ -433,10 +433,10 @@ public LocatedBlock getAdditionalDatanode(String src, long fileId,
.newBuilder()
.setSrc(src)
.setFileId(fileId)
.setBlk(PBHelperClient.convert(blk))
.addAllExistings(PBHelperClient.convert(existings))
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
.addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
.addAllExcludes(PBHelperClient.convert(excludes))
.addAllExcludes(PBHelper.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
.build();
@ -458,7 +458,7 @@ public boolean complete(String src, String clientName,
.setClientName(clientName)
.setFileId(fileId);
if (last != null)
req.setLast(PBHelperClient.convert(last));
req.setLast(PBHelper.convert(last));
try {
return rpcProxy.complete(null, req.build()).getResult();
} catch (ServiceException e) {
@ -817,7 +817,7 @@ public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
.setNamespaceQuota(namespaceQuota)
.setStoragespaceQuota(storagespaceQuota);
if (type != null) {
builder.setStorageType(PBHelperClient.convertStorageType(type));
builder.setStorageType(PBHelper.convertStorageType(type));
}
final SetQuotaRequestProto req = builder.build();
try {
@ -895,7 +895,7 @@ public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
String clientName) throws IOException {
UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto
.newBuilder()
.setBlock(PBHelperClient.convert(block))
.setBlock(PBHelper.convert(block))
.setClientName(clientName)
.build();
try {
@ -911,8 +911,8 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
.setClientName(clientName)
.setOldBlock(PBHelperClient.convert(oldBlock))
.setNewBlock(PBHelperClient.convert(newBlock))
.setOldBlock(PBHelper.convert(oldBlock))
.setNewBlock(PBHelper.convert(newBlock))
.addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
.addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
.build();
@ -943,7 +943,7 @@ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder().
setToken(PBHelperClient.convert(token)).
setToken(PBHelper.convert(token)).
build();
try {
return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime();
@ -957,7 +957,7 @@ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
.newBuilder()
.setToken(PBHelperClient.convert(token))
.setToken(PBHelper.convert(token))
.build();
try {
rpcProxy.cancelDelegationToken(null, req);

View File

@ -298,11 +298,11 @@ public void commitBlockSynchronization(ExtendedBlock block,
) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp)
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
.setNewLength(newlength).setCloseFile(closeFile)
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.addNewTaragets(PBHelperClient.convert(newtargets[i]));
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
builder.addNewTargetStorages(newtargetstorages[i]);
}
CommitBlockSynchronizationRequestProto req = builder.build();

View File

@ -105,7 +105,7 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newBlockId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelperClient.convert(oldBlock))
.setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setNewBlockId(newBlockId)
.setRecoveryId(recoveryId).build();
try {

View File

@ -101,7 +101,7 @@ public Object getUnderlyingProxyObject() {
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
.setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
.build();
try {
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)

View File

@ -347,7 +347,7 @@ public static StorageTypesProto convert(StorageType[] types) {
if (types == null || types.length == 0) {
return null;
}
List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types);
List<StorageTypeProto> list = convertStorageTypes(types);
return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
}
@ -382,6 +382,20 @@ public static DatanodeID convert(DatanodeIDProto dn) {
.getInfoSecurePort() : 0, dn.getIpcPort());
}
public static DatanodeIDProto convert(DatanodeID dn) {
// For wire compatibility with older versions we transmit the StorageID
// which is the same as the DatanodeUuid. Since StorageID is a required
// field we pass the empty string if the DatanodeUuid is not yet known.
return DatanodeIDProto.newBuilder()
.setIpAddr(dn.getIpAddr())
.setHostName(dn.getHostName())
.setXferPort(dn.getXferPort())
.setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
}
// Arrays of DatanodeId
public static DatanodeIDProto[] convert(DatanodeID[] did) {
if (did == null)
@ -389,7 +403,7 @@ public static DatanodeIDProto[] convert(DatanodeID[] did) {
final int len = did.length;
DatanodeIDProto[] result = new DatanodeIDProto[len];
for (int i = 0; i < len; ++i) {
result[i] = PBHelperClient.convert(did[i]);
result[i] = convert(did[i]);
}
return result;
}
@ -420,7 +434,7 @@ public static BlockWithLocationsProto convert(BlockWithLocations blk) {
.setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
.addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
.addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
.build();
}
@ -582,6 +596,16 @@ public static ExtendedBlock convert(ExtendedBlockProto eb) {
eb.getGenerationStamp());
}
public static ExtendedBlockProto convert(final ExtendedBlock b) {
if (b == null) return null;
return ExtendedBlockProto.newBuilder().
setPoolId(b.getBlockPoolId()).
setBlockId(b.getBlockId()).
setNumBytes(b.getNumBytes()).
setGenerationStamp(b.getGenerationStamp()).
build();
}
public static RecoveringBlockProto convert(RecoveringBlock b) {
if (b == null) {
return null;
@ -602,6 +626,17 @@ public static RecoveringBlock convert(RecoveringBlockProto b) {
new RecoveringBlock(block, locs, b.getNewGenStamp());
}
public static DatanodeInfoProto.AdminState convert(
final DatanodeInfo.AdminStates inAs) {
switch (inAs) {
case NORMAL: return DatanodeInfoProto.AdminState.NORMAL;
case DECOMMISSION_INPROGRESS:
return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
default: return DatanodeInfoProto.AdminState.NORMAL;
}
}
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
@ -613,6 +648,12 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return convert(di);
}
static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
if (di == null) return null;
DatanodeInfo[] result = new DatanodeInfo[di.length];
@ -622,6 +663,27 @@ static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
return result;
}
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos) {
return convert(dnInfos, 0);
}
/**
* Copy from {@code dnInfos} to a target of list of same size starting at
* {@code startIdx}.
*/
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos, int startIdx) {
if (dnInfos == null)
return null;
ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(convert(dnInfos[i]));
}
return protos;
}
public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
DatanodeInfo[] info = new DatanodeInfo[list.size()];
for (int i = 0; i < info.length; i++) {
@ -629,11 +691,32 @@ public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
}
return info;
}
public static DatanodeInfoProto convert(DatanodeInfo info) {
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (info.getNetworkLocation() != null) {
builder.setLocation(info.getNetworkLocation());
}
builder
.setId(PBHelper.convert((DatanodeID)info))
.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setRemaining(info.getRemaining())
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCacheCapacity(info.getCacheCapacity())
.setCacheUsed(info.getCacheUsed())
.setLastUpdate(info.getLastUpdate())
.setLastUpdateMonotonic(info.getLastUpdateMonotonic())
.setXceiverCount(info.getXceiverCount())
.setAdminState(PBHelper.convert(info.getAdminState()))
.build();
return builder.build();
}
public static DatanodeStorageReportProto convertDatanodeStorageReport(
DatanodeStorageReport report) {
return DatanodeStorageReportProto.newBuilder()
.setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo()))
.setDatanodeInfo(convert(report.getDatanodeInfo()))
.addAllStorageReports(convertStorageReports(report.getStorageReports()))
.build();
}
@ -685,7 +768,7 @@ public static LocatedBlockProto convert(LocatedBlock b) {
Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
for (int i = 0; i < locs.length; i++) {
DatanodeInfo loc = locs[i];
builder.addLocs(i, PBHelperClient.convert(loc));
builder.addLocs(i, PBHelper.convert(loc));
boolean locIsCached = cachedLocs.contains(loc);
builder.addIsCached(locIsCached);
if (locIsCached) {
@ -699,7 +782,7 @@ public static LocatedBlockProto convert(LocatedBlock b) {
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
for (int i = 0; i < storageTypes.length; ++i) {
builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i]));
builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
}
}
final String[] storageIDs = b.getStorageIDs();
@ -707,8 +790,8 @@ public static LocatedBlockProto convert(LocatedBlock b) {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
return builder.setB(PBHelperClient.convert(b.getBlock()))
.setBlockToken(PBHelperClient.convert(b.getBlockToken()))
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
@ -749,6 +832,14 @@ public static LocatedBlock convert(LocatedBlockProto proto) {
return lb;
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
setPassword(ByteString.copyFrom(tok.getPassword())).
setKind(tok.getKind().toString()).
setService(tok.getService().toString()).build();
}
public static Token<BlockTokenIdentifier> convert(
TokenProto blockToken) {
return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
@ -800,7 +891,7 @@ public static DatanodeRegistrationProto convert(
DatanodeRegistration registration) {
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
.newBuilder();
return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration))
return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
.setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
.setKeys(PBHelper.convert(registration.getExportedKeys()))
.setSoftwareVersion(registration.getSoftwareVersion()).build();
@ -892,7 +983,7 @@ private static List<StorageTypesProto> convert(StorageType[][] types) {
if (types != null) {
for (StorageType[] ts : types) {
StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts));
builder.addAllStorageTypes(convertStorageTypes(ts));
list.add(builder.build());
}
}
@ -923,7 +1014,7 @@ private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
for (int i = 0; i < targets.length; i++) {
ret[i] = DatanodeInfosProto.newBuilder()
.addAllDatanodes(PBHelperClient.convert(targets[i])).build();
.addAllDatanodes(PBHelper.convert(targets[i])).build();
}
return Arrays.asList(ret);
}
@ -1247,7 +1338,7 @@ public static FsServerDefaults convert(FsServerDefaultsProto fs) {
fs.getFileBufferSize(),
fs.getEncryptDataTransfer(),
fs.getTrashInterval(),
PBHelperClient.convert(fs.getChecksumType()));
PBHelper.convert(fs.getChecksumType()));
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@ -1260,7 +1351,7 @@ public static FsServerDefaultsProto convert(FsServerDefaults fs) {
.setFileBufferSize(fs.getFileBufferSize())
.setEncryptDataTransfer(fs.getEncryptDataTransfer())
.setTrashInterval(fs.getTrashInterval())
.setChecksumType(PBHelperClient.convert(fs.getChecksumType()))
.setChecksumType(PBHelper.convert(fs.getChecksumType()))
.build();
}
@ -1648,7 +1739,7 @@ public static ContentSummary convert(ContentSummaryProto cs) {
if (cs.hasTypeQuotaInfos()) {
for (HdfsProtos.StorageTypeQuotaInfoProto info :
cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
StorageType type = PBHelperClient.convertStorageType(info.getType());
StorageType type = PBHelper.convertStorageType(info.getType());
builder.typeConsumed(type, info.getConsumed());
builder.typeQuota(type, info.getQuota());
}
@ -1672,7 +1763,7 @@ public static ContentSummaryProto convert(ContentSummary cs) {
for (StorageType t: StorageType.getTypesSupportingQuota()) {
HdfsProtos.StorageTypeQuotaInfoProto info =
HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
setType(PBHelperClient.convertStorageType(t)).
setType(convertStorageType(t)).
setConsumed(cs.getTypeConsumed(t)).
setQuota(cs.getTypeQuota(t)).
build();
@ -1717,7 +1808,7 @@ public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
public static DatanodeStorageProto convert(DatanodeStorage s) {
return DatanodeStorageProto.newBuilder()
.setState(PBHelper.convertState(s.getState()))
.setStorageType(PBHelperClient.convertStorageType(s.getStorageType()))
.setStorageType(PBHelper.convertStorageType(s.getStorageType()))
.setStorageUuid(s.getStorageID()).build();
}
@ -1731,10 +1822,44 @@ private static StorageState convertState(State state) {
}
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types) {
return convertStorageTypes(types, 0);
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types, int startIdx) {
if (types == null) {
return null;
}
final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
types.length);
for (int i = startIdx; i < types.length; ++i) {
protos.add(convertStorageType(types[i]));
}
return protos;
}
public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
case SSD:
return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
}
}
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
PBHelperClient.convertStorageType(s.getStorageType()));
PBHelper.convertStorageType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@ -1747,6 +1872,22 @@ private static State convertState(StorageState state) {
}
}
public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
case SSD:
return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
}
}
public static StorageType[] convertStorageTypes(
List<StorageTypeProto> storageTypesList, int expectedSize) {
final StorageType[] storageTypes = new StorageType[expectedSize];
@ -1755,7 +1896,7 @@ public static StorageType[] convertStorageTypes(
Arrays.fill(storageTypes, StorageType.DEFAULT);
} else {
for (int i = 0; i < storageTypes.length; ++i) {
storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i));
storageTypes[i] = convertStorageType(storageTypesList.get(i));
}
}
return storageTypes;
@ -1939,6 +2080,10 @@ public static SnapshotDiffReportProto convert(SnapshotDiffReport report) {
return reportProto;
}
public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static CacheDirectiveInfoProto convert
(CacheDirectiveInfo info) {
CacheDirectiveInfoProto.Builder builder =
@ -2111,6 +2256,9 @@ public static CachePoolEntry convert (CachePoolEntryProto proto) {
return new CachePoolEntry(info, stats);
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) {
DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder();
@ -2125,6 +2273,17 @@ public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
proto.getConfigVersion(), proto.getUptime());
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
private static AclEntryScopeProto convert(AclEntryScope v) {
return AclEntryScopeProto.valueOf(v.ordinal());
@ -2348,11 +2507,30 @@ public static EncryptionZone convert(EncryptionZoneProto proto) {
proto.getKeyName());
}
public static ShortCircuitShmSlotProto convert(SlotId slotId) {
return ShortCircuitShmSlotProto.newBuilder().
setShmId(convert(slotId.getShmId())).
setSlotIdx(slotId.getSlotIdx()).
build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
return ShortCircuitShmIdProto.newBuilder().
setHi(shmId.getHi()).
setLo(shmId.getLo()).
build();
}
public static SlotId convert(ShortCircuitShmSlotProto slotId) {
return new SlotId(PBHelperClient.convert(slotId.getShmId()),
return new SlotId(PBHelper.convert(slotId.getShmId()),
slotId.getSlotIdx());
}
public static ShmId convert(ShortCircuitShmIdProto shmId) {
return new ShmId(shmId.getHi(), shmId.getLo());
}
private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
type) {
switch (type) {
@ -2859,6 +3037,18 @@ public static FileEncryptionInfo convert(
ezKeyVersionName);
}
public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
List<Boolean> pinnings = new ArrayList<Boolean>();
if (targetPinnings == null) {
pinnings.add(Boolean.FALSE);
} else {
for (; idx < targetPinnings.length; ++idx) {
pinnings.add(Boolean.valueOf(targetPinnings[idx]));
}
}
return pinnings;
}
public static boolean[] convertBooleanList(
List<Boolean> targetPinningsList) {
final boolean[] targetPinnings = new boolean[targetPinningsList.size()];

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

View File

@ -137,7 +137,7 @@
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@ -2172,7 +2172,7 @@ public void run() {
// read ack
if (isClient) {
DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
}

View File

@ -70,7 +70,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
@ -427,7 +427,7 @@ private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
throws IOException {
DataNodeFaultInjector.get().sendShortCircuitShmResponse();
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
setId(PBHelperClient.convert(shmInfo.shmId)).build().
setId(PBHelper.convert(shmInfo.shmId)).build().
writeDelimitedTo(socketOut);
// Send the file descriptor for the shared memory segment.
byte buf[] = new byte[] { (byte)0 };
@ -559,7 +559,7 @@ public void readBlock(final ExtendedBlock block,
// to respond with a Status enum.
try {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
if (!stat.hasStatus()) {
LOG.warn("Client " + peer.getRemoteAddressString() +
" did not send a valid status code after reading. " +
@ -745,7 +745,7 @@ public void writeBlock(final ExtendedBlock block,
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@ -962,7 +962,7 @@ public void blockChecksum(final ExtendedBlock block,
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
.setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
.setCrcType(PBHelper.convert(checksum.getChecksumType())))
.build()
.writeDelimitedTo(out);
out.flush();
@ -1147,8 +1147,8 @@ public void replaceBlock(final ExtendedBlock block,
// receive the response from the proxy
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(proxyReply));
PBHelper.vintPrefixed(proxyReply));
String logInfo = "copy block " + block + " from "
+ proxySock.getRemoteSocketAddress();
DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -155,7 +154,7 @@ public static ImmutableList<QuotaByStorageTypeEntry> loadQuotaByStorageTypeEntri
QuotaByStorageTypeFeatureProto proto) {
ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder();
for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) {
StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType());
StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType());
long quota = quotaEntry.getQuota();
b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type)
.setQuota(quota).build());
@ -460,7 +459,7 @@ private static XAttrFeatureProto.Builder buildXAttrs(XAttrFeature f,
if (q.getTypeSpace(t) >= 0) {
QuotaByStorageTypeEntryProto.Builder eb =
QuotaByStorageTypeEntryProto.newBuilder().
setStorageType(PBHelperClient.convertStorageType(t)).
setStorageType(PBHelper.convertStorageType(t)).
setQuota(q.getTypeSpace(t));
b.addQuotas(eb);
}

View File

@ -30,6 +30,8 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.DomainPeer;
@ -37,18 +39,17 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages short-circuit memory segments for an HDFS client.
*
@ -62,8 +63,7 @@
*/
@InterfaceAudience.Private
public class DfsClientShmManager implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(
DfsClientShmManager.class);
private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
/**
* Manages short-circuit memory segments that pertain to a given DataNode.
@ -168,7 +168,7 @@ private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
new Sender(out).requestShortCircuitShm(clientName);
ShortCircuitShmResponseProto resp =
ShortCircuitShmResponseProto.parseFrom(
PBHelperClient.vintPrefixed(peer.getInputStream()));
PBHelper.vintPrefixed(peer.getInputStream()));
String error = resp.hasError() ? resp.getError() : "(unknown)";
switch (resp.getStatus()) {
case SUCCESS:
@ -185,18 +185,14 @@ private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
}
try {
DfsClientShm shm =
new DfsClientShm(PBHelperClient.convert(resp.getId()),
new DfsClientShm(PBHelper.convert(resp.getId()),
fis[0], this, peer);
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": createNewShm: created " + shm);
}
return shm;
} finally {
try {
fis[0].close();
} catch (Throwable e) {
LOG.debug("Exception in closing " + fis[0], e);
}
IOUtils.cleanup(LOG, fis[0]);
}
case ERROR_UNSUPPORTED:
// The DataNode just does not support short-circuit shared memory
@ -501,11 +497,7 @@ public void close() throws IOException {
}
// When closed, the domainSocketWatcher will issue callbacks that mark
// all the outstanding DfsClientShm segments as stale.
try {
domainSocketWatcher.close();
} catch (Throwable e) {
LOG.debug("Exception in closing " + domainSocketWatcher, e);
}
IOUtils.cleanup(LOG, domainSocketWatcher);
}

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RetriableException;
@ -201,7 +201,7 @@ public void run() {
DataInputStream in = new DataInputStream(sock.getInputStream());
ReleaseShortCircuitAccessResponseProto resp =
ReleaseShortCircuitAccessResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
PBHelper.vintPrefixed(in));
if (resp.getStatus() != Status.SUCCESS) {
String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error);

View File

@ -27,6 +27,8 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -34,9 +36,6 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;
import com.google.common.base.Preconditions;
@ -47,7 +46,7 @@
* A shared memory segment used to implement short-circuit reads.
*/
public class ShortCircuitShm {
private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class);
private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);
protected static final int BYTES_PER_SLOT = 64;

View File

@ -153,7 +153,7 @@ public void testConvertNamenodeRegistration() {
@Test
public void testConvertDatanodeID() {
DatanodeID dn = DFSTestUtil.getLocalDatanodeID();
DatanodeIDProto dnProto = PBHelperClient.convert(dn);
DatanodeIDProto dnProto = PBHelper.convert(dn);
DatanodeID dn2 = PBHelper.convert(dnProto);
compare(dn, dn2);
}
@ -332,12 +332,12 @@ private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
@Test
public void testConvertExtendedBlock() {
ExtendedBlock b = getExtendedBlock();
ExtendedBlockProto bProto = PBHelperClient.convert(b);
ExtendedBlockProto bProto = PBHelper.convert(b);
ExtendedBlock b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
b.setBlockId(-1);
bProto = PBHelperClient.convert(b);
bProto = PBHelper.convert(b);
b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
}
@ -398,7 +398,7 @@ public void testConvertBlockToken() {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service"));
TokenProto tokenProto = PBHelperClient.convert(token);
TokenProto tokenProto = PBHelper.convert(token);
Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
compare(token, token2);
}
@ -592,16 +592,16 @@ public void testConvertBlockCommand() {
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
assertEquals(DataChecksum.Type.CRC32,
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
assertEquals(DataChecksum.Type.CRC32C,
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(PBHelperClient.convert(DataChecksum.Type.NULL),
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(PBHelper.convert(DataChecksum.Type.NULL),
HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32),
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32C),
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
}