HDFS-13542. TestBlockManager#testNeededReplicationWhileAppending fails due to improper cluster shutdown in TestBlockManager#testBlockManagerMachinesArray on Windows. Contributed by Anbang Hu.

Inigo Goiri 2018-05-11 09:50:40 -07:00
parent 11794e5eda
commit d88d9f2874
1 changed file with 84 additions and 69 deletions
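The pattern applied throughout the patch is the usual MiniDFSCluster lifecycle guard: build the cluster, do all of the work (including waitActive()) inside a try block, and shut the cluster down in a finally block behind a null check, so an exception or assertion failure in one test cannot leak a running cluster into the next one. That matters on Windows, where a still-running cluster typically keeps its storage directories locked, which is how testBlockManagerMachinesArray could break testNeededReplicationWhileAppending. A minimal sketch of the guard follows; the test class name and body are hypothetical placeholders, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

public class TestClusterShutdownPattern {  // hypothetical example class
  @Test
  public void testWithGuardedShutdown() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      // Wait for startup inside the try so a failure here still reaches
      // the finally block and the cluster is torn down.
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();
      // ... exercise the NameNode / BlockManager here (placeholder) ...
    } finally {
      // Defensive null check, mirroring the patch; a leaked cluster holds
      // file locks on Windows and makes later tests in the same JVM fail.
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}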


@@ -441,8 +441,8 @@ public class TestBlockManager {
     String src = "/test-file";
     Path file = new Path(src);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
     try {
+      cluster.waitActive();
       BlockManager bm = cluster.getNamesystem().getBlockManager();
       FileSystem fs = cluster.getFileSystem();
       NamenodeProtocols namenode = cluster.getNameNodeRpc();
@@ -481,7 +481,9 @@ public class TestBlockManager {
         IOUtils.closeStream(out);
       }
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
@@ -970,7 +972,9 @@ public class TestBlockManager {
       assertTrue(fs.exists(file1));
       fs.delete(file1, true);
       assertTrue(!fs.exists(file1));
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
@@ -1070,7 +1074,9 @@ public class TestBlockManager {
       assertEquals(0, bm.getBlockOpQueueLength());
       assertTrue(doneLatch.await(1, TimeUnit.SECONDS));
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
@@ -1145,7 +1151,9 @@ public class TestBlockManager {
       long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
       assertTrue(batched > 0);
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
@@ -1154,76 +1162,83 @@ public class TestBlockManager {
     final Configuration conf = new HdfsConfiguration();
     final MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-    cluster.waitActive();
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    FileSystem fs = cluster.getFileSystem();
-    final Path filePath = new Path("/tmp.txt");
-    final long fileLen = 1L;
-    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
-    DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
-        filePath, (short) 3, 60000);
-    ArrayList<DataNode> datanodes = cluster.getDataNodes();
-    assertEquals(datanodes.size(), 4);
-    FSNamesystem ns = cluster.getNamesystem();
-    // get the block
-    final String bpid = cluster.getNamesystem().getBlockPoolId();
-    File storageDir = cluster.getInstanceStorageDir(0, 0);
-    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-    assertTrue("Data directory does not exist", dataDir.exists());
-    BlockInfo blockInfo = blockManager.blocksMap.getBlocks().iterator().next();
-    ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
-        blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
-    DatanodeDescriptor failedStorageDataNode =
-        blockManager.getStoredBlock(blockInfo).getDatanode(0);
-    DatanodeDescriptor corruptStorageDataNode =
-        blockManager.getStoredBlock(blockInfo).getDatanode(1);
-    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
-    for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
-      DatanodeStorageInfo storageInfo = failedStorageDataNode
-          .getStorageInfos()[i];
-      DatanodeStorage dns = new DatanodeStorage(
-          failedStorageDataNode.getStorageInfos()[i].getStorageID(),
-          DatanodeStorage.State.FAILED,
-          failedStorageDataNode.getStorageInfos()[i].getStorageType());
-      while(storageInfo.getBlockIterator().hasNext()) {
-        BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
-        if(blockInfo1.equals(blockInfo)) {
-          StorageReport report = new StorageReport(
-              dns, true, storageInfo.getCapacity(),
-              storageInfo.getDfsUsed(), storageInfo.getRemaining(),
-              storageInfo.getBlockPoolUsed(), 0L);
-          reports.add(report);
-          break;
-        }
-      }
-    }
-    failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
-        .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
-    ns.writeLock();
-    DatanodeStorageInfo corruptStorageInfo= null;
-    for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
-      corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
-      while(corruptStorageInfo.getBlockIterator().hasNext()) {
-        BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
-        if (blockInfo1.equals(blockInfo)) {
-          break;
-        }
-      }
-    }
-    blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
-        corruptStorageInfo.getStorageID(),
-        CorruptReplicasMap.Reason.ANY.toString());
-    ns.writeUnlock();
-    BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
-    ns.readLock();
-    LocatedBlocks locatedBlocks =
-        blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
-            false, false, null);
-    assertTrue("Located Blocks should exclude corrupt" +
-        "replicas and failed storages",
-        locatedBlocks.getLocatedBlocks().size() == 1);
-    ns.readUnlock();
+    try {
+      cluster.waitActive();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      final Path filePath = new Path("/tmp.txt");
+      final long fileLen = 1L;
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
+      DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
+          filePath, (short) 3, 60000);
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 4);
+      FSNamesystem ns = cluster.getNamesystem();
+      // get the block
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = cluster.getInstanceStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      assertTrue("Data directory does not exist", dataDir.exists());
+      BlockInfo blockInfo =
+          blockManager.blocksMap.getBlocks().iterator().next();
+      ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
+          blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+      DatanodeDescriptor failedStorageDataNode =
+          blockManager.getStoredBlock(blockInfo).getDatanode(0);
+      DatanodeDescriptor corruptStorageDataNode =
+          blockManager.getStoredBlock(blockInfo).getDatanode(1);
+      ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+      for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
+        DatanodeStorageInfo storageInfo = failedStorageDataNode
+            .getStorageInfos()[i];
+        DatanodeStorage dns = new DatanodeStorage(
+            failedStorageDataNode.getStorageInfos()[i].getStorageID(),
+            DatanodeStorage.State.FAILED,
+            failedStorageDataNode.getStorageInfos()[i].getStorageType());
+        while(storageInfo.getBlockIterator().hasNext()) {
+          BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
+          if(blockInfo1.equals(blockInfo)) {
+            StorageReport report = new StorageReport(
+                dns, true, storageInfo.getCapacity(),
+                storageInfo.getDfsUsed(), storageInfo.getRemaining(),
+                storageInfo.getBlockPoolUsed(), 0L);
+            reports.add(report);
+            break;
+          }
+        }
+      }
+      failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
+          .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
+      ns.writeLock();
+      DatanodeStorageInfo corruptStorageInfo= null;
+      for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
+        corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
+        while(corruptStorageInfo.getBlockIterator().hasNext()) {
+          BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
+          if (blockInfo1.equals(blockInfo)) {
+            break;
+          }
+        }
+      }
+      blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
+          corruptStorageInfo.getStorageID(),
+          CorruptReplicasMap.Reason.ANY.toString());
+      ns.writeUnlock();
+      BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
+      ns.readLock();
+      LocatedBlocks locatedBlocks =
+          blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
+              false, false, null);
+      assertTrue("Located Blocks should exclude corrupt" +
+          "replicas and failed storages",
+          locatedBlocks.getLocatedBlocks().size() == 1);
+      ns.readUnlock();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 
   @Test