HDFS-13542. TestBlockManager#testNeededReplicationWhileAppending fails due to improper cluster shutdown in TestBlockManager#testBlockManagerMachinesArray on Windows. Contributed by Anbang Hu.
(cherry picked from commit d50c4d71dc)
parent 3665217d50
commit e332e011ae
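For reference, below is a minimal sketch of the guarded-shutdown idiom this patch applies throughout TestBlockManager: the MiniDFSCluster is created, waitActive() and the test body run inside a try block, and shutdown() happens in a finally block behind a null check, so a failed assertion or startup error cannot leak a running cluster (and, on Windows, its locked storage directories) into later tests. The class and method names in the sketch are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

// Illustrative test class; not part of the patch itself.
public class TestGuardedClusterShutdown {

  @Test
  public void testWithGuardedShutdown() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      // waitActive() can throw, so it belongs inside the try block too.
      cluster.waitActive();
      // ... test body: create files, assert on BlockManager state, etc.
    } finally {
      // Null-guarded shutdown always runs, releasing the data directories
      // that would otherwise stay locked on Windows and break later tests.
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}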
@@ -452,8 +452,8 @@ public class TestBlockManager {
     String src = "/test-file";
     Path file = new Path(src);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
     try {
+      cluster.waitActive();
       BlockManager bm = cluster.getNamesystem().getBlockManager();
       FileSystem fs = cluster.getFileSystem();
       NamenodeProtocols namenode = cluster.getNameNodeRpc();
@@ -492,7 +492,9 @@ public class TestBlockManager {
         IOUtils.closeStream(out);
       }
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

@@ -1043,7 +1045,9 @@ public class TestBlockManager {
       assertTrue(fs.exists(file1));
       fs.delete(file1, true);
       assertTrue(!fs.exists(file1));
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

@@ -1143,7 +1147,9 @@ public class TestBlockManager {
       assertEquals(0, bm.getBlockOpQueueLength());
       assertTrue(doneLatch.await(1, TimeUnit.SECONDS));
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

@@ -1218,7 +1224,9 @@ public class TestBlockManager {
       long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
       assertTrue(batched > 0);
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

@@ -1227,76 +1235,83 @@ public class TestBlockManager {
     final Configuration conf = new HdfsConfiguration();
     final MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-    cluster.waitActive();
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    FileSystem fs = cluster.getFileSystem();
-    final Path filePath = new Path("/tmp.txt");
-    final long fileLen = 1L;
-    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
-    DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
-        filePath, (short) 3, 60000);
-    ArrayList<DataNode> datanodes = cluster.getDataNodes();
-    assertEquals(datanodes.size(), 4);
-    FSNamesystem ns = cluster.getNamesystem();
-    // get the block
-    final String bpid = cluster.getNamesystem().getBlockPoolId();
-    File storageDir = cluster.getInstanceStorageDir(0, 0);
-    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-    assertTrue("Data directory does not exist", dataDir.exists());
-    BlockInfo blockInfo = blockManager.blocksMap.getBlocks().iterator().next();
-    ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
-        blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
-    DatanodeDescriptor failedStorageDataNode =
-        blockManager.getStoredBlock(blockInfo).getDatanode(0);
-    DatanodeDescriptor corruptStorageDataNode =
-        blockManager.getStoredBlock(blockInfo).getDatanode(1);
+    try {
+      cluster.waitActive();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      final Path filePath = new Path("/tmp.txt");
+      final long fileLen = 1L;
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
+      DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
+          filePath, (short) 3, 60000);
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 4);
+      FSNamesystem ns = cluster.getNamesystem();
+      // get the block
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = cluster.getInstanceStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      assertTrue("Data directory does not exist", dataDir.exists());
+      BlockInfo blockInfo =
+          blockManager.blocksMap.getBlocks().iterator().next();
+      ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
+          blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+      DatanodeDescriptor failedStorageDataNode =
+          blockManager.getStoredBlock(blockInfo).getDatanode(0);
+      DatanodeDescriptor corruptStorageDataNode =
+          blockManager.getStoredBlock(blockInfo).getDatanode(1);

-    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
-    for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
-      DatanodeStorageInfo storageInfo = failedStorageDataNode
-          .getStorageInfos()[i];
-      DatanodeStorage dns = new DatanodeStorage(
-          failedStorageDataNode.getStorageInfos()[i].getStorageID(),
-          DatanodeStorage.State.FAILED,
-          failedStorageDataNode.getStorageInfos()[i].getStorageType());
-      while(storageInfo.getBlockIterator().hasNext()) {
-        BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
-        if(blockInfo1.equals(blockInfo)) {
-          StorageReport report = new StorageReport(
-              dns, true, storageInfo.getCapacity(),
-              storageInfo.getDfsUsed(), storageInfo.getRemaining(),
-              storageInfo.getBlockPoolUsed(), 0L);
-          reports.add(report);
-          break;
-        }
-      }
-    }
-    failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
-        .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
-    ns.writeLock();
-    DatanodeStorageInfo corruptStorageInfo= null;
-    for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
-      corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
-      while(corruptStorageInfo.getBlockIterator().hasNext()) {
-        BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
-        if (blockInfo1.equals(blockInfo)) {
-          break;
-        }
-      }
-    }
-    blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
-        corruptStorageInfo.getStorageID(),
-        CorruptReplicasMap.Reason.ANY.toString());
-    ns.writeUnlock();
-    BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
-    ns.readLock();
-    LocatedBlocks locatedBlocks =
-        blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
-            false, false, null, null);
-    assertTrue("Located Blocks should exclude corrupt" +
-        "replicas and failed storages",
-        locatedBlocks.getLocatedBlocks().size() == 1);
-    ns.readUnlock();
+      ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+      for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
+        DatanodeStorageInfo storageInfo = failedStorageDataNode
+            .getStorageInfos()[i];
+        DatanodeStorage dns = new DatanodeStorage(
+            failedStorageDataNode.getStorageInfos()[i].getStorageID(),
+            DatanodeStorage.State.FAILED,
+            failedStorageDataNode.getStorageInfos()[i].getStorageType());
+        while(storageInfo.getBlockIterator().hasNext()) {
+          BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
+          if(blockInfo1.equals(blockInfo)) {
+            StorageReport report = new StorageReport(
+                dns, true, storageInfo.getCapacity(),
+                storageInfo.getDfsUsed(), storageInfo.getRemaining(),
+                storageInfo.getBlockPoolUsed(), 0L);
+            reports.add(report);
+            break;
+          }
+        }
+      }
+      failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
+          .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
+      ns.writeLock();
+      DatanodeStorageInfo corruptStorageInfo= null;
+      for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
+        corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
+        while(corruptStorageInfo.getBlockIterator().hasNext()) {
+          BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
+          if (blockInfo1.equals(blockInfo)) {
+            break;
+          }
+        }
+      }
+      blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
+          corruptStorageInfo.getStorageID(),
+          CorruptReplicasMap.Reason.ANY.toString());
+      ns.writeUnlock();
+      BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
+      ns.readLock();
+      LocatedBlocks locatedBlocks =
+          blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
+              false, false, null, null);
+      assertTrue("Located Blocks should exclude corrupt" +
+          "replicas and failed storages",
+          locatedBlocks.getLocatedBlocks().size() == 1);
+      ns.readUnlock();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }

   @Test