HDFS-10281. TestPendingCorruptDnMessages fails intermittently. Contributed by Mingliang Liu.
(cherry picked from commit b9c9d03591
)
This commit is contained in:
parent
9b5c5bd42f
commit
8b1e7842e3
|
@ -18,12 +18,14 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -37,19 +39,22 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestPendingCorruptDnMessages {
|
public class TestPendingCorruptDnMessages {
|
||||||
|
|
||||||
private static final Path filePath = new Path("/foo.txt");
|
private static final Path filePath = new Path("/foo.txt");
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testChangedStorageId() throws IOException, URISyntaxException,
|
public void testChangedStorageId() throws IOException, URISyntaxException,
|
||||||
InterruptedException {
|
InterruptedException, TimeoutException {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
.numDataNodes(1)
|
.numDataNodes(1)
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
.build();
|
.build();
|
||||||
|
@ -83,27 +88,27 @@ public class TestPendingCorruptDnMessages {
|
||||||
|
|
||||||
// Wait until the standby NN queues up the corrupt block in the pending DN
|
// Wait until the standby NN queues up the corrupt block in the pending DN
|
||||||
// message queue.
|
// message queue.
|
||||||
while (cluster.getNamesystem(1).getBlockManager()
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
.getPendingDataNodeMessageCount() < 1) {
|
@Override
|
||||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
|
public Boolean get() {
|
||||||
}
|
return cluster.getNamesystem(1).getBlockManager()
|
||||||
|
.getPendingDataNodeMessageCount() == 1;
|
||||||
assertEquals(1, cluster.getNamesystem(1).getBlockManager()
|
}
|
||||||
.getPendingDataNodeMessageCount());
|
}, 1000, 30000);
|
||||||
String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
|
|
||||||
|
final String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
|
||||||
|
assertNotNull(oldStorageId);
|
||||||
|
|
||||||
// Reformat/restart the DN.
|
// Reformat/restart the DN.
|
||||||
assertTrue(wipeAndRestartDn(cluster, 0));
|
assertTrue(wipeAndRestartDn(cluster, 0));
|
||||||
|
|
||||||
// Give the DN time to start up and register, which will cause the
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
// DatanodeManager to dissociate the old storage ID from the DN xfer addr.
|
@Override
|
||||||
String newStorageId = "";
|
public Boolean get() {
|
||||||
do {
|
final String newStorageId = getRegisteredDatanodeUid(cluster, 1);
|
||||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
|
return newStorageId != null && !newStorageId.equals(oldStorageId);
|
||||||
newStorageId = getRegisteredDatanodeUid(cluster, 1);
|
}
|
||||||
System.out.println("====> oldStorageId: " + oldStorageId +
|
}, 1000, 30000);
|
||||||
" newStorageId: " + newStorageId);
|
|
||||||
} while (newStorageId.equals(oldStorageId));
|
|
||||||
|
|
||||||
assertEquals(0, cluster.getNamesystem(1).getBlockManager()
|
assertEquals(0, cluster.getNamesystem(1).getBlockManager()
|
||||||
.getPendingDataNodeMessageCount());
|
.getPendingDataNodeMessageCount());
|
||||||
|
@ -121,8 +126,8 @@ public class TestPendingCorruptDnMessages {
|
||||||
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
|
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
|
||||||
.getBlockManager().getDatanodeManager()
|
.getBlockManager().getDatanodeManager()
|
||||||
.getDatanodeListForReport(DatanodeReportType.ALL);
|
.getDatanodeListForReport(DatanodeReportType.ALL);
|
||||||
assertEquals(1, registeredDatanodes.size());
|
return registeredDatanodes.isEmpty() ? null :
|
||||||
return registeredDatanodes.get(0).getDatanodeUuid();
|
registeredDatanodes.get(0).getDatanodeUuid();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
|
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
|
||||||
|
|
Loading…
Reference in New Issue