From d47c0fc39a48dace6c7a2332ee041885cc4bb89c Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Tue, 22 May 2018 23:55:20 -0700 Subject: [PATCH] HDFS-13601. Optimize ByteString conversions in PBHelper. (cherry picked from commit 1d2640b6132e8308c07476badd2d1482be68a298) --- .../dev-support/findbugsExcludeFile.xml | 5 ++ .../hadoop/hdfs/protocol/DatanodeID.java | 54 +++++++++++++-- .../hdfs/protocolPB/PBHelperClient.java | 67 ++++++++++++++++--- .../TestDataXceiverBackwardsCompat.java | 10 +++ 4 files changed, 120 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 8e2bc944e87..fa9654b16a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -91,5 +91,10 @@ + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index af720c7d80b..718661e3ae8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocol; +import com.google.protobuf.ByteString; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -44,7 +45,9 @@ public class DatanodeID implements Comparable { "null", "null", 0, 0, 0, 0); private String ipAddr; // IP address + private ByteString ipAddrBytes; // ipAddr ByteString to save on PB serde private String hostName; // hostname claimed by datanode + private ByteString hostNameBytes; // hostName ByteString to save on PB serde private String 
peerHostName; // hostname from the actual connection private int xferPort; // data streaming port private int infoPort; // info server port @@ -58,6 +61,8 @@ public class DatanodeID implements Comparable { * For newly formatted Datanodes it is a UUID. */ private final String datanodeUuid; + // datanodeUuid ByteString to save on PB serde + private final ByteString datanodeUuidBytes; public DatanodeID(DatanodeID from) { this(from.getDatanodeUuid(), from); @@ -66,8 +71,11 @@ public class DatanodeID implements Comparable { @VisibleForTesting public DatanodeID(String datanodeUuid, DatanodeID from) { this(from.getIpAddr(), + from.getIpAddrBytes(), from.getHostName(), + from.getHostNameBytes(), datanodeUuid, + getByteString(datanodeUuid), from.getXferPort(), from.getInfoPort(), from.getInfoSecurePort(), @@ -89,22 +97,43 @@ public class DatanodeID implements Comparable { */ public DatanodeID(String ipAddr, String hostName, String datanodeUuid, int xferPort, int infoPort, int infoSecurePort, int ipcPort) { - setIpAndXferPort(ipAddr, xferPort); + this(ipAddr, getByteString(ipAddr), + hostName, getByteString(hostName), + datanodeUuid, getByteString(datanodeUuid), + xferPort, infoPort, infoSecurePort, ipcPort); + } + + private DatanodeID(String ipAddr, ByteString ipAddrBytes, + String hostName, ByteString hostNameBytes, + String datanodeUuid, ByteString datanodeUuidBytes, + int xferPort, int infoPort, int infoSecurePort, int ipcPort) { + setIpAndXferPort(ipAddr, ipAddrBytes, xferPort); this.hostName = hostName; + this.hostNameBytes = hostNameBytes; this.datanodeUuid = checkDatanodeUuid(datanodeUuid); + this.datanodeUuidBytes = datanodeUuidBytes; this.infoPort = infoPort; this.infoSecurePort = infoSecurePort; this.ipcPort = ipcPort; } - public void setIpAddr(String ipAddr) { - //updated during registration, preserve former xferPort - setIpAndXferPort(ipAddr, xferPort); + private static ByteString getByteString(String str) { + if (str != null) { + return 
ByteString.copyFromUtf8(str); + } + return ByteString.EMPTY; } - private void setIpAndXferPort(String ipAddr, int xferPort) { + public void setIpAddr(String ipAddr) { + //updated during registration, preserve former xferPort + setIpAndXferPort(ipAddr, getByteString(ipAddr), xferPort); + } + + private void setIpAndXferPort(String ipAddr, ByteString ipAddrBytes, + int xferPort) { // build xferAddr string to reduce cost of frequent use this.ipAddr = ipAddr; + this.ipAddrBytes = ipAddrBytes; this.xferPort = xferPort; this.xferAddr = ipAddr + ":" + xferPort; } @@ -120,6 +149,10 @@ public class DatanodeID implements Comparable { return datanodeUuid; } + public ByteString getDatanodeUuidBytes() { + return datanodeUuidBytes; + } + private String checkDatanodeUuid(String uuid) { if (uuid == null || uuid.isEmpty()) { return null; @@ -135,6 +168,10 @@ public class DatanodeID implements Comparable { return ipAddr; } + public ByteString getIpAddrBytes() { + return ipAddrBytes; + } + /** * @return hostname */ @@ -142,6 +179,10 @@ public class DatanodeID implements Comparable { return hostName; } + public ByteString getHostNameBytes() { + return hostNameBytes; + } + /** * @return hostname from the actual connection */ @@ -258,7 +299,8 @@ public class DatanodeID implements Comparable { * Note that this does not update storageID. 
*/ public void updateRegInfo(DatanodeID nodeReg) { - setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getXferPort()); + setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getIpAddrBytes(), + nodeReg.getXferPort()); hostName = nodeReg.getHostName(); peerHostName = nodeReg.getPeerHostName(); infoPort = nodeReg.getInfoPort(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index ff9733c66a9..579ac436c31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -27,8 +27,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.primitives.Shorts; import com.google.protobuf.ByteString; @@ -228,6 +232,49 @@ public class PBHelperClient { private static final FsAction[] FSACTION_VALUES = FsAction.values(); + /** + * Map used to cache fixed strings to ByteStrings. Since there is no + * automatic expiration policy, only use this for strings from a fixed, small + * set. + *

+ * This map should not be accessed directly. Use the getFixedByteString + * methods instead; they copy mutable Text keys before caching them. + */ + private static ConcurrentHashMap<Object, ByteString> fixedByteStringCache = + new ConcurrentHashMap<>(); + + private static ByteString getFixedByteString(Text key) { + ByteString value = fixedByteStringCache.get(key); + if (value == null) { + value = ByteString.copyFromUtf8(key.toString()); + fixedByteStringCache.putIfAbsent(new Text(key.copyBytes()), value); + } + return value; + } + + private static ByteString getFixedByteString(String key) { + ByteString value = fixedByteStringCache.get(key); + if (value == null) { + value = ByteString.copyFromUtf8(key); + fixedByteStringCache.putIfAbsent(key, value); + } + return value; + } + + /** + * Guava cache for caching String to ByteString encoding. Use this when the + * set of Strings is large, mutable, or unknown. + */ + private static LoadingCache<String, ByteString> bytestringCache = + CacheBuilder.newBuilder() + .maximumSize(10000) + .build( + new CacheLoader<String, ByteString>() { + public ByteString load(String key) { + return ByteString.copyFromUtf8(key); + } + }); + private PBHelperClient() { /** Hidden constructor */ } @@ -294,7 +341,7 @@ public class PBHelperClient { public static ExtendedBlockProto convert(final ExtendedBlock b) { if (b == null) return null; return ExtendedBlockProto.newBuilder(). - setPoolId(b.getBlockPoolId()). + setPoolIdBytes(getFixedByteString(b.getBlockPoolId())). setBlockId(b.getBlockId()). setNumBytes(b.getNumBytes()). setGenerationStamp(b.getGenerationStamp()). @@ -305,8 +352,8 @@ public class PBHelperClient { return TokenProto.newBuilder(). setIdentifier(getByteString(tok.getIdentifier())). setPassword(getByteString(tok.getPassword())). - setKind(tok.getKind().toString()). - setService(tok.getService().toString()).build(); + setKindBytes(getFixedByteString(tok.getKind())). 
+ setServiceBytes(getFixedByteString(tok.getService())).build(); } public static ShortCircuitShmIdProto convert(ShmId shmId) { @@ -329,11 +376,10 @@ public class PBHelperClient { // which is the same as the DatanodeUuid. Since StorageID is a required // field we pass the empty string if the DatanodeUuid is not yet known. return DatanodeIDProto.newBuilder() - .setIpAddr(dn.getIpAddr()) - .setHostName(dn.getHostName()) + .setIpAddrBytes(dn.getIpAddrBytes()) + .setHostNameBytes(dn.getHostNameBytes()) .setXferPort(dn.getXferPort()) - .setDatanodeUuid(dn.getDatanodeUuid() != null ? - dn.getDatanodeUuid() : "") + .setDatanodeUuidBytes(dn.getDatanodeUuidBytes()) .setInfoPort(dn.getInfoPort()) .setInfoSecurePort(dn.getInfoSecurePort()) .setIpcPort(dn.getIpcPort()).build(); @@ -357,7 +403,8 @@ public class PBHelperClient { public static DatanodeInfoProto convert(DatanodeInfo info) { DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); if (info.getNetworkLocation() != null) { - builder.setLocation(info.getNetworkLocation()); + builder.setLocationBytes( + bytestringCache.getUnchecked(info.getNetworkLocation())); } if (info.getUpgradeDomain() != null) { builder.setUpgradeDomain(info.getUpgradeDomain()); @@ -2260,8 +2307,8 @@ public class PBHelperClient { setModificationTime(fs.getModificationTime()). setAccessTime(fs.getAccessTime()). setPermission(convert(fs.getPermission())). - setOwner(fs.getOwner()). - setGroup(fs.getGroup()). + setOwnerBytes(getFixedByteString(fs.getOwner())). + setGroupBytes(getFixedByteString(fs.getGroup())). setFileId(fs.getFileId()). setChildrenNum(fs.getChildrenNum()). setPath(getByteString(fs.getLocalNameInBytes())). 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java index bdcbe7fffc7..0f65269462f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; +import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.net.*; @@ -47,6 +48,7 @@ import java.io.OutputStream; import java.io.PrintStream; import java.net.ServerSocket; import java.net.Socket; +import java.util.UUID; import static org.junit.Assert.fail; import static org.mockito.Mockito.*; @@ -171,9 +173,17 @@ public class TestDataXceiverBackwardsCompat { DatanodeInfo datanodeInfo = mock(DatanodeInfo.class); doReturn("localhost").when(datanodeInfo).getHostName(); + doReturn(ByteString.copyFromUtf8("localhost")) + .when(datanodeInfo).getHostNameBytes(); doReturn("127.0.0.1").when(datanodeInfo).getIpAddr(); + doReturn(ByteString.copyFromUtf8("127.0.0.1")) + .when(datanodeInfo).getIpAddrBytes(); doReturn(DatanodeInfo.AdminStates.NORMAL).when(datanodeInfo) .getAdminState(); + final String uuid = UUID.randomUUID().toString(); + doReturn(uuid).when(datanodeInfo).getDatanodeUuid(); + doReturn(ByteString.copyFromUtf8(uuid)) + .when(datanodeInfo).getDatanodeUuidBytes(); Exception storedException = null; try {