HDFS-7886. Fix TestFileTruncate failures. Contributed by Plamen Jeliazkov and Konstantin Shvachko.

This commit is contained in:
Konstantin V Shvachko 2015-03-16 12:54:04 -07:00
parent 587d8be17b
commit ce5de93a58
3 changed files with 51 additions and 13 deletions

View File

@ -1157,6 +1157,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
fail to tell the DFSClient about it because of a network error (cmccabe) fail to tell the DFSClient about it because of a network error (cmccabe)
HDFS-7886. Fix TestFileTruncate failures. (Plamen Jeliazkov and shv)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -77,9 +77,12 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
@ -1343,7 +1346,6 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
} }
int curDatanodesNum = dataNodes.size(); int curDatanodesNum = dataNodes.size();
final int curDatanodesNumSaved = curDatanodesNum;
// for mincluster's the default initialDelay for BRs is 0 // for mincluster's the default initialDelay for BRs is 0
if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) { if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0); conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@ -2022,7 +2024,23 @@ public boolean restartDataNode(int i) throws IOException {
*/ */
public synchronized boolean restartDataNode(int i, boolean keepPort) public synchronized boolean restartDataNode(int i, boolean keepPort)
throws IOException { throws IOException {
DataNodeProperties dnprop = stopDataNode(i); return restartDataNode(i, keepPort, false);
}
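/**
* Restart a particular DataNode, optionally expiring its heartbeat on the
* NameNode after it has been stopped.
* NOTE(review): when expireOnNN is true, the stopped DataNode's properties are
* dereferenced before the null check that follows, so an index that yields no
* DataNode would raise a NullPointerException instead of returning false.
* @param idn index of the DataNode
* @param keepPort true if the DataNode should restart on the same port
* @param expireOnNN true if the NameNode should expire the DataNode heartbeat
* @return false if no DataNode was stopped for the given index
* @throws IOException on failure
*/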
public synchronized boolean restartDataNode(
int idn, boolean keepPort, boolean expireOnNN) throws IOException {
DataNodeProperties dnprop = stopDataNode(idn);
if(expireOnNN) {
setDataNodeDead(dnprop.datanode.getDatanodeId());
}
if (dnprop == null) { if (dnprop == null) {
return false; return false;
} else { } else {
@ -2030,6 +2048,24 @@ public synchronized boolean restartDataNode(int i, boolean keepPort)
} }
} }
/**
 * Expire the heartbeat of a single DataNode on the NameNode.
 * <p>
 * The descriptor registered for {@code dnId} has its last-update time reset
 * to zero, after which a heartbeat check is run against the block manager.
 *
 * @param dnId ID of the DataNode whose heartbeat should be expired
 * @throws IOException on failure
 */
public void setDataNodeDead(DatanodeID dnId) throws IOException {
  // Zero the last-update time directly on the looked-up descriptor.
  NameNodeAdapter.getDatanode(getNamesystem(), dnId).setLastUpdate(0L);
  // Run the heartbeat check right away so the zeroed timestamp is acted upon.
  BlockManagerTestUtil.checkHeartbeat(getNamesystem().getBlockManager());
}
/**
 * Expire the heartbeat of every DataNode tracked by this cluster on the
 * NameNode, one DataNode at a time.
 *
 * @throws IOException on failure
 */
public void setDataNodesDead() throws IOException {
  for (DataNodeProperties props : dataNodes) {
    final DatanodeID id = props.datanode.getDatanodeId();
    setDataNodeDead(id);
  }
}
/* /*
* Restart all datanodes, on the same ports if keepPort is true * Restart all datanodes, on the same ports if keepPort is true
*/ */
@ -2255,8 +2291,8 @@ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
// make sure all datanodes have sent first heartbeat to namenode, // make sure all datanodes have sent first heartbeat to namenode,
// using (capacity == 0) as proxy. // using (capacity == 0) as proxy.
for (DatanodeInfo dn : dnInfo) { for (DatanodeInfo dn : dnInfo) {
if (dn.getCapacity() == 0) { if (dn.getCapacity() == 0 || dn.getLastUpdate() <= 0) {
LOG.info("dn.getCapacity() == 0"); LOG.info("No heartbeat from DataNode: " + dn.toString());
return true; return true;
} }
} }

View File

@ -679,10 +679,10 @@ public void testTruncateWithDataNodesRestart() throws Exception {
boolean isReady = fs.truncate(p, newLength); boolean isReady = fs.truncate(p, newLength);
assertFalse(isReady); assertFalse(isReady);
} finally { } finally {
cluster.restartDataNode(dn); cluster.restartDataNode(dn, true, true);
cluster.waitActive(); cluster.waitActive();
cluster.triggerBlockReports();
} }
checkBlockRecovery(p);
LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock(); LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock();
/* /*
@ -699,7 +699,6 @@ public void testTruncateWithDataNodesRestart() throws Exception {
assertEquals(newBlock.getBlock().getGenerationStamp(), assertEquals(newBlock.getBlock().getGenerationStamp(),
oldBlock.getBlock().getGenerationStamp() + 1); oldBlock.getBlock().getGenerationStamp() + 1);
checkBlockRecovery(p);
// Wait replicas come to 3 // Wait replicas come to 3
DFSTestUtil.waitReplication(fs, p, REPLICATION); DFSTestUtil.waitReplication(fs, p, REPLICATION);
// Old replica is disregarded and replaced with the truncated one // Old replica is disregarded and replaced with the truncated one
@ -741,10 +740,10 @@ public void testCopyOnTruncateWithDataNodesRestart() throws Exception {
boolean isReady = fs.truncate(p, newLength); boolean isReady = fs.truncate(p, newLength);
assertFalse(isReady); assertFalse(isReady);
} finally { } finally {
cluster.restartDataNode(dn); cluster.restartDataNode(dn, true, true);
cluster.waitActive(); cluster.waitActive();
cluster.triggerBlockReports();
} }
checkBlockRecovery(p);
LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock(); LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock();
/* /*
@ -757,7 +756,6 @@ public void testCopyOnTruncateWithDataNodesRestart() throws Exception {
assertEquals(newBlock.getBlock().getGenerationStamp(), assertEquals(newBlock.getBlock().getGenerationStamp(),
oldBlock.getBlock().getGenerationStamp() + 1); oldBlock.getBlock().getGenerationStamp() + 1);
checkBlockRecovery(p);
// Wait replicas come to 3 // Wait replicas come to 3
DFSTestUtil.waitReplication(fs, p, REPLICATION); DFSTestUtil.waitReplication(fs, p, REPLICATION);
// New block is replicated to dn1 // New block is replicated to dn1
@ -800,9 +798,10 @@ public void testTruncateWithDataNodesRestartImmediately() throws Exception {
boolean isReady = fs.truncate(p, newLength); boolean isReady = fs.truncate(p, newLength);
assertFalse(isReady); assertFalse(isReady);
cluster.restartDataNode(dn0); cluster.restartDataNode(dn0, true, true);
cluster.restartDataNode(dn1); cluster.restartDataNode(dn1, true, true);
cluster.waitActive(); cluster.waitActive();
checkBlockRecovery(p);
cluster.triggerBlockReports(); cluster.triggerBlockReports();
LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock(); LocatedBlock newBlock = getLocatedBlocks(p).getLastLocatedBlock();
@ -815,7 +814,6 @@ public void testTruncateWithDataNodesRestartImmediately() throws Exception {
assertEquals(newBlock.getBlock().getGenerationStamp(), assertEquals(newBlock.getBlock().getGenerationStamp(),
oldBlock.getBlock().getGenerationStamp() + 1); oldBlock.getBlock().getGenerationStamp() + 1);
checkBlockRecovery(p);
// Wait replicas come to 3 // Wait replicas come to 3
DFSTestUtil.waitReplication(fs, p, REPLICATION); DFSTestUtil.waitReplication(fs, p, REPLICATION);
// Old replica is disregarded and replaced with the truncated one on dn0 // Old replica is disregarded and replaced with the truncated one on dn0
@ -859,6 +857,7 @@ public void testTruncateWithDataNodesShutdownImmediately() throws Exception {
assertFalse(isReady); assertFalse(isReady);
cluster.shutdownDataNodes(); cluster.shutdownDataNodes();
cluster.setDataNodesDead();
try { try {
for(int i = 0; i < SUCCESS_ATTEMPTS && cluster.isDataNodeUp(); i++) { for(int i = 0; i < SUCCESS_ATTEMPTS && cluster.isDataNodeUp(); i++) {
Thread.sleep(SLEEP); Thread.sleep(SLEEP);
@ -871,6 +870,7 @@ public void testTruncateWithDataNodesShutdownImmediately() throws Exception {
StartupOption.REGULAR, null); StartupOption.REGULAR, null);
cluster.waitActive(); cluster.waitActive();
} }
checkBlockRecovery(p);
fs.delete(parent, true); fs.delete(parent, true);
} }