From fc14a92c6b46cc435a8f33e6fa0512c70caa06e0 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Fri, 30 Aug 2013 22:15:51 +0000 Subject: [PATCH] HDFS-5141. Add cache status information to datanode heartbeat. (Contributed by Andrew Wang) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1519101 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES-HDFS-4949.txt | 3 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 81 ++++++++++++++++++- ...atanodeProtocolClientSideTranslatorPB.java | 8 +- ...atanodeProtocolServerSideTranslatorPB.java | 13 ++- .../hadoop/hdfs/protocolPB/PBHelper.java | 12 ++- .../blockmanagement/DatanodeDescriptor.java | 25 ++++-- .../blockmanagement/DatanodeManager.java | 7 +- .../blockmanagement/HeartbeatManager.java | 6 +- .../hdfs/server/datanode/BPServiceActor.java | 6 +- .../hdfs/server/namenode/FSNamesystem.java | 6 +- .../server/namenode/NameNodeRpcServer.java | 9 ++- .../server/protocol/DatanodeProtocol.java | 1 + .../org/apache/hadoop/hdfs/web/JsonUtil.java | 4 + .../src/main/proto/DatanodeProtocol.proto | 8 ++ .../hadoop-hdfs/src/main/proto/hdfs.proto | 2 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../blockmanagement/TestBlockManager.java | 4 +- .../TestOverReplicatedBlocks.java | 2 +- .../TestReplicationPolicy.java | 20 ++--- .../TestReplicationPolicyWithNodeGroup.java | 22 ++--- .../hdfs/server/common/TestJspHelper.java | 4 +- .../server/datanode/TestBPOfferService.java | 2 + .../server/datanode/TestBlockRecovery.java | 2 + .../server/datanode/TestFsDatasetCache.java | 2 + .../namenode/NNThroughputBenchmark.java | 7 +- .../hdfs/server/namenode/NameNodeAdapter.java | 3 +- .../server/namenode/TestDeadDatanode.java | 5 +- 27 files changed, 209 insertions(+), 57 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt index e23a97eb174..769996d46a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt @@ -18,6 +18,9 @@ HDFS-4949 (Unreleased) HDFS-5050. Add DataNode support for mlock and munlock (Andrew Wang via Colin Patrick McCabe) + HDFS-5141. Add cache status information to datanode heartbeat. 
+ (Contributed by Andrew Wang) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index 5172bc59f24..3964972c070 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -44,6 +44,8 @@ public class DatanodeInfo extends DatanodeID implements Node { private long dfsUsed; private long remaining; private long blockPoolUsed; + private long cacheCapacity; + private long cacheUsed; private long lastUpdate; private int xceiverCount; private String location = NetworkTopology.DEFAULT_RACK; @@ -81,6 +83,8 @@ public DatanodeInfo(DatanodeInfo from) { this.dfsUsed = from.getDfsUsed(); this.remaining = from.getRemaining(); this.blockPoolUsed = from.getBlockPoolUsed(); + this.cacheCapacity = from.getCacheCapacity(); + this.cacheUsed = from.getCacheUsed(); this.lastUpdate = from.getLastUpdate(); this.xceiverCount = from.getXceiverCount(); this.location = from.getNetworkLocation(); @@ -93,6 +97,8 @@ public DatanodeInfo(DatanodeID nodeID) { this.dfsUsed = 0L; this.remaining = 0L; this.blockPoolUsed = 0L; + this.cacheCapacity = 0L; + this.cacheUsed = 0L; this.lastUpdate = 0L; this.xceiverCount = 0; this.adminState = null; @@ -105,24 +111,29 @@ public DatanodeInfo(DatanodeID nodeID, String location) { public DatanodeInfo(DatanodeID nodeID, String location, final long capacity, final long dfsUsed, final long remaining, - final long blockPoolUsed, final long lastUpdate, final int xceiverCount, + final long blockPoolUsed, final long cacheCapacity, final long cacheUsed, + final long lastUpdate, final int xceiverCount, final AdminStates adminState) { this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining, - blockPoolUsed, lastUpdate, xceiverCount, location, adminState); + blockPoolUsed, cacheCapacity, cacheUsed, lastUpdate, xceiverCount, + location, adminState); } /** Constructor */ public DatanodeInfo(final String ipAddr, final String hostName, final String storageID, final int xferPort, final int infoPort, final int ipcPort, final long capacity, final long dfsUsed, final long remaining, - final long blockPoolUsed, final long lastUpdate, final int xceiverCount, + final long blockPoolUsed, final long cacheCapacity, final long cacheUsed, + final long lastUpdate, final int xceiverCount, final String networkLocation, final AdminStates adminState) { super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort); this.capacity = capacity; this.dfsUsed = dfsUsed; this.remaining = remaining; this.blockPoolUsed = blockPoolUsed; + this.cacheCapacity = cacheCapacity; + this.cacheUsed = cacheUsed; this.lastUpdate = lastUpdate; this.xceiverCount = xceiverCount; this.location = networkLocation; @@ -168,6 +179,42 @@ public float getRemainingPercent() { return DFSUtil.getPercentRemaining(remaining, capacity); } + /** + * @return Amount of cache capacity in bytes + */ + public long getCacheCapacity() { + return cacheCapacity; + } + + /** + * @return Amount of cache used in bytes + */ + public long getCacheUsed() { + return cacheUsed; + } + + /** + * @return Cache used as a percentage of the datanode's total cache capacity + */ + public float getCacheUsedPercent() { + return 
DFSUtil.getPercentUsed(cacheUsed, cacheCapacity); + } + + /** + * @return Amount of cache remaining in bytes + */ + public long getCacheRemaining() { + return cacheCapacity - cacheUsed; + } + + /** + * @return Cache remaining as a percentage of the datanode's total cache + * capacity + */ + public float getCacheRemainingPercent() { + return DFSUtil.getPercentRemaining(getCacheRemaining(), cacheCapacity); + } + /** The time when this information was accurate. */ public long getLastUpdate() { return lastUpdate; }
@@ -194,6 +241,16 @@ public void setBlockPoolUsed(long bpUsed) { this.blockPoolUsed = bpUsed; } + /** Sets cache capacity. */ + public void setCacheCapacity(long cacheCapacity) { + this.cacheCapacity = cacheCapacity; + } + + /** Sets cache used. */ + public void setCacheUsed(long cacheUsed) { + this.cacheUsed = cacheUsed; + } + /** Sets time when this information was accurate. */ public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; }
@@ -223,6 +280,11 @@ public String getDatanodeReport() { long nonDFSUsed = getNonDfsUsed(); float usedPercent = getDfsUsedPercent(); float remainingPercent = getRemainingPercent(); + long cc = getCacheCapacity(); + long cr = getCacheRemaining(); + long cu = getCacheUsed(); + float cacheUsedPercent = getCacheUsedPercent(); + float cacheRemainingPercent = getCacheRemainingPercent(); String lookupName = NetUtils.getHostNameOfIP(getName()); buffer.append("Name: "+ getName());
@@ -249,6 +311,12 @@ public String getDatanodeReport() { buffer.append("DFS Remaining: " +r+ " ("+StringUtils.byteDesc(r)+")"+"\n"); buffer.append("DFS Used%: "+percent2String(usedPercent) + "\n"); buffer.append("DFS Remaining%: "+percent2String(remainingPercent) + "\n"); + buffer.append("Configured Cache Capacity: "+cc+" ("+StringUtils.byteDesc(cc)+")"+"\n"); + buffer.append("Cache Used: "+cu+" ("+StringUtils.byteDesc(cu)+")"+"\n"); + buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n"); + buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n"); + buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n"); + buffer.append("Last contact: "+new Date(lastUpdate)+"\n"); return buffer.toString(); }
@@ -259,6 +327,9 @@ public String dumpDatanode() { long c = getCapacity(); long r = getRemaining(); long u = getDfsUsed(); + long cc = getCacheCapacity(); + long cr = getCacheRemaining(); + long cu = getCacheUsed(); buffer.append(getName()); if (!NetworkTopology.DEFAULT_RACK.equals(location)) { buffer.append(" "+location); }
@@ -274,6 +345,10 @@ public String dumpDatanode() { buffer.append(" " + u + "(" + StringUtils.byteDesc(u)+")"); buffer.append(" " + percent2String(u/(double)c)); buffer.append(" " + r + "(" + StringUtils.byteDesc(r)+")"); + buffer.append(" " + cc + "(" + StringUtils.byteDesc(cc)+")"); + buffer.append(" " + cu + "(" + StringUtils.byteDesc(cu)+")"); + buffer.append(" " + percent2String(cu/(double)cc)); + buffer.append(" " + cr + "(" + StringUtils.byteDesc(cr)+")"); + buffer.append(" " + new Date(lastUpdate)); return buffer.toString(); }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index cf3921cf12e..1578d24e908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -155,8 +156,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration @Override public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, - StorageReport[] reports, int xmitsInProgress, int xceiverCount, - int failedVolumes) throws IOException { + StorageReport[] reports, CacheReport[] cacheReports, int xmitsInProgress, + int xceiverCount, int failedVolumes) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -164,6 +165,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, for (StorageReport r : reports) { builder.addReports(PBHelper.convert(r)); } + for (CacheReport r : cacheReports) { + builder.addCacheReports(PBHelper.convert(r)); + } HeartbeatResponseProto resp; try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 78bfe1f2c01..8aafcc36154 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -111,9 +113,16 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, p.getCapacity(), p.getDfsUsed(), p.getRemaining(), p.getBlockPoolUsed()); } + List<CacheReportProto> cacheList = request.getCacheReportsList(); + CacheReport[] cacheReport = new CacheReport[cacheList.size()]; + i = 0; + for (CacheReportProto p : cacheList) { + cacheReport[i++] = new CacheReport(p.getCacheCapacity(), + p.getCacheUsed()); + } response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), - report, request.getXmitsInProgress(), request.getXceiverCount(), - request.getFailedVolumes()); + report, cacheReport, request.getXmitsInProgress(), + request.getXceiverCount(), request.getFailedVolumes()); } catch (IOException e) { throw new ServiceException(e); }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 613edb1fa18..4051d01e031 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
@@ -121,6 +122,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -469,7 +471,8 @@ static public DatanodeInfo convert(DatanodeInfoProto di) { PBHelper.convert(di.getId()), di.hasLocation() ? di.getLocation() : null , di.getCapacity(), di.getDfsUsed(), di.getRemaining(), - di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() , + di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(), + di.getLastUpdate(), di.getXceiverCount(), PBHelper.convert(di.getAdminState())); }
@@ -1361,6 +1364,13 @@ public static StorageReportProto convert(StorageReport r) { .setStorageID(r.getStorageID()).build(); } + public static CacheReportProto convert(CacheReport r) { + return CacheReportProto.newBuilder() + .setCacheCapacity(r.getCapacity()) + .setCacheUsed(r.getUsed()) + .build(); + } + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ?
info.getNamespaceID() : 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index c542ae343e1..713a156cc46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -159,7 +159,7 @@ synchronized void clear() { * @param nodeID id of the data node */ public DatanodeDescriptor(DatanodeID nodeID) { - this(nodeID, 0L, 0L, 0L, 0L, 0, 0); + this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0); } /** @@ -169,7 +169,7 @@ public DatanodeDescriptor(DatanodeID nodeID) { */ public DatanodeDescriptor(DatanodeID nodeID, String networkLocation) { - this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0); + this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0); } /** @@ -179,6 +179,8 @@ public DatanodeDescriptor(DatanodeID nodeID, * @param dfsUsed space used by the data node * @param remaining remaining capacity of the data node * @param bpused space used by the block pool corresponding to this namenode + * @param cacheCapacity cache capacity of the data node + * @param cacheUsed cache used on the data node * @param xceiverCount # of data transfers at the data node */ public DatanodeDescriptor(DatanodeID nodeID, @@ -186,11 +188,13 @@ public DatanodeDescriptor(DatanodeID nodeID, long dfsUsed, long remaining, long bpused, + long cacheCapacity, + long cacheUsed, int xceiverCount, int failedVolumes) { super(nodeID); - updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount, - failedVolumes); + updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity, + cacheUsed, xceiverCount, failedVolumes); } /** @@ -201,6 +205,8 @@ public DatanodeDescriptor(DatanodeID nodeID, * @param dfsUsed the used space by dfs datanode * @param remaining remaining capacity of the data node * @param bpused space used by the block pool corresponding to this namenode + * @param cacheCapacity cache capacity of the data node + * @param cacheUsed cache used on the data node * @param xceiverCount # of data transfers at the data node */ public DatanodeDescriptor(DatanodeID nodeID, @@ -209,11 +215,13 @@ public DatanodeDescriptor(DatanodeID nodeID, long dfsUsed, long remaining, long bpused, + long cacheCapacity, + long cacheUsed, int xceiverCount, int failedVolumes) { super(nodeID, networkLocation); - updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount, - failedVolumes); + updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity, + cacheUsed, xceiverCount, failedVolumes); } /** @@ -302,11 +310,14 @@ public int numBlocks() { * Updates stats from datanode heartbeat. 
*/ public void updateHeartbeat(long capacity, long dfsUsed, long remaining, - long blockPoolUsed, int xceiverCount, int volFailures) { + long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount, + int volFailures) { setCapacity(capacity); setRemaining(remaining); setBlockPoolUsed(blockPoolUsed); setDfsUsed(dfsUsed); + setCacheCapacity(cacheCapacity); + setCacheUsed(cacheUsed); setXceiverCount(xceiverCount); setLastUpdate(Time.now()); this.volumeFailures = volFailures; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 9d5024fb9b1..52858139001 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1145,8 +1145,8 @@ private void setDatanodeDead(DatanodeDescriptor node) { public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, final String blockPoolId, long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xceiverCount, int maxTransfers, int failedVolumes - ) throws IOException { + long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers, + int failedVolumes) throws IOException { synchronized (heartbeatManager) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; @@ -1167,7 +1167,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, } heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed, - remaining, blockPoolUsed, xceiverCount, failedVolumes); + remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount, + failedVolumes); // If we are in safemode, do not send back any recovery / replication // requests. Don't even drain the existing queue of work. 
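Note: the translator, BPServiceActor, and test changes in this patch all reference a new org.apache.hadoop.hdfs.server.protocol.CacheReport class whose source file is not part of this diff (the diffstat above does not list a CacheReport.java). A minimal sketch consistent with how the class is used here, i.e. a (capacity, used) constructor plus getCapacity()/getUsed() accessors mirroring CacheReportProto, would be:

    package org.apache.hadoop.hdfs.server.protocol;

    /**
     * Carries a datanode's cache capacity and current cache usage (in bytes)
     * to the namenode in a heartbeat; the wire form is CacheReportProto.
     */
    public class CacheReport {
      private final long capacity;
      private final long used;

      public CacheReport(long capacity, long used) {
        this.capacity = capacity;
        this.used = used;
      }

      public long getCapacity() { return capacity; }
      public long getUsed() { return used; }
    }
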
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 0bff1bf52f7..f9c28e99692 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -170,7 +170,7 @@ synchronized void register(final DatanodeDescriptor d) { addDatanode(d); //update its timestamp - d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0); + d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0); } } @@ -193,10 +193,10 @@ synchronized void removeDatanode(DatanodeDescriptor node) { synchronized void updateHeartbeat(final DatanodeDescriptor node, long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xceiverCount, int failedVolumes) { + long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) { stats.subtract(node); node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed, - xceiverCount, failedVolumes); + cacheCapacity, cacheUsed, xceiverCount, failedVolumes); stats.add(node); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 9d561f9626c..81207d37cbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -468,7 +469,10 @@ HeartbeatResponse sendHeartBeat() throws IOException { dn.getFSDataset().getDfsUsed(), dn.getFSDataset().getRemaining(), dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) }; - return bpNamenode.sendHeartbeat(bpRegistration, report, + CacheReport[] cacheReport = { new CacheReport( + dn.getFSDataset().getCacheCapacity(), + dn.getFSDataset().getCacheUsed()) }; + return bpNamenode.sendHeartbeat(bpRegistration, report, cacheReport, dn.getXmitsInProgress(), dn.getXceiverCount(), dn.getFSDataset().getNumFailedVolumes()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b93d75c1256..9e817629d10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3962,15 +3962,15 @@ String getRegistrationID() { */ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xceiverCount, int xmitsInProgress, int failedVolumes) - throws IOException { + long cacheCapacity, long 
cacheUsed, int xceiverCount, int xmitsInProgress, + int failedVolumes) throws IOException { readLock(); try { final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress; DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed, - xceiverCount, maxTransfer, failedVolumes); + cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes); return new HeartbeatResponse(cmds, createHaStatusHeartbeat()); } finally { readUnlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 875f81642e5..b96df2a6aa0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -100,6 +100,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -935,13 +936,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) @Override // DatanodeProtocol public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, - StorageReport[] report, int xmitsInProgress, int xceiverCount, - int failedVolumes) throws IOException { + StorageReport[] report, CacheReport[] cacheReport, int xmitsInProgress, + int xceiverCount, int failedVolumes) throws IOException { verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(), report[0].getDfsUsed(), report[0].getRemaining(), - report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress, - failedVolumes); + report[0].getBlockPoolUsed(), cacheReport[0].getCapacity(), + cacheReport[0].getUsed(), xceiverCount, xmitsInProgress, failedVolumes); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 39992754e7d..9e74967dacd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -106,6 +106,7 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration @Idempotent public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, StorageReport[] reports, + CacheReport[] cacheReports, int xmitsInProgress, int xceiverCount, int failedVolumes) throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index 840087393cd..208f285a9b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -301,6 +301,8 @@ private static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) { m.put("dfsUsed", datanodeinfo.getDfsUsed()); m.put("remaining", datanodeinfo.getRemaining()); m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed()); + m.put("cacheCapacity", datanodeinfo.getCacheCapacity()); + m.put("cacheUsed", datanodeinfo.getCacheUsed()); m.put("lastUpdate", datanodeinfo.getLastUpdate()); m.put("xceiverCount", datanodeinfo.getXceiverCount()); m.put("networkLocation", datanodeinfo.getNetworkLocation());
@@ -326,6 +328,8 @@ private static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) { (Long)m.get("dfsUsed"), (Long)m.get("remaining"), (Long)m.get("blockPoolUsed"), + (Long)m.get("cacheCapacity"), + (Long)m.get("cacheUsed"), (Long)m.get("lastUpdate"), (int)(long)(Long)m.get("xceiverCount"), (String)m.get("networkLocation"),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index aeb4028de74..3dcf9bb648b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -164,6 +164,8 @@ message RegisterDatanodeResponseProto { * xmitsInProgress - number of transfers from this datanode to others * xceiverCount - number of active transceiver threads * failedVolumes - number of failed volumes + * cacheCapacity - total cache capacity available at the datanode + * cacheUsed - amount of cache used */ message HeartbeatRequestProto { required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -171,6 +173,7 @@ message HeartbeatRequestProto { optional uint32 xmitsInProgress = 3 [ default = 0 ]; optional uint32 xceiverCount = 4 [ default = 0 ]; optional uint32 failedVolumes = 5 [ default = 0 ]; + repeated CacheReportProto cacheReports = 6; } message StorageReportProto {
@@ -182,6 +185,11 @@ message StorageReportProto { optional uint64 blockPoolUsed = 6 [ default = 0 ]; } +message CacheReportProto { + optional uint64 cacheCapacity = 1 [ default = 0 ]; + optional uint64 cacheUsed = 2 [ default = 0 ]; +} + /** * state - State the NN is in when returning response to the DN * txid - Highest transaction ID this NN has seen
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 085f629a8d3..60e1e1f087e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -82,6 +82,8 @@ message DatanodeInfoProto { } optional AdminState adminState = 10 [default = NORMAL]; + optional uint64 cacheCapacity = 11 [default = 0]; + optional uint64 cacheUsed = 12 [default = 0]; } /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index f0c10b0a2fe..c033f37ac18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -847,7 +847,7 @@ public static DatanodeInfo getLocalDatanodeInfo(String ipAddr, DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT, - 1, 2, 3, 4, 5, 6, "local", adminState); + 1L, 2L, 3L, 4L, 0L, 0L, 5, 6, "local", 
adminState); } public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index e88ec92e39b..89a46efaf4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -98,7 +98,9 @@ private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) { cluster.add(dn); dn.updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 0L, 0L, + 0, 0); bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn); } }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index 79785961c91..7148b8271b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -104,7 +104,7 @@ public void testProcesOverReplicateBlock() throws Exception { String corruptMachineName = corruptDataNode.getXferAddr(); for (DatanodeDescriptor datanode : hm.getDatanodes()) { if (!corruptMachineName.equals(datanode.getXferAddr())) { - datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0); + datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0L, 0L, 0, 0); } }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index ba6c3737266..a284a09f740 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -116,7 +116,7 @@ public static void setupCluster() throws Exception { for (int i=0; i < NUM_OF_DATANODES; i++) { dataNodes[i].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } }
@@ -133,7 +133,8 @@ public static void setupCluster() throws Exception { public void testChooseTarget1() throws Exception { dataNodes[0].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded + HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 0L, 0L, 4, 0); // overloaded DatanodeDescriptor[] targets; targets = replicator.chooseTarget(filename, 0, dataNodes[0],
@@ -168,7 +169,7 @@ public void testChooseTarget1() throws Exception { dataNodes[0].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } private static DatanodeDescriptor[] chooseTarget(
@@ -271,7 +272,8 @@ public void testChooseTarget3() throws Exception { // make data node 0 to be not qualified to choose dataNodes[0].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space + (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, + 0L, 0L, 0, 0); // no space DatanodeDescriptor[] targets; targets = replicator.chooseTarget(filename, 0, dataNodes[0], @@ -309,7 +311,7 @@ public void testChooseTarget3() throws Exception { dataNodes[0].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } /** @@ -326,7 +328,7 @@ public void testChoooseTarget4() throws Exception { for(int i=0; i<2; i++) { dataNodes[i].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); + (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } DatanodeDescriptor[] targets; @@ -358,7 +360,7 @@ public void testChoooseTarget4() throws Exception { for(int i=0; i<2; i++) { dataNodes[i].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } } @@ -424,7 +426,7 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception { for(int i=0; i<2; i++) { dataNodes[i].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); + (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } final LogVerificationAppender appender = new LogVerificationAppender(); @@ -451,7 +453,7 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception { for(int i=0; i<2; i++) { dataNodes[i].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); + HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index 032c2c08396..3e2d60e4e17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -129,7 +129,7 @@ private static void setupDataNodeCapacity() { for(int i=0; i chosenNodes = new ArrayList(); chosenNodes.add(dataNodesInBoundaryCase[0]); @@ -651,7 +653,7 @@ public void testChooseMoreTargetsThanNodeGroups() throws Exception { for(int i=0; i live = new ArrayList(); live.add(dnDesc1); live.add(dnDesc2); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 42ea48230ee..c351b3468c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -126,6 +127,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) .when(mock).sendHeartbeat( Mockito.any(DatanodeRegistration.class), Mockito.any(StorageReport[].class), + Mockito.any(CacheReport[].class), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index a5792ad217f..d45f3fba504 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -154,6 +155,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation) when(namenode.sendHeartbeat( Mockito.any(DatanodeRegistration.class), Mockito.any(StorageReport[].class), + Mockito.any(CacheReport[].class), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt())) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index af7d323c8a5..6d67670783c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -105,6 +106,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds) doReturn(response).when(spyNN).sendHeartbeat( (DatanodeRegistration) any(), (StorageReport[]) any(), + (CacheReport[]) any(), anyInt(), anyInt(), anyInt()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 77f8560816f..0d195941aeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -863,8 +864,9 @@ void sendHeartbeat() throws IOException { // TODO:FEDERATION currently a single block pool is supported StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; + CacheReport[] cacheRep = { new CacheReport(0L, 0L) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, 0, 0, 0).getCommands(); + rep, cacheRep, 0, 0, 0).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) {
@@ -910,8 +912,9 @@ int replicateBlocks() throws IOException { // register datanode StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; + CacheReport[] cacheRep = { new CacheReport(0L, 0L) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, 0, 0, 0).getCommands(); + rep, cacheRep, 0, 0, 0).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index cf64c335bac..b83adecd194 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -111,7 +111,8 @@ public static DelegationTokenSecretManager getDtSecretManager( public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg, DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException { return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(), - dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0); + dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), + dd.getCacheCapacity(), dd.getCacheUsed(), 0, 0, 0); } public static boolean setReplication(final FSNamesystem ns,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index d78198ab402..ddcedcf44f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import 
org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -142,7 +143,9 @@ public void testDeadDatanode() throws Exception { // that asks datanode to register again StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0, 0, 0) }; - DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands(); + CacheReport[] cacheRep = { new CacheReport(0L, 0L) }; + DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, cacheRep, 0, 0, 0) + .getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction());
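For reference, a small illustrative example (not part of the patch) of how the new cache fields fit together on a DatanodeInfo once a heartbeat has been processed. The DatanodeID endpoints below are placeholders; the expected outputs follow from the accessors added above, where getCacheRemaining() is cacheCapacity minus cacheUsed and the percent helpers delegate to DFSUtil:

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class CacheFieldsExample {
      public static void main(String[] args) {
        // Placeholder endpoints; only the cache fields matter here.
        DatanodeID id = new DatanodeID("127.0.0.1", "localhost", "storage-1",
            50010, 50075, 50020);
        DatanodeInfo info = new DatanodeInfo(id);

        // Simulate the values a heartbeat conversion would populate.
        info.setCacheCapacity(64L * 1024 * 1024); // 64 MiB mlock-able memory
        info.setCacheUsed(16L * 1024 * 1024);     // 16 MiB currently cached

        System.out.println(info.getCacheRemaining());        // 50331648 (48 MiB)
        System.out.println(info.getCacheUsedPercent());      // 25.0
        System.out.println(info.getCacheRemainingPercent()); // 75.0
      }
    }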