HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-22 13:30:19 -07:00
parent 8c205267c8
commit 8e4afa3a67
38 changed files with 388 additions and 312 deletions

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@ -32,7 +30,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.security.token.Token;
@ -60,7 +58,7 @@ public abstract class DataTransferProtoUtil {
}
public static ChecksumProto toProto(DataChecksum checksum) {
ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType());
// ChecksumType#valueOf never returns null
return ChecksumProto.newBuilder()
.setBytesPerChecksum(checksum.getBytesPerChecksum())
@ -72,7 +70,7 @@ public abstract class DataTransferProtoUtil {
if (proto == null) return null;
int bytesPerChecksum = proto.getBytesPerChecksum();
DataChecksum.Type type = PBHelper.convert(proto.getType());
DataChecksum.Type type = PBHelperClient.convert(proto.getType());
return DataChecksum.newDataChecksum(type, bytesPerChecksum);
}
@ -89,8 +87,8 @@ public abstract class DataTransferProtoUtil {
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken) {
BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
.setBlock(PBHelper.convert(blk))
.setToken(PBHelper.convert(blockToken));
.setBlock(PBHelperClient.convert(blk))
.setToken(PBHelperClient.convert(blockToken));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@ -32,13 +30,16 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Transfer data to/from datanode using a streaming protocol.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface DataTransferProtocol {
public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);
/** Version for data transfers between clients and datanodes
* This should change when serialization of DatanodeInfo, not just

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@ -140,9 +140,9 @@ public class Sender implements DataTransferProtocol {
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.setStorageType(PBHelper.convertStorageType(storageType))
.addAllTargets(PBHelper.convert(targets, 1))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStorageType(PBHelperClient.convertStorageType(storageType))
.addAllTargets(PBHelperClient.convert(targets, 1))
.addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@ -152,10 +152,10 @@ public class Sender implements DataTransferProtocol {
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist)
.setPinning(pinning)
.addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
.addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
proto.setSource(PBHelperClient.convertDatanodeInfo(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
@ -171,8 +171,8 @@ public class Sender implements DataTransferProtocol {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(PBHelper.convert(targets))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
.addAllTargets(PBHelperClient.convert(targets))
.addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@ -188,7 +188,7 @@ public class Sender implements DataTransferProtocol {
.setHeader(DataTransferProtoUtil.buildBaseHeader(
blk, blockToken)).setMaxVersion(maxVersion);
if (slotId != null) {
builder.setSlotId(PBHelper.convert(slotId));
builder.setSlotId(PBHelperClient.convert(slotId));
}
builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build();
@ -199,7 +199,7 @@ public class Sender implements DataTransferProtocol {
public void releaseShortCircuitFds(SlotId slotId) throws IOException {
ReleaseShortCircuitAccessRequestProto.Builder builder =
ReleaseShortCircuitAccessRequestProto.newBuilder().
setSlotId(PBHelper.convert(slotId));
setSlotId(PBHelperClient.convert(slotId));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
@ -231,9 +231,9 @@ public class Sender implements DataTransferProtocol {
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setStorageType(PBHelper.convertStorageType(storageType))
.setStorageType(PBHelperClient.convertStorageType(storageType))
.setDelHint(delHint)
.setSource(PBHelper.convertDatanodeInfo(source))
.setSource(PBHelperClient.convertDatanodeInfo(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Utilities for converting protobuf classes to and from implementation classes
* and other helper utilities to help in dealing with protobuf.
*
* Note that when converting from an internal type to protobuf type, the
* converter never return null for protobuf type. The check for internal type
* being null must be done before calling the convert() method.
*/
public class PBHelperClient {
private PBHelperClient() {
/** Hidden constructor */
}
public static ByteString getByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
public static ShmId convert(ShortCircuitShmIdProto shmId) {
return new ShmId(shmId.getHi(), shmId.getLo());
}
public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static ExtendedBlockProto convert(final ExtendedBlock b) {
if (b == null) return null;
return ExtendedBlockProto.newBuilder().
setPoolId(b.getBlockPoolId()).
setBlockId(b.getBlockId()).
setNumBytes(b.getNumBytes()).
setGenerationStamp(b.getGenerationStamp()).
build();
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
setPassword(ByteString.copyFrom(tok.getPassword())).
setKind(tok.getKind().toString()).
setService(tok.getService().toString()).build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
return ShortCircuitShmIdProto.newBuilder().
setHi(shmId.getHi()).
setLo(shmId.getLo()).
build();
}
public static ShortCircuitShmSlotProto convert(SlotId slotId) {
return ShortCircuitShmSlotProto.newBuilder().
setShmId(convert(slotId.getShmId())).
setSlotIdx(slotId.getSlotIdx()).
build();
}
public static DatanodeIDProto convert(DatanodeID dn) {
// For wire compatibility with older versions we transmit the StorageID
// which is the same as the DatanodeUuid. Since StorageID is a required
// field we pass the empty string if the DatanodeUuid is not yet known.
return DatanodeIDProto.newBuilder()
.setIpAddr(dn.getIpAddr())
.setHostName(dn.getHostName())
.setXferPort(dn.getXferPort())
.setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
}
public static DatanodeInfoProto.AdminState convert(
final DatanodeInfo.AdminStates inAs) {
switch (inAs) {
case NORMAL: return DatanodeInfoProto.AdminState.NORMAL;
case DECOMMISSION_INPROGRESS:
return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
default: return DatanodeInfoProto.AdminState.NORMAL;
}
}
public static DatanodeInfoProto convert(DatanodeInfo info) {
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (info.getNetworkLocation() != null) {
builder.setLocation(info.getNetworkLocation());
}
builder
.setId(convert((DatanodeID) info))
.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setRemaining(info.getRemaining())
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCacheCapacity(info.getCacheCapacity())
.setCacheUsed(info.getCacheUsed())
.setLastUpdate(info.getLastUpdate())
.setLastUpdateMonotonic(info.getLastUpdateMonotonic())
.setXceiverCount(info.getXceiverCount())
.setAdminState(convert(info.getAdminState()))
.build();
return builder.build();
}
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos) {
return convert(dnInfos, 0);
}
/**
* Copy from {@code dnInfos} to a target of list of same size starting at
* {@code startIdx}.
*/
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos, int startIdx) {
if (dnInfos == null)
return null;
ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(convert(dnInfos[i]));
}
return protos;
}
public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
List<Boolean> pinnings = new ArrayList<>();
if (targetPinnings == null) {
pinnings.add(Boolean.FALSE);
} else {
for (; idx < targetPinnings.length; ++idx) {
pinnings.add(targetPinnings[idx]);
}
}
return pinnings;
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return convert(di);
}
public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
case SSD:
return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
}
}
public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
case SSD:
return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
}
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types) {
return convertStorageTypes(types, 0);
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types, int startIdx) {
if (types == null) {
return null;
}
final List<StorageTypeProto> protos = new ArrayList<>(
types.length);
for (int i = startIdx; i < types.length; ++i) {
protos.add(PBHelperClient.convertStorageType(types[i]));
}
return protos;
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
}

View File

@ -30,8 +30,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.DomainPeer;
@ -39,17 +37,18 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages short-circuit memory segments for an HDFS client.
*
@ -63,7 +62,8 @@ import com.google.common.base.Preconditions;
*/
@InterfaceAudience.Private
public class DfsClientShmManager implements Closeable {
private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
private static final Logger LOG = LoggerFactory.getLogger(
DfsClientShmManager.class);
/**
* Manages short-circuit memory segments that pertain to a given DataNode.
@ -168,7 +168,7 @@ public class DfsClientShmManager implements Closeable {
new Sender(out).requestShortCircuitShm(clientName);
ShortCircuitShmResponseProto resp =
ShortCircuitShmResponseProto.parseFrom(
PBHelper.vintPrefixed(peer.getInputStream()));
PBHelperClient.vintPrefixed(peer.getInputStream()));
String error = resp.hasError() ? resp.getError() : "(unknown)";
switch (resp.getStatus()) {
case SUCCESS:
@ -185,14 +185,18 @@ public class DfsClientShmManager implements Closeable {
}
try {
DfsClientShm shm =
new DfsClientShm(PBHelper.convert(resp.getId()),
new DfsClientShm(PBHelperClient.convert(resp.getId()),
fis[0], this, peer);
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": createNewShm: created " + shm);
}
return shm;
} finally {
IOUtils.cleanup(LOG, fis[0]);
try {
fis[0].close();
} catch (Throwable e) {
LOG.debug("Exception in closing " + fis[0], e);
}
}
case ERROR_UNSUPPORTED:
// The DataNode just does not support short-circuit shared memory
@ -497,7 +501,11 @@ public class DfsClientShmManager implements Closeable {
}
// When closed, the domainSocketWatcher will issue callbacks that mark
// all the outstanding DfsClientShm segments as stale.
IOUtils.cleanup(LOG, domainSocketWatcher);
try {
domainSocketWatcher.close();
} catch (Throwable e) {
LOG.debug("Exception in closing " + domainSocketWatcher, e);
}
}

View File

@ -27,8 +27,6 @@ import java.util.Random;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -36,6 +34,9 @@ import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;
import com.google.common.base.Preconditions;
@ -46,7 +47,7 @@ import com.google.common.primitives.Ints;
* A shared memory segment used to implement short-circuit reads.
*/
public class ShortCircuitShm {
private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);
private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class);
protected static final int BYTES_PER_SLOT = 64;

View File

@ -482,6 +482,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8823. Move replication factor into individual blocks. (wheat9)
HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
failureInjector.getSupportsReceiptVerification());
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {

View File

@ -149,7 +149,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -1928,7 +1928,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
String logInfo = "for block " + block + " from datanode " + datanodes[j];
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
@ -1960,7 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelper.convert(checksumData
ct = PBHelperClient.convert(checksumData
.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
@ -2088,11 +2088,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
} finally {
IOUtils.cleanup(null, pair.in, pair.out);
}

View File

@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -1245,7 +1245,7 @@ class DataStreamer extends Daemon {
//ack
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon {
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(blockReplyStream));
PBHelperClient.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
new BufferedInputStream(peer.getInputStream(), bufferSize));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
RemoteBlockReader2.checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@ -417,7 +417,7 @@ public class RemoteBlockReader2 implements BlockReader {
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.DataInputStream;
import java.io.IOException;
@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.htrace.TraceScope;
@ -136,7 +137,7 @@ public abstract class Receiver implements DataTransferProtocol {
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
@ -228,7 +229,7 @@ public abstract class Receiver implements DataTransferProtocol {
proto.getClass().getSimpleName());
try {
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));

View File

@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;

View File

@ -137,7 +137,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
return GetBlockLocalPathInfoResponseProto.newBuilder()
.setBlock(PBHelper.convert(resp.getBlock()))
.setBlock(PBHelperClient.convert(resp.getBlock()))
.setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
.build();
}

View File

@ -185,7 +185,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
@Override
public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
.newBuilder().setBlock(PBHelper.convert(b)).build();
.newBuilder().setBlock(PBHelperClient.convert(b)).build();
try {
return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
} catch (ServiceException e) {
@ -218,8 +218,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
Token<BlockTokenIdentifier> token) throws IOException {
GetBlockLocalPathInfoRequestProto req =
GetBlockLocalPathInfoRequestProto.newBuilder()
.setBlock(PBHelper.convert(block))
.setToken(PBHelper.convert(token)).build();
.setBlock(PBHelperClient.convert(block))
.setToken(PBHelperClient.convert(token)).build();
GetBlockLocalPathInfoResponseProto resp;
try {
resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);

View File

@ -698,7 +698,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetDatanodeReportRequestProto req)
throws ServiceException {
try {
List<? extends DatanodeInfoProto> result = PBHelper.convert(server
List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server
.getDatanodeReport(PBHelper.convert(req.getType())));
return GetDatanodeReportResponseProto.newBuilder()
.addAllDi(result).build();
@ -892,7 +892,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
server.setQuota(req.getPath(), req.getNamespaceQuota(),
req.getStoragespaceQuota(),
req.hasStorageType() ?
PBHelper.convertStorageType(req.getStorageType()): null);
PBHelperClient.convertStorageType(req.getStorageType()): null);
return VOID_SETQUOTA_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
@ -992,7 +992,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
GetDelegationTokenResponseProto.Builder rspBuilder =
GetDelegationTokenResponseProto.newBuilder();
if (token != null) {
rspBuilder.setToken(PBHelper.convert(token));
rspBuilder.setToken(PBHelperClient.convert(token));
}
return rspBuilder.build();
} catch (IOException e) {

View File

@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
String holder) throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
.setB(PBHelper.convert(b)).setSrc(src).setHolder(holder)
.setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder)
.setFileId(fileId).build();
try {
rpcProxy.abandonBlock(null, req);
@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
req.setPrevious(PBHelperClient.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes));
if (favoredNodes != null) {
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setSrc(src)
.setFileId(fileId)
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
.setBlk(PBHelperClient.convert(blk))
.addAllExistings(PBHelperClient.convert(existings))
.addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
.addAllExcludes(PBHelper.convert(excludes))
.addAllExcludes(PBHelperClient.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
.build();
@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.setFileId(fileId);
if (last != null)
req.setLast(PBHelper.convert(last));
req.setLast(PBHelperClient.convert(last));
try {
return rpcProxy.complete(null, req.build()).getResult();
} catch (ServiceException e) {
@ -817,7 +817,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setNamespaceQuota(namespaceQuota)
.setStoragespaceQuota(storagespaceQuota);
if (type != null) {
builder.setStorageType(PBHelper.convertStorageType(type));
builder.setStorageType(PBHelperClient.convertStorageType(type));
}
final SetQuotaRequestProto req = builder.build();
try {
@ -895,7 +895,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
String clientName) throws IOException {
UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto
.newBuilder()
.setBlock(PBHelper.convert(block))
.setBlock(PBHelperClient.convert(block))
.setClientName(clientName)
.build();
try {
@ -911,8 +911,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
.setClientName(clientName)
.setOldBlock(PBHelper.convert(oldBlock))
.setNewBlock(PBHelper.convert(newBlock))
.setOldBlock(PBHelperClient.convert(oldBlock))
.setNewBlock(PBHelperClient.convert(newBlock))
.addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
.addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
.build();
@ -943,7 +943,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder().
setToken(PBHelper.convert(token)).
setToken(PBHelperClient.convert(token)).
build();
try {
return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime();
@ -957,7 +957,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
.newBuilder()
.setToken(PBHelper.convert(token))
.setToken(PBHelperClient.convert(token))
.build();
try {
rpcProxy.cancelDelegationToken(null, req);

View File

@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
.setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp)
.setNewLength(newlength).setCloseFile(closeFile)
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
builder.addNewTaragets(PBHelperClient.convert(newtargets[i]));
builder.addNewTargetStorages(newtargetstorages[i]);
}
CommitBlockSynchronizationRequestProto req = builder.build();

View File

@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements
long recoveryId, long newBlockId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
.setBlock(PBHelperClient.convert(oldBlock))
.setNewLength(newLength).setNewBlockId(newBlockId)
.setRecoveryId(recoveryId).build();
try {

View File

@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
.build();
try {
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)

View File

@ -347,7 +347,7 @@ public class PBHelper {
if (types == null || types.length == 0) {
return null;
}
List<StorageTypeProto> list = convertStorageTypes(types);
List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types);
return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
}
@ -382,20 +382,6 @@ public class PBHelper {
.getInfoSecurePort() : 0, dn.getIpcPort());
}
public static DatanodeIDProto convert(DatanodeID dn) {
// For wire compatibility with older versions we transmit the StorageID
// which is the same as the DatanodeUuid. Since StorageID is a required
// field we pass the empty string if the DatanodeUuid is not yet known.
return DatanodeIDProto.newBuilder()
.setIpAddr(dn.getIpAddr())
.setHostName(dn.getHostName())
.setXferPort(dn.getXferPort())
.setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
}
// Arrays of DatanodeId
public static DatanodeIDProto[] convert(DatanodeID[] did) {
if (did == null)
@ -403,7 +389,7 @@ public class PBHelper {
final int len = did.length;
DatanodeIDProto[] result = new DatanodeIDProto[len];
for (int i = 0; i < len; ++i) {
result[i] = convert(did[i]);
result[i] = PBHelperClient.convert(did[i]);
}
return result;
}
@ -434,7 +420,7 @@ public class PBHelper {
.setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
.addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
.addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
.build();
}
@ -596,16 +582,6 @@ public class PBHelper {
eb.getGenerationStamp());
}
public static ExtendedBlockProto convert(final ExtendedBlock b) {
if (b == null) return null;
return ExtendedBlockProto.newBuilder().
setPoolId(b.getBlockPoolId()).
setBlockId(b.getBlockId()).
setNumBytes(b.getNumBytes()).
setGenerationStamp(b.getGenerationStamp()).
build();
}
public static RecoveringBlockProto convert(RecoveringBlock b) {
if (b == null) {
return null;
@ -626,17 +602,6 @@ public class PBHelper {
new RecoveringBlock(block, locs, b.getNewGenStamp());
}
public static DatanodeInfoProto.AdminState convert(
final DatanodeInfo.AdminStates inAs) {
switch (inAs) {
case NORMAL: return DatanodeInfoProto.AdminState.NORMAL;
case DECOMMISSION_INPROGRESS:
return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
default: return DatanodeInfoProto.AdminState.NORMAL;
}
}
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
@ -648,12 +613,6 @@ public class PBHelper {
di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return convert(di);
}
static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
if (di == null) return null;
DatanodeInfo[] result = new DatanodeInfo[di.length];
@ -663,27 +622,6 @@ public class PBHelper {
return result;
}
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos) {
return convert(dnInfos, 0);
}
/**
* Copy from {@code dnInfos} to a target of list of same size starting at
* {@code startIdx}.
*/
public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
DatanodeInfo[] dnInfos, int startIdx) {
if (dnInfos == null)
return null;
ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
.newArrayListWithCapacity(dnInfos.length);
for (int i = startIdx; i < dnInfos.length; i++) {
protos.add(convert(dnInfos[i]));
}
return protos;
}
public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
DatanodeInfo[] info = new DatanodeInfo[list.size()];
for (int i = 0; i < info.length; i++) {
@ -691,32 +629,11 @@ public class PBHelper {
}
return info;
}
public static DatanodeInfoProto convert(DatanodeInfo info) {
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (info.getNetworkLocation() != null) {
builder.setLocation(info.getNetworkLocation());
}
builder
.setId(PBHelper.convert((DatanodeID)info))
.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setRemaining(info.getRemaining())
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCacheCapacity(info.getCacheCapacity())
.setCacheUsed(info.getCacheUsed())
.setLastUpdate(info.getLastUpdate())
.setLastUpdateMonotonic(info.getLastUpdateMonotonic())
.setXceiverCount(info.getXceiverCount())
.setAdminState(PBHelper.convert(info.getAdminState()))
.build();
return builder.build();
}
public static DatanodeStorageReportProto convertDatanodeStorageReport(
DatanodeStorageReport report) {
return DatanodeStorageReportProto.newBuilder()
.setDatanodeInfo(convert(report.getDatanodeInfo()))
.setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo()))
.addAllStorageReports(convertStorageReports(report.getStorageReports()))
.build();
}
@ -768,7 +685,7 @@ public class PBHelper {
Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
for (int i = 0; i < locs.length; i++) {
DatanodeInfo loc = locs[i];
builder.addLocs(i, PBHelper.convert(loc));
builder.addLocs(i, PBHelperClient.convert(loc));
boolean locIsCached = cachedLocs.contains(loc);
builder.addIsCached(locIsCached);
if (locIsCached) {
@ -782,7 +699,7 @@ public class PBHelper {
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
for (int i = 0; i < storageTypes.length; ++i) {
builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i]));
}
}
final String[] storageIDs = b.getStorageIDs();
@ -790,8 +707,8 @@ public class PBHelper {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
return builder.setB(PBHelperClient.convert(b.getBlock()))
.setBlockToken(PBHelperClient.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
@ -832,14 +749,6 @@ public class PBHelper {
return lb;
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
setPassword(ByteString.copyFrom(tok.getPassword())).
setKind(tok.getKind().toString()).
setService(tok.getService().toString()).build();
}
public static Token<BlockTokenIdentifier> convert(
TokenProto blockToken) {
return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
@ -891,7 +800,7 @@ public class PBHelper {
DatanodeRegistration registration) {
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
.newBuilder();
return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration))
.setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
.setKeys(PBHelper.convert(registration.getExportedKeys()))
.setSoftwareVersion(registration.getSoftwareVersion()).build();
@ -983,7 +892,7 @@ public class PBHelper {
if (types != null) {
for (StorageType[] ts : types) {
StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
builder.addAllStorageTypes(convertStorageTypes(ts));
builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts));
list.add(builder.build());
}
}
@ -1014,7 +923,7 @@ public class PBHelper {
DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
for (int i = 0; i < targets.length; i++) {
ret[i] = DatanodeInfosProto.newBuilder()
.addAllDatanodes(PBHelper.convert(targets[i])).build();
.addAllDatanodes(PBHelperClient.convert(targets[i])).build();
}
return Arrays.asList(ret);
}
@ -1338,7 +1247,7 @@ public class PBHelper {
fs.getFileBufferSize(),
fs.getEncryptDataTransfer(),
fs.getTrashInterval(),
PBHelper.convert(fs.getChecksumType()));
PBHelperClient.convert(fs.getChecksumType()));
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@ -1351,7 +1260,7 @@ public class PBHelper {
.setFileBufferSize(fs.getFileBufferSize())
.setEncryptDataTransfer(fs.getEncryptDataTransfer())
.setTrashInterval(fs.getTrashInterval())
.setChecksumType(PBHelper.convert(fs.getChecksumType()))
.setChecksumType(PBHelperClient.convert(fs.getChecksumType()))
.build();
}
@ -1739,7 +1648,7 @@ public class PBHelper {
if (cs.hasTypeQuotaInfos()) {
for (HdfsProtos.StorageTypeQuotaInfoProto info :
cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
StorageType type = PBHelper.convertStorageType(info.getType());
StorageType type = PBHelperClient.convertStorageType(info.getType());
builder.typeConsumed(type, info.getConsumed());
builder.typeQuota(type, info.getQuota());
}
@ -1763,7 +1672,7 @@ public class PBHelper {
for (StorageType t: StorageType.getTypesSupportingQuota()) {
HdfsProtos.StorageTypeQuotaInfoProto info =
HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
setType(convertStorageType(t)).
setType(PBHelperClient.convertStorageType(t)).
setConsumed(cs.getTypeConsumed(t)).
setQuota(cs.getTypeQuota(t)).
build();
@ -1808,7 +1717,7 @@ public class PBHelper {
public static DatanodeStorageProto convert(DatanodeStorage s) {
return DatanodeStorageProto.newBuilder()
.setState(PBHelper.convertState(s.getState()))
.setStorageType(PBHelper.convertStorageType(s.getStorageType()))
.setStorageType(PBHelperClient.convertStorageType(s.getStorageType()))
.setStorageUuid(s.getStorageID()).build();
}
@ -1822,44 +1731,10 @@ public class PBHelper {
}
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types) {
return convertStorageTypes(types, 0);
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types, int startIdx) {
if (types == null) {
return null;
}
final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
types.length);
for (int i = startIdx; i < types.length; ++i) {
protos.add(convertStorageType(types[i]));
}
return protos;
}
public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
case SSD:
return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
}
}
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
PBHelper.convertStorageType(s.getStorageType()));
PBHelperClient.convertStorageType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@ -1872,22 +1747,6 @@ public class PBHelper {
}
}
public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
case SSD:
return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
}
}
public static StorageType[] convertStorageTypes(
List<StorageTypeProto> storageTypesList, int expectedSize) {
final StorageType[] storageTypes = new StorageType[expectedSize];
@ -1896,7 +1755,7 @@ public class PBHelper {
Arrays.fill(storageTypes, StorageType.DEFAULT);
} else {
for (int i = 0; i < storageTypes.length; ++i) {
storageTypes[i] = convertStorageType(storageTypesList.get(i));
storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i));
}
}
return storageTypes;
@ -2080,10 +1939,6 @@ public class PBHelper {
return reportProto;
}
public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static CacheDirectiveInfoProto convert
(CacheDirectiveInfo info) {
CacheDirectiveInfoProto.Builder builder =
@ -2256,9 +2111,6 @@ public class PBHelper {
return new CachePoolEntry(info, stats);
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) {
DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder();
@ -2273,17 +2125,6 @@ public class PBHelper {
proto.getConfigVersion(), proto.getUptime());
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
private static AclEntryScopeProto convert(AclEntryScope v) {
return AclEntryScopeProto.valueOf(v.ordinal());
@ -2507,30 +2348,11 @@ public class PBHelper {
proto.getKeyName());
}
public static ShortCircuitShmSlotProto convert(SlotId slotId) {
return ShortCircuitShmSlotProto.newBuilder().
setShmId(convert(slotId.getShmId())).
setSlotIdx(slotId.getSlotIdx()).
build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
return ShortCircuitShmIdProto.newBuilder().
setHi(shmId.getHi()).
setLo(shmId.getLo()).
build();
}
public static SlotId convert(ShortCircuitShmSlotProto slotId) {
return new SlotId(PBHelper.convert(slotId.getShmId()),
return new SlotId(PBHelperClient.convert(slotId.getShmId()),
slotId.getSlotIdx());
}
public static ShmId convert(ShortCircuitShmIdProto shmId) {
return new ShmId(shmId.getHi(), shmId.getLo());
}
private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
type) {
switch (type) {
@ -3037,18 +2859,6 @@ public class PBHelper {
ezKeyVersionName);
}
public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
List<Boolean> pinnings = new ArrayList<Boolean>();
if (targetPinnings == null) {
pinnings.add(Boolean.FALSE);
} else {
for (; idx < targetPinnings.length; ++idx) {
pinnings.add(Boolean.valueOf(targetPinnings[idx]));
}
}
return pinnings;
}
public static boolean[] convertBooleanList(
List<Boolean> targetPinningsList) {
final boolean[] targetPinnings = new boolean[targetPinningsList.size()];

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

View File

@ -137,7 +137,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@ -2172,7 +2172,7 @@ public class DataNode extends ReconfigurableBase
// read ack
if (isClient) {
DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
}

View File

@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable {
throws IOException {
DataNodeFaultInjector.get().sendShortCircuitShmResponse();
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
setId(PBHelper.convert(shmInfo.shmId)).build().
setId(PBHelperClient.convert(shmInfo.shmId)).build().
writeDelimitedTo(socketOut);
// Send the file descriptor for the shared memory segment.
byte buf[] = new byte[] { (byte)0 };
@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable {
// to respond with a Status enum.
try {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
if (!stat.hasStatus()) {
LOG.warn("Client " + peer.getRemoteAddressString() +
" did not send a valid status code after reading. " +
@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable {
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable {
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
.setCrcType(PBHelper.convert(checksum.getChecksumType())))
.setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
.build()
.writeDelimitedTo(out);
out.flush();
@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable {
// receive the response from the proxy
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(proxyReply));
PBHelperClient.vintPrefixed(proxyReply));
String logInfo = "copy block " + block + " from "
+ proxySock.getRemoteSocketAddress();
DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -154,7 +155,7 @@ public final class FSImageFormatPBINode {
QuotaByStorageTypeFeatureProto proto) {
ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder();
for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) {
StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType());
StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType());
long quota = quotaEntry.getQuota();
b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type)
.setQuota(quota).build());
@ -459,7 +460,7 @@ public final class FSImageFormatPBINode {
if (q.getTypeSpace(t) >= 0) {
QuotaByStorageTypeEntryProto.Builder eb =
QuotaByStorageTypeEntryProto.newBuilder().
setStorageType(PBHelper.convertStorageType(t)).
setStorageType(PBHelperClient.convertStorageType(t)).
setQuota(q.getTypeSpace(t));
b.addQuotas(eb);
}

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RetriableException;
@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable {
DataInputStream in = new DataInputStream(sock.getInputStream());
ReleaseShortCircuitAccessResponseProto resp =
ReleaseShortCircuitAccessResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
PBHelperClient.vintPrefixed(in));
if (resp.getStatus() != Status.SUCCESS) {
String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error);

View File

@ -153,7 +153,7 @@ public class TestPBHelper {
@Test
public void testConvertDatanodeID() {
DatanodeID dn = DFSTestUtil.getLocalDatanodeID();
DatanodeIDProto dnProto = PBHelper.convert(dn);
DatanodeIDProto dnProto = PBHelperClient.convert(dn);
DatanodeID dn2 = PBHelper.convert(dnProto);
compare(dn, dn2);
}
@ -332,12 +332,12 @@ public class TestPBHelper {
@Test
public void testConvertExtendedBlock() {
ExtendedBlock b = getExtendedBlock();
ExtendedBlockProto bProto = PBHelper.convert(b);
ExtendedBlockProto bProto = PBHelperClient.convert(b);
ExtendedBlock b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
b.setBlockId(-1);
bProto = PBHelper.convert(b);
bProto = PBHelperClient.convert(b);
b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
}
@ -398,7 +398,7 @@ public class TestPBHelper {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service"));
TokenProto tokenProto = PBHelper.convert(token);
TokenProto tokenProto = PBHelperClient.convert(token);
Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
compare(token, token2);
}
@ -592,16 +592,16 @@ public class TestPBHelper {
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
assertEquals(DataChecksum.Type.CRC32,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
assertEquals(DataChecksum.Type.CRC32C,
PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(PBHelper.convert(DataChecksum.Type.NULL),
PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
assertEquals(PBHelperClient.convert(DataChecksum.Type.NULL),
HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32),
assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C),
assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32C),
HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
}