HDFS-13601. Optimize ByteString conversions in PBHelper.
This commit is contained in:
parent
5dcd57cbea
commit
0f0d29a8d1
@ -59,4 +59,11 @@
|
||||
<Field name="cachingStrategy" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.protocolPB.PBHelperClient" />
|
||||
<Method name="getFixedByteString" />
|
||||
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
|
||||
</Match>
|
||||
|
||||
</FindBugsFilter>
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
@ -42,7 +43,9 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||
public static final DatanodeID[] EMPTY_ARRAY = {};
|
||||
|
||||
private String ipAddr; // IP address
|
||||
private ByteString ipAddrBytes; // ipAddr ByteString to save on PB serde
|
||||
private String hostName; // hostname claimed by datanode
|
||||
private ByteString hostNameBytes; // hostName ByteString to save on PB serde
|
||||
private String peerHostName; // hostname from the actual connection
|
||||
private int xferPort; // data streaming port
|
||||
private int infoPort; // info server port
|
||||
@ -56,6 +59,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||
* For newly formatted Datanodes it is a UUID.
|
||||
*/
|
||||
private final String datanodeUuid;
|
||||
// datanodeUuid ByteString to save on PB serde
|
||||
private final ByteString datanodeUuidBytes;
|
||||
|
||||
public DatanodeID(DatanodeID from) {
|
||||
this(from.getDatanodeUuid(), from);
|
||||
@ -64,8 +69,11 @@ public DatanodeID(DatanodeID from) {
|
||||
@VisibleForTesting
|
||||
public DatanodeID(String datanodeUuid, DatanodeID from) {
|
||||
this(from.getIpAddr(),
|
||||
from.getIpAddrBytes(),
|
||||
from.getHostName(),
|
||||
from.getHostNameBytes(),
|
||||
datanodeUuid,
|
||||
getByteString(datanodeUuid),
|
||||
from.getXferPort(),
|
||||
from.getInfoPort(),
|
||||
from.getInfoSecurePort(),
|
||||
@ -87,22 +95,43 @@ public DatanodeID(String datanodeUuid, DatanodeID from) {
|
||||
*/
|
||||
public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
|
||||
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
|
||||
setIpAndXferPort(ipAddr, xferPort);
|
||||
this(ipAddr, getByteString(ipAddr),
|
||||
hostName, getByteString(hostName),
|
||||
datanodeUuid, getByteString(datanodeUuid),
|
||||
xferPort, infoPort, infoSecurePort, ipcPort);
|
||||
}
|
||||
|
||||
private DatanodeID(String ipAddr, ByteString ipAddrBytes,
|
||||
String hostName, ByteString hostNameBytes,
|
||||
String datanodeUuid, ByteString datanodeUuidBytes,
|
||||
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
|
||||
setIpAndXferPort(ipAddr, ipAddrBytes, xferPort);
|
||||
this.hostName = hostName;
|
||||
this.hostNameBytes = hostNameBytes;
|
||||
this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
|
||||
this.datanodeUuidBytes = datanodeUuidBytes;
|
||||
this.infoPort = infoPort;
|
||||
this.infoSecurePort = infoSecurePort;
|
||||
this.ipcPort = ipcPort;
|
||||
}
|
||||
|
||||
public void setIpAddr(String ipAddr) {
|
||||
//updated during registration, preserve former xferPort
|
||||
setIpAndXferPort(ipAddr, xferPort);
|
||||
private static ByteString getByteString(String str) {
|
||||
if (str != null) {
|
||||
return ByteString.copyFromUtf8(str);
|
||||
}
|
||||
return ByteString.EMPTY;
|
||||
}
|
||||
|
||||
private void setIpAndXferPort(String ipAddr, int xferPort) {
|
||||
public void setIpAddr(String ipAddr) {
|
||||
//updated during registration, preserve former xferPort
|
||||
setIpAndXferPort(ipAddr, getByteString(ipAddr), xferPort);
|
||||
}
|
||||
|
||||
private void setIpAndXferPort(String ipAddr, ByteString ipAddrBytes,
|
||||
int xferPort) {
|
||||
// build xferAddr string to reduce cost of frequent use
|
||||
this.ipAddr = ipAddr;
|
||||
this.ipAddrBytes = ipAddrBytes;
|
||||
this.xferPort = xferPort;
|
||||
this.xferAddr = ipAddr + ":" + xferPort;
|
||||
}
|
||||
@ -118,6 +147,10 @@ public String getDatanodeUuid() {
|
||||
return datanodeUuid;
|
||||
}
|
||||
|
||||
public ByteString getDatanodeUuidBytes() {
|
||||
return datanodeUuidBytes;
|
||||
}
|
||||
|
||||
private String checkDatanodeUuid(String uuid) {
|
||||
if (uuid == null || uuid.isEmpty()) {
|
||||
return null;
|
||||
@ -133,6 +166,10 @@ public String getIpAddr() {
|
||||
return ipAddr;
|
||||
}
|
||||
|
||||
public ByteString getIpAddrBytes() {
|
||||
return ipAddrBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return hostname
|
||||
*/
|
||||
@ -140,6 +177,10 @@ public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
public ByteString getHostNameBytes() {
|
||||
return hostNameBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return hostname from the actual connection
|
||||
*/
|
||||
@ -256,7 +297,8 @@ public String toString() {
|
||||
* Note that this does not update storageID.
|
||||
*/
|
||||
public void updateRegInfo(DatanodeID nodeReg) {
|
||||
setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getXferPort());
|
||||
setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getIpAddrBytes(),
|
||||
nodeReg.getXferPort());
|
||||
hostName = nodeReg.getHostName();
|
||||
peerHostName = nodeReg.getPeerHostName();
|
||||
infoPort = nodeReg.getInfoPort();
|
||||
|
@ -24,8 +24,12 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Shorts;
|
||||
import com.google.protobuf.ByteString;
|
||||
@ -192,6 +196,49 @@ public class PBHelperClient {
|
||||
private static final FsAction[] FSACTION_VALUES =
|
||||
FsAction.values();
|
||||
|
||||
/**
|
||||
* Map used to cache fixed strings to ByteStrings. Since there is no
|
||||
* automatic expiration policy, only use this for strings from a fixed, small
|
||||
* set.
|
||||
* <p/>
|
||||
* This map should not be accessed directly. Used the getFixedByteString
|
||||
* methods instead.
|
||||
*/
|
||||
private static ConcurrentHashMap<Object, ByteString> fixedByteStringCache =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private static ByteString getFixedByteString(Text key) {
|
||||
ByteString value = fixedByteStringCache.get(key);
|
||||
if (value == null) {
|
||||
value = ByteString.copyFromUtf8(key.toString());
|
||||
fixedByteStringCache.put(key, value);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
private static ByteString getFixedByteString(String key) {
|
||||
ByteString value = fixedByteStringCache.get(key);
|
||||
if (value == null) {
|
||||
value = ByteString.copyFromUtf8(key);
|
||||
fixedByteStringCache.put(key, value);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava cache for caching String to ByteString encoding. Use this when the
|
||||
* set of Strings is large, mutable, or unknown.
|
||||
*/
|
||||
private static LoadingCache<String, ByteString> bytestringCache =
|
||||
CacheBuilder.newBuilder()
|
||||
.maximumSize(10000)
|
||||
.build(
|
||||
new CacheLoader<String, ByteString>() {
|
||||
public ByteString load(String key) {
|
||||
return ByteString.copyFromUtf8(key);
|
||||
}
|
||||
});
|
||||
|
||||
private PBHelperClient() {
|
||||
/** Hidden constructor */
|
||||
}
|
||||
@ -216,7 +263,7 @@ public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
|
||||
public static ExtendedBlockProto convert(final ExtendedBlock b) {
|
||||
if (b == null) return null;
|
||||
return ExtendedBlockProto.newBuilder().
|
||||
setPoolId(b.getBlockPoolId()).
|
||||
setPoolIdBytes(getFixedByteString(b.getBlockPoolId())).
|
||||
setBlockId(b.getBlockId()).
|
||||
setNumBytes(b.getNumBytes()).
|
||||
setGenerationStamp(b.getGenerationStamp()).
|
||||
@ -227,8 +274,8 @@ public static TokenProto convert(Token<?> tok) {
|
||||
return TokenProto.newBuilder().
|
||||
setIdentifier(getByteString(tok.getIdentifier())).
|
||||
setPassword(getByteString(tok.getPassword())).
|
||||
setKind(tok.getKind().toString()).
|
||||
setService(tok.getService().toString()).build();
|
||||
setKindBytes(getFixedByteString(tok.getKind())).
|
||||
setServiceBytes(getFixedByteString(tok.getService())).build();
|
||||
}
|
||||
|
||||
public static ShortCircuitShmIdProto convert(ShmId shmId) {
|
||||
@ -251,11 +298,10 @@ public static DatanodeIDProto convert(DatanodeID dn) {
|
||||
// which is the same as the DatanodeUuid. Since StorageID is a required
|
||||
// field we pass the empty string if the DatanodeUuid is not yet known.
|
||||
return DatanodeIDProto.newBuilder()
|
||||
.setIpAddr(dn.getIpAddr())
|
||||
.setHostName(dn.getHostName())
|
||||
.setIpAddrBytes(dn.getIpAddrBytes())
|
||||
.setHostNameBytes(dn.getHostNameBytes())
|
||||
.setXferPort(dn.getXferPort())
|
||||
.setDatanodeUuid(dn.getDatanodeUuid() != null ?
|
||||
dn.getDatanodeUuid() : "")
|
||||
.setDatanodeUuidBytes(dn.getDatanodeUuidBytes())
|
||||
.setInfoPort(dn.getInfoPort())
|
||||
.setInfoSecurePort(dn.getInfoSecurePort())
|
||||
.setIpcPort(dn.getIpcPort()).build();
|
||||
@ -279,7 +325,8 @@ public static DatanodeInfoProto.AdminState convert(
|
||||
public static DatanodeInfoProto convert(DatanodeInfo info) {
|
||||
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
|
||||
if (info.getNetworkLocation() != null) {
|
||||
builder.setLocation(info.getNetworkLocation());
|
||||
builder.setLocationBytes(
|
||||
bytestringCache.getUnchecked(info.getNetworkLocation()));
|
||||
}
|
||||
if (info.getUpgradeDomain() != null) {
|
||||
builder.setUpgradeDomain(info.getUpgradeDomain());
|
||||
@ -1849,8 +1896,8 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
|
||||
setModificationTime(fs.getModificationTime()).
|
||||
setAccessTime(fs.getAccessTime()).
|
||||
setPermission(convert(fs.getPermission())).
|
||||
setOwner(fs.getOwner()).
|
||||
setGroup(fs.getGroup()).
|
||||
setOwnerBytes(getFixedByteString(fs.getOwner())).
|
||||
setGroupBytes(getFixedByteString(fs.getGroup())).
|
||||
setFileId(fs.getFileId()).
|
||||
setChildrenNum(fs.getChildrenNum()).
|
||||
setPath(getByteString(fs.getLocalNameInBytes())).
|
||||
|
Loading…
x
Reference in New Issue
Block a user