HDFS-11896. Non-dfsUsed will be doubled on dead node re-registration. Contributed by Brahma Reddy Battula.
This commit is contained in:
parent
11ece0bda1
commit
c4a85c694f
|
@ -331,11 +331,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetBlocks() {
|
public void resetBlocks() {
|
||||||
setCapacity(0);
|
updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null);
|
||||||
setRemaining(0);
|
|
||||||
setBlockPoolUsed(0);
|
|
||||||
setDfsUsed(0);
|
|
||||||
setXceiverCount(0);
|
|
||||||
this.invalidateBlocks.clear();
|
this.invalidateBlocks.clear();
|
||||||
this.volumeFailures = 0;
|
this.volumeFailures = 0;
|
||||||
// pendingCached, cached, and pendingUncached are protected by the
|
// pendingCached, cached, and pendingUncached are protected by the
|
||||||
|
@ -384,6 +380,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
||||||
long cacheUsed, int xceiverCount, int volFailures,
|
long cacheUsed, int xceiverCount, int volFailures,
|
||||||
VolumeFailureSummary volumeFailureSummary) {
|
VolumeFailureSummary volumeFailureSummary) {
|
||||||
|
updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
|
||||||
|
volFailures, volumeFailureSummary);
|
||||||
|
setLastUpdate(Time.now());
|
||||||
|
setLastUpdateMonotonic(Time.monotonicNow());
|
||||||
|
rollBlocksScheduled(getLastUpdateMonotonic());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
|
||||||
|
long cacheUsed, int xceiverCount, int volFailures,
|
||||||
|
VolumeFailureSummary volumeFailureSummary) {
|
||||||
long totalCapacity = 0;
|
long totalCapacity = 0;
|
||||||
long totalRemaining = 0;
|
long totalRemaining = 0;
|
||||||
long totalBlockPoolUsed = 0;
|
long totalBlockPoolUsed = 0;
|
||||||
|
@ -434,8 +440,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
setCacheCapacity(cacheCapacity);
|
setCacheCapacity(cacheCapacity);
|
||||||
setCacheUsed(cacheUsed);
|
setCacheUsed(cacheUsed);
|
||||||
setXceiverCount(xceiverCount);
|
setXceiverCount(xceiverCount);
|
||||||
setLastUpdate(Time.now());
|
|
||||||
setLastUpdateMonotonic(Time.monotonicNow());
|
|
||||||
this.volumeFailures = volFailures;
|
this.volumeFailures = volFailures;
|
||||||
this.volumeFailureSummary = volumeFailureSummary;
|
this.volumeFailureSummary = volumeFailureSummary;
|
||||||
for (StorageReport report : reports) {
|
for (StorageReport report : reports) {
|
||||||
|
@ -451,7 +455,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
totalDfsUsed += report.getDfsUsed();
|
totalDfsUsed += report.getDfsUsed();
|
||||||
totalNonDfsUsed += report.getNonDfsUsed();
|
totalNonDfsUsed += report.getNonDfsUsed();
|
||||||
}
|
}
|
||||||
rollBlocksScheduled(getLastUpdateMonotonic());
|
|
||||||
|
|
||||||
// Update total metrics for the node.
|
// Update total metrics for the node.
|
||||||
setCapacity(totalCapacity);
|
setCapacity(totalCapacity);
|
||||||
|
|
|
@ -1319,7 +1319,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
|
|
||||||
// used only for testing
|
// used only for testing
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setHeartbeatsDisabledForTests(
|
public void setHeartbeatsDisabledForTests(
|
||||||
boolean heartbeatsDisabledForTests) {
|
boolean heartbeatsDisabledForTests) {
|
||||||
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
|
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -36,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
@ -52,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -178,4 +182,53 @@ public class TestDeadDatanode {
|
||||||
.getDatanodeDescriptor().equals(clientNode));
|
.getDatanodeDescriptor().equals(clientNode));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonDFSUsedONDeadNodeReReg() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||||
|
6 * 1000);
|
||||||
|
long CAPACITY = 5000L;
|
||||||
|
long[] capacities = new long[] { 4 * CAPACITY, 4 * CAPACITY };
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
||||||
|
.simulatedCapacities(capacities).build();
|
||||||
|
long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
|
||||||
|
assertTrue(initialCapacity > 0);
|
||||||
|
DataNode dn1 = cluster.getDataNodes().get(0);
|
||||||
|
DataNode dn2 = cluster.getDataNodes().get(1);
|
||||||
|
final DatanodeDescriptor dn2Desc = cluster.getNamesystem(0)
|
||||||
|
.getBlockManager().getDatanodeManager()
|
||||||
|
.getDatanode(dn2.getDatanodeId());
|
||||||
|
dn1.setHeartbeatsDisabledForTests(true);
|
||||||
|
cluster.setDataNodeDead(dn1.getDatanodeId());
|
||||||
|
assertEquals("Capacity shouldn't include DeadNode", dn2Desc.getCapacity(),
|
||||||
|
cluster.getNamesystem(0).getCapacityTotal());
|
||||||
|
assertEquals("NonDFS-used shouldn't include DeadNode",
|
||||||
|
dn2Desc.getNonDfsUsed(),
|
||||||
|
cluster.getNamesystem(0).getNonDfsUsedSpace());
|
||||||
|
// Wait for re-registration and heartbeat
|
||||||
|
dn1.setHeartbeatsDisabledForTests(false);
|
||||||
|
final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
|
||||||
|
.getBlockManager().getDatanodeManager()
|
||||||
|
.getDatanode(dn1.getDatanodeId());
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
|
||||||
|
@Override public Boolean get() {
|
||||||
|
return dn1Desc.isAlive() && dn1Desc.isHeartbeatedSinceRegistration();
|
||||||
|
}
|
||||||
|
}, 100, 5000);
|
||||||
|
assertEquals("Capacity should be 0 after all DNs dead", initialCapacity,
|
||||||
|
cluster.getNamesystem(0).getCapacityTotal());
|
||||||
|
long nonDfsAfterReg = cluster.getNamesystem(0).getNonDfsUsedSpace();
|
||||||
|
assertEquals("NonDFS should include actual DN NonDFSUsed",
|
||||||
|
dn1Desc.getNonDfsUsed() + dn2Desc.getNonDfsUsed(), nonDfsAfterReg);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue