HDFS-5141. Add cache status information to datanode heartbeat. (Contributed by Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1519101 13f79535-47bb-0310-9956-ffa450edef68
Author: Andrew Wang
Date: 2013-08-30 22:15:51 +00:00
Parent: b992219fa1
Commit: fc14a92c6b
27 changed files with 209 additions and 57 deletions


@ -18,6 +18,9 @@ HDFS-4949 (Unreleased)
HDFS-5050. Add DataNode support for mlock and munlock
(Andrew Wang via Colin Patrick McCabe)
HDFS-5141. Add cache status information to datanode heartbeat.
(Contributed by Andrew Wang)
OPTIMIZATIONS
BUG FIXES


@ -44,6 +44,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
private long dfsUsed;
private long remaining;
private long blockPoolUsed;
private long cacheCapacity;
private long cacheUsed;
private long lastUpdate;
private int xceiverCount;
private String location = NetworkTopology.DEFAULT_RACK;
@ -81,6 +83,8 @@ public DatanodeInfo(DatanodeInfo from) {
this.dfsUsed = from.getDfsUsed();
this.remaining = from.getRemaining();
this.blockPoolUsed = from.getBlockPoolUsed();
this.cacheCapacity = from.getCacheCapacity();
this.cacheUsed = from.getCacheUsed();
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
this.location = from.getNetworkLocation();
@ -93,6 +97,8 @@ public DatanodeInfo(DatanodeID nodeID) {
this.dfsUsed = 0L;
this.remaining = 0L;
this.blockPoolUsed = 0L;
this.cacheCapacity = 0L;
this.cacheUsed = 0L;
this.lastUpdate = 0L;
this.xceiverCount = 0;
this.adminState = null;
@ -105,24 +111,29 @@ public DatanodeInfo(DatanodeID nodeID, String location) {
public DatanodeInfo(DatanodeID nodeID, String location,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final int xceiverCount,
final AdminStates adminState) {
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(),
nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
blockPoolUsed, cacheCapacity, cacheUsed, lastUpdate, xceiverCount,
location, adminState);
}
/** Constructor */
public DatanodeInfo(final String ipAddr, final String hostName,
final String storageID, final int xferPort, final int infoPort, final int ipcPort,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final int xceiverCount,
final String networkLocation, final AdminStates adminState) {
super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort);
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.remaining = remaining;
this.blockPoolUsed = blockPoolUsed;
this.cacheCapacity = cacheCapacity;
this.cacheUsed = cacheUsed;
this.lastUpdate = lastUpdate;
this.xceiverCount = xceiverCount;
this.location = networkLocation;
@ -168,6 +179,42 @@ public float getRemainingPercent() {
return DFSUtil.getPercentRemaining(remaining, capacity);
}
/**
* @return Amount of cache capacity in bytes
*/
public long getCacheCapacity() {
return cacheCapacity;
}
/**
* @return Amount of cache used in bytes
*/
public long getCacheUsed() {
return cacheUsed;
}
/**
* @return Cache used as a percentage of the datanode's total cache capacity
*/
public float getCacheUsedPercent() {
return DFSUtil.getPercentUsed(cacheUsed, cacheCapacity);
}
/**
* @return Amount of cache remaining in bytes
*/
public long getCacheRemaining() {
return cacheCapacity - cacheUsed;
}
/**
* @return Cache remaining as a percentage of the datanode's total cache
* capacity
*/
public float getCacheRemainingPercent() {
return DFSUtil.getPercentRemaining(getCacheRemaining(), cacheCapacity);
}
/** The time when this information was accurate. */
public long getLastUpdate() { return lastUpdate; }
@ -194,6 +241,16 @@ public void setBlockPoolUsed(long bpUsed) {
this.blockPoolUsed = bpUsed;
}
/** Sets cache capacity. */
public void setCacheCapacity(long cacheCapacity) {
this.cacheCapacity = cacheCapacity;
}
/** Sets cache used. */
public void setCacheUsed(long cacheUsed) {
this.cacheUsed = cacheUsed;
}
/** Sets time when this information was accurate. */
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
@ -223,6 +280,11 @@ public String getDatanodeReport() {
long nonDFSUsed = getNonDfsUsed();
float usedPercent = getDfsUsedPercent();
float remainingPercent = getRemainingPercent();
long cc = getCacheCapacity();
long cr = getCacheRemaining();
long cu = getCacheUsed();
float cacheUsedPercent = getCacheUsedPercent();
float cacheRemainingPercent = getCacheRemainingPercent();
String lookupName = NetUtils.getHostNameOfIP(getName());
buffer.append("Name: "+ getName());
@ -249,6 +311,12 @@ public String getDatanodeReport() {
buffer.append("DFS Remaining: " +r+ " ("+StringUtils.byteDesc(r)+")"+"\n");
buffer.append("DFS Used%: "+percent2String(usedPercent) + "\n");
buffer.append("DFS Remaining%: "+percent2String(remainingPercent) + "\n");
buffer.append("Configured Cache Capacity: "+cc+" ("+StringUtils.byteDesc(cc)+")"+"\n");
buffer.append("Cache Used: "+cu+" ("+StringUtils.byteDesc(cu)+")"+"\n");
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
return buffer.toString();
}
@ -259,6 +327,9 @@ public String dumpDatanode() {
long c = getCapacity();
long r = getRemaining();
long u = getDfsUsed();
long cc = getCacheCapacity();
long cr = getCacheRemaining();
long cu = getCacheUsed();
buffer.append(getName());
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
buffer.append(" "+location);
@ -274,6 +345,10 @@ public String dumpDatanode() {
buffer.append(" " + u + "(" + StringUtils.byteDesc(u)+")");
buffer.append(" " + percent2String(u/(double)c));
buffer.append(" " + r + "(" + StringUtils.byteDesc(r)+")");
buffer.append(" " + cc + "(" + StringUtils.byteDesc(cc)+")");
buffer.append(" " + cu + "(" + StringUtils.byteDesc(cu)+")");
buffer.append(" " + percent2String(cu/(double)cc));
buffer.append(" " + cr + "(" + StringUtils.byteDesc(cr)+")");
buffer.append(" " + new Date(lastUpdate));
return buffer.toString();
}
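
With these accessors in place, cache usage is visible to clients next to the existing disk counters. The following is a rough sketch, not part of this commit (the class name and Configuration setup are invented for illustration; getDataNodeStats() is the standard DistributedFileSystem call), showing how a client might print the new per-datanode cache statistics:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class CacheStatsExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS namenode.
    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(conf);
    for (DatanodeInfo dn : fs.getDataNodeStats()) {
      // New in this change: cache capacity/used/remaining per datanode.
      System.out.printf("%s cacheCapacity=%d cacheUsed=%d cacheRemaining=%d (%.2f%% free)%n",
          dn.getHostName(), dn.getCacheCapacity(), dn.getCacheUsed(),
          dn.getCacheRemaining(), dn.getCacheRemainingPercent());
    }
  }
}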


@ -51,6 +51,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -155,8 +156,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, int xmitsInProgress, int xceiverCount,
int failedVolumes) throws IOException {
StorageReport[] reports, CacheReport[] cacheReports, int xmitsInProgress,
int xceiverCount, int failedVolumes) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@ -164,6 +165,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
for (StorageReport r : reports) {
builder.addReports(PBHelper.convert(r));
}
for (CacheReport r : cacheReports) {
builder.addCacheReports(PBHelper.convert(r));
}
HeartbeatResponseProto resp;
try {


@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
@ -47,6 +48,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -111,9 +113,16 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed());
}
List<CacheReportProto> cacheList = request.getCacheReportsList();
CacheReport[] cacheReport = new CacheReport[cacheList.size()];
i = 0;
for (CacheReportProto p : cacheList) {
cacheReport[i++] = new CacheReport(p.getCacheCapacity(),
p.getCacheUsed());
}
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report, request.getXmitsInProgress(), request.getXceiverCount(),
request.getFailedVolumes());
report, cacheReport, request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) {
throw new ServiceException(e);
}
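
Both translators above depend on the new org.apache.hadoop.hdfs.server.protocol.CacheReport carrier class, whose source is not included in the hunks shown. Judging from the constructor and getters exercised in this commit (new CacheReport(capacity, used), getCapacity(), getUsed()), a minimal version would look roughly like the following; the field names are assumptions:

package org.apache.hadoop.hdfs.server.protocol;

/**
 * Hypothetical reconstruction of the CacheReport value class used in the
 * heartbeat path; the real class may carry annotations or extra fields.
 */
public class CacheReport {
  private final long capacity;  // total cache capacity in bytes
  private final long used;      // bytes currently cached

  public CacheReport(long capacity, long used) {
    this.capacity = capacity;
    this.used = used;
  }

  public long getCapacity() { return capacity; }
  public long getUsed() { return used; }
}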


@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
@ -121,6 +122,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -469,7 +471,8 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
PBHelper.convert(di.getId()),
di.hasLocation() ? di.getLocation() : null ,
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() ,
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
di.getLastUpdate(), di.getXceiverCount(),
PBHelper.convert(di.getAdminState()));
}
@ -1361,6 +1364,13 @@ public static StorageReportProto convert(StorageReport r) {
.setStorageID(r.getStorageID()).build();
}
public static CacheReportProto convert(CacheReport r) {
return CacheReportProto.newBuilder()
.setCacheCapacity(r.getCapacity())
.setCacheUsed(r.getUsed())
.build();
}
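
The reverse conversion (CacheReportProto back to CacheReport) is done inline in DatanodeProtocolServerSideTranslatorPB above rather than here; if it were factored into PBHelper for symmetry, a sketch (not part of this commit, relying only on the getters already imported in this file) would be:

public static CacheReport convert(CacheReportProto p) {
  return new CacheReport(p.getCacheCapacity(), p.getCacheUsed());
}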
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;


@ -159,7 +159,7 @@ synchronized void clear() {
* @param nodeID id of the data node
*/
public DatanodeDescriptor(DatanodeID nodeID) {
this(nodeID, 0L, 0L, 0L, 0L, 0, 0);
this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
/**
@ -169,7 +169,7 @@ public DatanodeDescriptor(DatanodeID nodeID) {
*/
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0);
this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
/**
@ -179,6 +179,8 @@ public DatanodeDescriptor(DatanodeID nodeID,
* @param dfsUsed space used by the data node
* @param remaining remaining capacity of the data node
* @param bpused space used by the block pool corresponding to this namenode
* @param cacheCapacity cache capacity of the data node
* @param cacheUsed cache used on the data node
* @param xceiverCount # of data transfers at the data node
*/
public DatanodeDescriptor(DatanodeID nodeID,
@ -186,11 +188,13 @@ public DatanodeDescriptor(DatanodeID nodeID,
long dfsUsed,
long remaining,
long bpused,
long cacheCapacity,
long cacheUsed,
int xceiverCount,
int failedVolumes) {
super(nodeID);
updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
failedVolumes);
updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes);
}
/**
@ -201,6 +205,8 @@ public DatanodeDescriptor(DatanodeID nodeID,
* @param dfsUsed the used space by dfs datanode
* @param remaining remaining capacity of the data node
* @param bpused space used by the block pool corresponding to this namenode
* @param cacheCapacity cache capacity of the data node
* @param cacheUsed cache used on the data node
* @param xceiverCount # of data transfers at the data node
*/
public DatanodeDescriptor(DatanodeID nodeID,
@ -209,11 +215,13 @@ public DatanodeDescriptor(DatanodeID nodeID,
long dfsUsed,
long remaining,
long bpused,
long cacheCapacity,
long cacheUsed,
int xceiverCount,
int failedVolumes) {
super(nodeID, networkLocation);
updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
failedVolumes);
updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes);
}
/**
@ -302,11 +310,14 @@ public int numBlocks() {
* Updates stats from datanode heartbeat.
*/
public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
long blockPoolUsed, int xceiverCount, int volFailures) {
long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
int volFailures) {
setCapacity(capacity);
setRemaining(remaining);
setBlockPoolUsed(blockPoolUsed);
setDfsUsed(dfsUsed);
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
setLastUpdate(Time.now());
this.volumeFailures = volFailures;


@ -1145,8 +1145,8 @@ private void setDatanodeDead(DatanodeDescriptor node) {
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final String blockPoolId,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
int failedVolumes) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
@ -1167,7 +1167,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
}
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
remaining, blockPoolUsed, xceiverCount, failedVolumes);
remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
failedVolumes);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.


@ -170,7 +170,7 @@ synchronized void register(final DatanodeDescriptor d) {
addDatanode(d);
//update its timestamp
d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
}
@ -193,10 +193,10 @@ synchronized void removeDatanode(DatanodeDescriptor node) {
synchronized void updateHeartbeat(final DatanodeDescriptor node,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int failedVolumes) {
long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
stats.subtract(node);
node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
stats.add(node);
}


@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -468,7 +469,10 @@ HeartbeatResponse sendHeartBeat() throws IOException {
dn.getFSDataset().getDfsUsed(),
dn.getFSDataset().getRemaining(),
dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
return bpNamenode.sendHeartbeat(bpRegistration, report,
CacheReport[] cacheReport = { new CacheReport(
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed()) };
return bpNamenode.sendHeartbeat(bpRegistration, report, cacheReport,
dn.getXmitsInProgress(),
dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes());


@ -3962,15 +3962,15 @@ String getRegistrationID() {
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress,
int failedVolumes) throws IOException {
readLock();
try {
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes);
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();


@ -100,6 +100,7 @@
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -935,13 +936,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, int xmitsInProgress, int xceiverCount,
int failedVolumes) throws IOException {
StorageReport[] report, CacheReport[] cacheReport, int xmitsInProgress,
int xceiverCount, int failedVolumes) throws IOException {
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
report[0].getDfsUsed(), report[0].getRemaining(),
report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress,
failedVolumes);
report[0].getBlockPoolUsed(), cacheReport[0].getCapacity(),
cacheReport[0].getUsed(), xceiverCount, xmitsInProgress, failedVolumes);
}
@Override // DatanodeProtocol


@ -106,6 +106,7 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
@Idempotent
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports,
CacheReport[] cacheReports,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException;


@ -301,6 +301,8 @@ private static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
m.put("dfsUsed", datanodeinfo.getDfsUsed());
m.put("remaining", datanodeinfo.getRemaining());
m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed());
m.put("cacheCapacity", datanodeinfo.getCacheCapacity());
m.put("cacheUsed", datanodeinfo.getCacheUsed());
m.put("lastUpdate", datanodeinfo.getLastUpdate());
m.put("xceiverCount", datanodeinfo.getXceiverCount());
m.put("networkLocation", datanodeinfo.getNetworkLocation());
@ -326,6 +328,8 @@ private static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
(Long)m.get("dfsUsed"),
(Long)m.get("remaining"),
(Long)m.get("blockPoolUsed"),
(Long)m.get("cacheCapacity"),
(Long)m.get("cacheUsed"),
(Long)m.get("lastUpdate"),
(int)(long)(Long)m.get("xceiverCount"),
(String)m.get("networkLocation"),


@ -164,6 +164,8 @@ message RegisterDatanodeResponseProto {
* xmitsInProgress - number of transfers from this datanode to others
* xceiverCount - number of active transceiver threads
* failedVolumes - number of failed volumes
* cacheCapacity - total cache capacity available at the datanode
* cacheUsed - amount of cache used
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@ -171,6 +173,7 @@ message HeartbeatRequestProto {
optional uint32 xmitsInProgress = 3 [ default = 0 ];
optional uint32 xceiverCount = 4 [ default = 0 ];
optional uint32 failedVolumes = 5 [ default = 0 ];
repeated CacheReportProto cacheReports = 6;
}
message StorageReportProto {
@ -182,6 +185,11 @@ message StorageReportProto {
optional uint64 blockPoolUsed = 6 [ default = 0 ];
}
message CacheReportProto {
optional uint64 cacheCapacity = 1 [ default = 0 ];
optional uint64 cacheUsed = 2 [ default = 0 ];
}
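
For reference, the generated Java builder for the new message can be exercised directly; the snippet below is illustrative only (the class name and the capacity/used values are invented) and mirrors what PBHelper.convert(CacheReport) produces above:

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;

public class CacheReportProtoExample {
  public static void main(String[] args) {
    // Build a report for a datanode with 4 GB of cache, 1 GB of it in use.
    CacheReportProto proto = CacheReportProto.newBuilder()
        .setCacheCapacity(4L * 1024 * 1024 * 1024)
        .setCacheUsed(1L * 1024 * 1024 * 1024)
        .build();
    // Both fields are optional with a default of 0, so an empty message
    // decodes as "no cache configured, nothing cached".
    System.out.println("cache remaining = "
        + (proto.getCacheCapacity() - proto.getCacheUsed()));
  }
}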
/**
* state - State the NN is in when returning response to the DN
* txid - Highest transaction ID this NN has seen


@ -82,6 +82,8 @@ message DatanodeInfoProto {
}
optional AdminState adminState = 10 [default = NORMAL];
optional uint64 cacheCapacity = 11 [default = 0];
optional uint64 cacheUsed = 12 [default = 0];
}
/**


@ -847,7 +847,7 @@ public static DatanodeInfo getLocalDatanodeInfo(String ipAddr,
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
1, 2, 3, 4, 5, 6, "local", adminState);
1L, 2L, 3L, 4L, 0L, 0L, 5, 6, "local", adminState);
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,


@ -98,7 +98,9 @@ private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
cluster.add(dn);
dn.updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L,
0, 0);
bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
}
}


@ -104,7 +104,7 @@ public void testProcesOverReplicateBlock() throws Exception {
String corruptMachineName = corruptDataNode.getXferAddr();
for (DatanodeDescriptor datanode : hm.getDatanodes()) {
if (!corruptMachineName.equals(datanode.getXferAddr())) {
datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0L, 0L, 0, 0);
}
}


@ -116,7 +116,7 @@ public static void setupCluster() throws Exception {
for (int i=0; i < NUM_OF_DATANODES; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
@ -133,7 +133,8 @@ public static void setupCluster() throws Exception {
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
@ -168,7 +169,7 @@ public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
private static DatanodeDescriptor[] chooseTarget(
@ -271,7 +272,8 @@ public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
0L, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
@ -309,7 +311,7 @@ public void testChooseTarget3() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
/**
@ -326,7 +328,7 @@ public void testChoooseTarget4() throws Exception {
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
@ -358,7 +360,7 @@ public void testChoooseTarget4() throws Exception {
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
@ -424,7 +426,7 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
final LogVerificationAppender appender = new LogVerificationAppender();
@ -451,7 +453,7 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}


@ -129,7 +129,7 @@ private static void setupDataNodeCapacity() {
for(int i=0; i<NUM_OF_DATANODES; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
@ -166,7 +166,8 @@ private static boolean checkTargetsOnDifferentNodeGroup(
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
@ -204,7 +205,7 @@ public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
@ -272,7 +273,8 @@ public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
0L, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
@ -308,7 +310,7 @@ public void testChooseTarget3() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
/**
@ -326,7 +328,7 @@ public void testChooseTarget4() throws Exception {
for(int i=0; i<3; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
@ -573,11 +575,11 @@ public void testChooseTargetsOnBoundaryTopology() throws Exception {
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
dataNodesInBoundaryCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
@ -612,7 +614,7 @@ public void testRereplicateOnBoundaryTopology() throws Exception {
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
dataNodesInBoundaryCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodesInBoundaryCase[0]);
@ -651,7 +653,7 @@ public void testChooseMoreTargetsThanNodeGroups() throws Exception {
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
dataNodesInMoreTargetsCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeDescriptor[] targets;


@ -450,9 +450,9 @@ public void testSortNodeByFields() throws Exception {
DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "storage2",
1235, 2346, 3457);
DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1", 1024,
100, 924, 100, 10, 2);
100, 924, 100, 5L, 3L, 10, 2);
DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2", 2500,
200, 1848, 200, 20, 1);
200, 1848, 200, 10L, 2L, 20, 1);
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
live.add(dnDesc1);
live.add(dnDesc2);


@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -126,6 +127,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
.when(mock).sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.any(CacheReport[].class),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt());


@ -67,6 +67,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -154,6 +155,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
when(namenode.sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.any(CacheReport[].class),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt()))


@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -105,6 +106,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(),
(CacheReport[]) any(),
anyInt(), anyInt(), anyInt());
}


@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -863,8 +864,9 @@ void sendHeartbeat() throws IOException {
// TODO:FEDERATION currently a single block pool is supported
StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
CacheReport[] cacheRep = { new CacheReport(0L, 0L) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
rep, 0, 0, 0).getCommands();
rep, cacheRep, 0, 0, 0).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -910,8 +912,9 @@ int replicateBlocks() throws IOException {
// register datanode
StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
CacheReport[] cacheRep = { new CacheReport(0L, 0L) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
rep, 0, 0, 0).getCommands();
rep, cacheRep, 0, 0, 0).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {


@ -111,7 +111,8 @@ public static DelegationTokenSecretManager getDtSecretManager(
public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);
dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(),
dd.getCacheCapacity(), dd.getCacheUsed(), 0, 0, 0);
}
public static boolean setReplication(final FSNamesystem ns,


@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.CacheReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -142,7 +143,9 @@ public void testDeadDatanode() throws Exception {
// that asks datanode to register again
StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
0, 0) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands();
CacheReport[] cacheRep = { new CacheReport(0L, 0L) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, cacheRep, 0, 0, 0)
.getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());