HDFS-13601. Optimize ByteString conversions in PBHelper.
(cherry picked from commit 1d2640b613
)
This commit is contained in:
parent
2bbf1b4435
commit
d47c0fc39a
|
@ -91,5 +91,10 @@
|
||||||
<Method name="getSymlinkInBytes" />
|
<Method name="getSymlinkInBytes" />
|
||||||
<Bug pattern="EI_EXPOSE_REP" />
|
<Bug pattern="EI_EXPOSE_REP" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.hdfs.protocolPB.PBHelperClient" />
|
||||||
|
<Method name="getFixedByteString" />
|
||||||
|
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
|
||||||
|
</Match>
|
||||||
|
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.protocol;
|
package org.apache.hadoop.hdfs.protocol;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
@ -44,7 +45,9 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
"null", "null", 0, 0, 0, 0);
|
"null", "null", 0, 0, 0, 0);
|
||||||
|
|
||||||
private String ipAddr; // IP address
|
private String ipAddr; // IP address
|
||||||
|
private ByteString ipAddrBytes; // ipAddr ByteString to save on PB serde
|
||||||
private String hostName; // hostname claimed by datanode
|
private String hostName; // hostname claimed by datanode
|
||||||
|
private ByteString hostNameBytes; // hostName ByteString to save on PB serde
|
||||||
private String peerHostName; // hostname from the actual connection
|
private String peerHostName; // hostname from the actual connection
|
||||||
private int xferPort; // data streaming port
|
private int xferPort; // data streaming port
|
||||||
private int infoPort; // info server port
|
private int infoPort; // info server port
|
||||||
|
@ -58,6 +61,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
* For newly formatted Datanodes it is a UUID.
|
* For newly formatted Datanodes it is a UUID.
|
||||||
*/
|
*/
|
||||||
private final String datanodeUuid;
|
private final String datanodeUuid;
|
||||||
|
// datanodeUuid ByteString to save on PB serde
|
||||||
|
private final ByteString datanodeUuidBytes;
|
||||||
|
|
||||||
public DatanodeID(DatanodeID from) {
|
public DatanodeID(DatanodeID from) {
|
||||||
this(from.getDatanodeUuid(), from);
|
this(from.getDatanodeUuid(), from);
|
||||||
|
@ -66,8 +71,11 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public DatanodeID(String datanodeUuid, DatanodeID from) {
|
public DatanodeID(String datanodeUuid, DatanodeID from) {
|
||||||
this(from.getIpAddr(),
|
this(from.getIpAddr(),
|
||||||
|
from.getIpAddrBytes(),
|
||||||
from.getHostName(),
|
from.getHostName(),
|
||||||
|
from.getHostNameBytes(),
|
||||||
datanodeUuid,
|
datanodeUuid,
|
||||||
|
getByteString(datanodeUuid),
|
||||||
from.getXferPort(),
|
from.getXferPort(),
|
||||||
from.getInfoPort(),
|
from.getInfoPort(),
|
||||||
from.getInfoSecurePort(),
|
from.getInfoSecurePort(),
|
||||||
|
@ -89,22 +97,43 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
*/
|
*/
|
||||||
public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
|
public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
|
||||||
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
|
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
|
||||||
setIpAndXferPort(ipAddr, xferPort);
|
this(ipAddr, getByteString(ipAddr),
|
||||||
|
hostName, getByteString(hostName),
|
||||||
|
datanodeUuid, getByteString(datanodeUuid),
|
||||||
|
xferPort, infoPort, infoSecurePort, ipcPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DatanodeID(String ipAddr, ByteString ipAddrBytes,
|
||||||
|
String hostName, ByteString hostNameBytes,
|
||||||
|
String datanodeUuid, ByteString datanodeUuidBytes,
|
||||||
|
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
|
||||||
|
setIpAndXferPort(ipAddr, ipAddrBytes, xferPort);
|
||||||
this.hostName = hostName;
|
this.hostName = hostName;
|
||||||
|
this.hostNameBytes = hostNameBytes;
|
||||||
this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
|
this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
|
||||||
|
this.datanodeUuidBytes = datanodeUuidBytes;
|
||||||
this.infoPort = infoPort;
|
this.infoPort = infoPort;
|
||||||
this.infoSecurePort = infoSecurePort;
|
this.infoSecurePort = infoSecurePort;
|
||||||
this.ipcPort = ipcPort;
|
this.ipcPort = ipcPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setIpAddr(String ipAddr) {
|
private static ByteString getByteString(String str) {
|
||||||
//updated during registration, preserve former xferPort
|
if (str != null) {
|
||||||
setIpAndXferPort(ipAddr, xferPort);
|
return ByteString.copyFromUtf8(str);
|
||||||
|
}
|
||||||
|
return ByteString.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setIpAndXferPort(String ipAddr, int xferPort) {
|
public void setIpAddr(String ipAddr) {
|
||||||
|
//updated during registration, preserve former xferPort
|
||||||
|
setIpAndXferPort(ipAddr, getByteString(ipAddr), xferPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setIpAndXferPort(String ipAddr, ByteString ipAddrBytes,
|
||||||
|
int xferPort) {
|
||||||
// build xferAddr string to reduce cost of frequent use
|
// build xferAddr string to reduce cost of frequent use
|
||||||
this.ipAddr = ipAddr;
|
this.ipAddr = ipAddr;
|
||||||
|
this.ipAddrBytes = ipAddrBytes;
|
||||||
this.xferPort = xferPort;
|
this.xferPort = xferPort;
|
||||||
this.xferAddr = ipAddr + ":" + xferPort;
|
this.xferAddr = ipAddr + ":" + xferPort;
|
||||||
}
|
}
|
||||||
|
@ -120,6 +149,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
return datanodeUuid;
|
return datanodeUuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ByteString getDatanodeUuidBytes() {
|
||||||
|
return datanodeUuidBytes;
|
||||||
|
}
|
||||||
|
|
||||||
private String checkDatanodeUuid(String uuid) {
|
private String checkDatanodeUuid(String uuid) {
|
||||||
if (uuid == null || uuid.isEmpty()) {
|
if (uuid == null || uuid.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -135,6 +168,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
return ipAddr;
|
return ipAddr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ByteString getIpAddrBytes() {
|
||||||
|
return ipAddrBytes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return hostname
|
* @return hostname
|
||||||
*/
|
*/
|
||||||
|
@ -142,6 +179,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
return hostName;
|
return hostName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ByteString getHostNameBytes() {
|
||||||
|
return hostNameBytes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return hostname from the actual connection
|
* @return hostname from the actual connection
|
||||||
*/
|
*/
|
||||||
|
@ -258,7 +299,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
* Note that this does not update storageID.
|
* Note that this does not update storageID.
|
||||||
*/
|
*/
|
||||||
public void updateRegInfo(DatanodeID nodeReg) {
|
public void updateRegInfo(DatanodeID nodeReg) {
|
||||||
setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getXferPort());
|
setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getIpAddrBytes(),
|
||||||
|
nodeReg.getXferPort());
|
||||||
hostName = nodeReg.getHostName();
|
hostName = nodeReg.getHostName();
|
||||||
peerHostName = nodeReg.getPeerHostName();
|
peerHostName = nodeReg.getPeerHostName();
|
||||||
infoPort = nodeReg.getInfoPort();
|
infoPort = nodeReg.getInfoPort();
|
||||||
|
|
|
@ -27,8 +27,12 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import com.google.common.cache.CacheLoader;
|
||||||
|
import com.google.common.cache.LoadingCache;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.primitives.Shorts;
|
import com.google.common.primitives.Shorts;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
@ -228,6 +232,49 @@ public class PBHelperClient {
|
||||||
private static final FsAction[] FSACTION_VALUES =
|
private static final FsAction[] FSACTION_VALUES =
|
||||||
FsAction.values();
|
FsAction.values();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map used to cache fixed strings to ByteStrings. Since there is no
|
||||||
|
* automatic expiration policy, only use this for strings from a fixed, small
|
||||||
|
* set.
|
||||||
|
* <p/>
|
||||||
|
* This map should not be accessed directly. Used the getFixedByteString
|
||||||
|
* methods instead.
|
||||||
|
*/
|
||||||
|
private static ConcurrentHashMap<Object, ByteString> fixedByteStringCache =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private static ByteString getFixedByteString(Text key) {
|
||||||
|
ByteString value = fixedByteStringCache.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
value = ByteString.copyFromUtf8(key.toString());
|
||||||
|
fixedByteStringCache.put(key, value);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ByteString getFixedByteString(String key) {
|
||||||
|
ByteString value = fixedByteStringCache.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
value = ByteString.copyFromUtf8(key);
|
||||||
|
fixedByteStringCache.put(key, value);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Guava cache for caching String to ByteString encoding. Use this when the
|
||||||
|
* set of Strings is large, mutable, or unknown.
|
||||||
|
*/
|
||||||
|
private static LoadingCache<String, ByteString> bytestringCache =
|
||||||
|
CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(10000)
|
||||||
|
.build(
|
||||||
|
new CacheLoader<String, ByteString>() {
|
||||||
|
public ByteString load(String key) {
|
||||||
|
return ByteString.copyFromUtf8(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
private PBHelperClient() {
|
private PBHelperClient() {
|
||||||
/** Hidden constructor */
|
/** Hidden constructor */
|
||||||
}
|
}
|
||||||
|
@ -294,7 +341,7 @@ public class PBHelperClient {
|
||||||
public static ExtendedBlockProto convert(final ExtendedBlock b) {
|
public static ExtendedBlockProto convert(final ExtendedBlock b) {
|
||||||
if (b == null) return null;
|
if (b == null) return null;
|
||||||
return ExtendedBlockProto.newBuilder().
|
return ExtendedBlockProto.newBuilder().
|
||||||
setPoolId(b.getBlockPoolId()).
|
setPoolIdBytes(getFixedByteString(b.getBlockPoolId())).
|
||||||
setBlockId(b.getBlockId()).
|
setBlockId(b.getBlockId()).
|
||||||
setNumBytes(b.getNumBytes()).
|
setNumBytes(b.getNumBytes()).
|
||||||
setGenerationStamp(b.getGenerationStamp()).
|
setGenerationStamp(b.getGenerationStamp()).
|
||||||
|
@ -305,8 +352,8 @@ public class PBHelperClient {
|
||||||
return TokenProto.newBuilder().
|
return TokenProto.newBuilder().
|
||||||
setIdentifier(getByteString(tok.getIdentifier())).
|
setIdentifier(getByteString(tok.getIdentifier())).
|
||||||
setPassword(getByteString(tok.getPassword())).
|
setPassword(getByteString(tok.getPassword())).
|
||||||
setKind(tok.getKind().toString()).
|
setKindBytes(getFixedByteString(tok.getKind())).
|
||||||
setService(tok.getService().toString()).build();
|
setServiceBytes(getFixedByteString(tok.getService())).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ShortCircuitShmIdProto convert(ShmId shmId) {
|
public static ShortCircuitShmIdProto convert(ShmId shmId) {
|
||||||
|
@ -329,11 +376,10 @@ public class PBHelperClient {
|
||||||
// which is the same as the DatanodeUuid. Since StorageID is a required
|
// which is the same as the DatanodeUuid. Since StorageID is a required
|
||||||
// field we pass the empty string if the DatanodeUuid is not yet known.
|
// field we pass the empty string if the DatanodeUuid is not yet known.
|
||||||
return DatanodeIDProto.newBuilder()
|
return DatanodeIDProto.newBuilder()
|
||||||
.setIpAddr(dn.getIpAddr())
|
.setIpAddrBytes(dn.getIpAddrBytes())
|
||||||
.setHostName(dn.getHostName())
|
.setHostNameBytes(dn.getHostNameBytes())
|
||||||
.setXferPort(dn.getXferPort())
|
.setXferPort(dn.getXferPort())
|
||||||
.setDatanodeUuid(dn.getDatanodeUuid() != null ?
|
.setDatanodeUuidBytes(dn.getDatanodeUuidBytes())
|
||||||
dn.getDatanodeUuid() : "")
|
|
||||||
.setInfoPort(dn.getInfoPort())
|
.setInfoPort(dn.getInfoPort())
|
||||||
.setInfoSecurePort(dn.getInfoSecurePort())
|
.setInfoSecurePort(dn.getInfoSecurePort())
|
||||||
.setIpcPort(dn.getIpcPort()).build();
|
.setIpcPort(dn.getIpcPort()).build();
|
||||||
|
@ -357,7 +403,8 @@ public class PBHelperClient {
|
||||||
public static DatanodeInfoProto convert(DatanodeInfo info) {
|
public static DatanodeInfoProto convert(DatanodeInfo info) {
|
||||||
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
|
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
|
||||||
if (info.getNetworkLocation() != null) {
|
if (info.getNetworkLocation() != null) {
|
||||||
builder.setLocation(info.getNetworkLocation());
|
builder.setLocationBytes(
|
||||||
|
bytestringCache.getUnchecked(info.getNetworkLocation()));
|
||||||
}
|
}
|
||||||
if (info.getUpgradeDomain() != null) {
|
if (info.getUpgradeDomain() != null) {
|
||||||
builder.setUpgradeDomain(info.getUpgradeDomain());
|
builder.setUpgradeDomain(info.getUpgradeDomain());
|
||||||
|
@ -2260,8 +2307,8 @@ public class PBHelperClient {
|
||||||
setModificationTime(fs.getModificationTime()).
|
setModificationTime(fs.getModificationTime()).
|
||||||
setAccessTime(fs.getAccessTime()).
|
setAccessTime(fs.getAccessTime()).
|
||||||
setPermission(convert(fs.getPermission())).
|
setPermission(convert(fs.getPermission())).
|
||||||
setOwner(fs.getOwner()).
|
setOwnerBytes(getFixedByteString(fs.getOwner())).
|
||||||
setGroup(fs.getGroup()).
|
setGroupBytes(getFixedByteString(fs.getGroup())).
|
||||||
setFileId(fs.getFileId()).
|
setFileId(fs.getFileId()).
|
||||||
setChildrenNum(fs.getChildrenNum()).
|
setChildrenNum(fs.getChildrenNum()).
|
||||||
setPath(getByteString(fs.getLocalNameInBytes())).
|
setPath(getByteString(fs.getLocalNameInBytes())).
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.net.*;
|
import org.apache.hadoop.hdfs.net.*;
|
||||||
|
@ -47,6 +48,7 @@ import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
@ -171,9 +173,17 @@ public class TestDataXceiverBackwardsCompat {
|
||||||
|
|
||||||
DatanodeInfo datanodeInfo = mock(DatanodeInfo.class);
|
DatanodeInfo datanodeInfo = mock(DatanodeInfo.class);
|
||||||
doReturn("localhost").when(datanodeInfo).getHostName();
|
doReturn("localhost").when(datanodeInfo).getHostName();
|
||||||
|
doReturn(ByteString.copyFromUtf8("localhost"))
|
||||||
|
.when(datanodeInfo).getHostNameBytes();
|
||||||
doReturn("127.0.0.1").when(datanodeInfo).getIpAddr();
|
doReturn("127.0.0.1").when(datanodeInfo).getIpAddr();
|
||||||
|
doReturn(ByteString.copyFromUtf8("127.0.0.1"))
|
||||||
|
.when(datanodeInfo).getIpAddrBytes();
|
||||||
doReturn(DatanodeInfo.AdminStates.NORMAL).when(datanodeInfo)
|
doReturn(DatanodeInfo.AdminStates.NORMAL).when(datanodeInfo)
|
||||||
.getAdminState();
|
.getAdminState();
|
||||||
|
final String uuid = UUID.randomUUID().toString();
|
||||||
|
doReturn(uuid).when(datanodeInfo).getDatanodeUuid();
|
||||||
|
doReturn(ByteString.copyFromUtf8(uuid))
|
||||||
|
.when(datanodeInfo).getDatanodeUuidBytes();
|
||||||
|
|
||||||
Exception storedException = null;
|
Exception storedException = null;
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue