HDFS-11896. Non-dfsUsed will be doubled on dead node re-registration. Contributed by Brahma Reddy Battula.
(cherry picked from commit c4a85c694fae3f814ab4e7f3c172da1df0e0e353)
This commit is contained in:
parent
b51623503f
commit
f90b9d2b25
@ -396,6 +396,9 @@ Release 2.7.4 - UNRELEASED
|
||||
|
||||
HDFS-11742. Improve balancer usability after HDFS-8818. (kihwal)
|
||||
|
||||
HDFS-11896. Non-dfsUsed will be doubled on dead node re-registration.
|
||||
(Brahma Reddy Battula via shv)
|
||||
|
||||
Release 2.7.3 - 2016-08-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -311,11 +311,7 @@ boolean removeBlock(String storageID, BlockInfoContiguous b) {
|
||||
}
|
||||
|
||||
public void resetBlocks() {
|
||||
setCapacity(0);
|
||||
setRemaining(0);
|
||||
setBlockPoolUsed(0);
|
||||
setDfsUsed(0);
|
||||
setXceiverCount(0);
|
||||
updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null);
|
||||
this.invalidateBlocks.clear();
|
||||
this.volumeFailures = 0;
|
||||
// pendingCached, cached, and pendingUncached are protected by the
|
||||
@ -346,6 +342,11 @@ public int numBlocks() {
|
||||
return blocks;
|
||||
}
|
||||
|
||||
/**
 * Whether this datanode has reported a heartbeat since it (re-)registered
 * with the NameNode.
 * <p>
 * Exposed for tests (see testNonDFSUsedONDeadNodeReReg) to wait for the
 * first post-registration heartbeat before asserting aggregate stats.
 *
 * @return true if a heartbeat has been processed since registration
 */
@VisibleForTesting
public boolean isHeartbeatedSinceRegistration() {
  return heartbeatedSinceRegistration;
}
|
||||
|
||||
/**
|
||||
* Updates stats from datanode heartbeat.
|
||||
*/
|
||||
@ -363,6 +364,16 @@ public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
|
||||
/**
 * Updates this node's stats from a datanode heartbeat: recomputes the
 * per-storage aggregates, stamps the last-update times, and rolls the
 * scheduled-block counters forward.
 *
 * @param reports              per-storage usage reports from the datanode
 * @param cacheCapacity        total cache capacity reported
 * @param cacheUsed            cache space currently used
 * @param xceiverCount         number of active transceivers
 * @param volFailures          number of failed volumes
 * @param volumeFailureSummary detail of volume failures (may be null)
 */
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  // Recompute aggregate capacity/usage from the fresh storage reports.
  updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  // Record heartbeat arrival time: wall-clock for reporting, monotonic
  // for liveness/scheduling decisions.
  setLastUpdate(Time.now());
  setLastUpdateMonotonic(Time.monotonicNow());
  // Expire block-transfer counts scheduled in earlier intervals.
  rollBlocksScheduled(getLastUpdateMonotonic());
}
|
||||
|
||||
private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
|
||||
long cacheUsed, int xceiverCount, int volFailures,
|
||||
VolumeFailureSummary volumeFailureSummary) {
|
||||
long totalCapacity = 0;
|
||||
long totalRemaining = 0;
|
||||
long totalBlockPoolUsed = 0;
|
||||
@ -409,8 +420,6 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
||||
setCacheCapacity(cacheCapacity);
|
||||
setCacheUsed(cacheUsed);
|
||||
setXceiverCount(xceiverCount);
|
||||
setLastUpdate(Time.now());
|
||||
setLastUpdateMonotonic(Time.monotonicNow());
|
||||
this.volumeFailures = volFailures;
|
||||
this.volumeFailureSummary = volumeFailureSummary;
|
||||
for (StorageReport report : reports) {
|
||||
@ -426,7 +435,6 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
||||
totalDfsUsed += report.getDfsUsed();
|
||||
totalNonDfsUsed += report.getNonDfsUsed();
|
||||
}
|
||||
rollBlocksScheduled(getLastUpdateMonotonic());
|
||||
|
||||
// Update total metrics for the node.
|
||||
setCapacity(totalCapacity);
|
||||
|
@ -188,10 +188,9 @@ public synchronized int getExpiredHeartbeats() {
|
||||
|
||||
synchronized void register(final DatanodeDescriptor d) {
|
||||
if (!d.isAlive) {
|
||||
addDatanode(d);
|
||||
|
||||
//update its timestamp
|
||||
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
||||
addDatanode(d);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1066,10 +1066,8 @@ private BPOfferService getBPOSForBlock(ExtendedBlock block)
|
||||
return bpos;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// used only for testing
|
||||
void setHeartbeatsDisabledForTests(
|
||||
public void setHeartbeatsDisabledForTests(
|
||||
boolean heartbeatsDisabledForTests) {
|
||||
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
|
||||
}
|
||||
|
@ -19,10 +19,12 @@
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -33,6 +35,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
@ -45,6 +48,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -128,4 +132,51 @@ public void testDeadDatanode() throws Exception {
|
||||
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
|
||||
.getAction());
|
||||
}
|
||||
|
||||
/**
 * Regression test for HDFS-11896: non-DFS used space must not be
 * double-counted when a dead datanode re-registers. Kills one of two
 * datanodes' heartbeats, waits for it to be declared dead, then re-enables
 * heartbeats and verifies that cluster capacity and non-DFS-used totals
 * return to the sum of the live nodes' actual values (no doubling).
 */
@Test
public void testNonDFSUsedONDeadNodeReReg() throws Exception {
  Configuration conf = new HdfsConfiguration();
  // Fast heartbeat / recheck / stale intervals so dn1 is declared dead
  // (and later re-registers) quickly enough for a unit test.
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      6 * 1000);
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
    assertTrue(initialCapacity > 0);
    DataNode dn1 = cluster.getDataNodes().get(0);
    DataNode dn2 = cluster.getDataNodes().get(1);
    final DatanodeDescriptor dn2Desc = cluster.getNamesystem(0)
        .getBlockManager().getDatanodeManager()
        .getDatanode(dn2.getDatanodeId());
    // Suppress dn1's heartbeats and force the NameNode to mark it dead;
    // cluster totals should now reflect only dn2.
    dn1.setHeartbeatsDisabledForTests(true);
    cluster.setDataNodeDead(dn1.getDatanodeId());
    assertEquals("Capacity shouldn't include DeadNode", dn2Desc.getCapacity(),
        cluster.getNamesystem(0).getCapacityTotal());
    assertEquals("NonDFS-used shouldn't include DeadNode",
        dn2Desc.getNonDfsUsed(),
        cluster.getNamesystem(0).getNonDfsUsedSpace());
    // Wait for re-registration and heartbeat
    dn1.setHeartbeatsDisabledForTests(false);
    final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
        .getBlockManager().getDatanodeManager()
        .getDatanode(dn1.getDatanodeId());
    // Wait until dn1 is alive again AND has heartbeated at least once since
    // re-registering, so its stats have been refreshed on the NameNode.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {

      @Override
      public Boolean get() {
        return dn1Desc.isAlive && dn1Desc.isHeartbeatedSinceRegistration();
      }
    }, 100, 5000);
    // After re-registration the totals must equal the original values —
    // the bug under test doubled non-DFS used on dead-node re-registration.
    assertEquals("Capacity should be 0 after all DNs dead", initialCapacity,
        cluster.getNamesystem(0).getCapacityTotal());
    long nonDFSAfterReg = cluster.getNamesystem(0).getNonDfsUsedSpace();
    assertEquals("NonDFS should include actual DN NonDFSUsed",
        dn1Desc.getNonDfsUsed() + dn2Desc.getNonDfsUsed(), nonDFSAfterReg);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user