HDFS-16171. De-flake testDecommissionStatus (#3280)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
parent 919a99bbb4
commit 6342d5e523
GenericTestUtils.java

@@ -418,6 +418,28 @@ public abstract class GenericTestUtils {
   public static void waitFor(final Supplier<Boolean> check,
       final long checkEveryMillis, final long waitForMillis)
       throws TimeoutException, InterruptedException {
+    waitFor(check, checkEveryMillis, waitForMillis, null);
+  }
+
+  /**
+   * Wait for the specified test to return true. The test will be performed
+   * initially and then every {@code checkEveryMillis} until at least
+   * {@code waitForMillis} time has expired. If {@code check} is null or
+   * {@code waitForMillis} is less than {@code checkEveryMillis} this method
+   * will throw an {@link IllegalArgumentException}.
+   *
+   * @param check the test to perform.
+   * @param checkEveryMillis how often to perform the test.
+   * @param waitForMillis the amount of time after which no more tests will be
+   *                      performed.
+   * @param errorMsg error message to provide in TimeoutException.
+   * @throws TimeoutException if the test does not return true in the allotted
+   *                          time.
+   * @throws InterruptedException if the method is interrupted while waiting.
+   */
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis,
+      final String errorMsg) throws TimeoutException, InterruptedException {
     Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
     if (waitForMillis < checkEveryMillis) {
       throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
@@ -432,9 +454,12 @@ public abstract class GenericTestUtils {
     }

     if (!result) {
-      throw new TimeoutException("Timed out waiting for condition. " +
-          "Thread diagnostics:\n" +
-          TimedOutTestsListener.buildThreadDiagnosticString());
+      final String exceptionErrorMsg = "Timed out waiting for condition. "
+          + (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
+          ? "Error Message: " + errorMsg : "")
+          + "\nThread diagnostics:\n" +
+          TimedOutTestsListener.buildThreadDiagnosticString();
+      throw new TimeoutException(exceptionErrorMsg);
     }
   }

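The test changes below lean on this new four-argument waitFor. A minimal usage sketch, assuming only the overload added above; the class name WaitForErrorMsgExample and the AtomicInteger standing in for asynchronously updated test state are illustrative, not part of the patch:

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForErrorMsgExample {
  public static void main(String[] args)
      throws TimeoutException, InterruptedException {
    // Stand-in for state that a background component updates eventually.
    AtomicInteger reportedBlocks = new AtomicInteger(0);

    new Thread(() -> {
      try {
        Thread.sleep(2000);
        reportedBlocks.set(4);
      } catch (InterruptedException ignored) {
      }
    }).start();

    // Poll every second, give up after 10 seconds. On timeout the message
    // below is surfaced as "Error Message: ..." ahead of the usual thread
    // diagnostics, instead of a bare thread dump. The message is built once,
    // up front, mirroring how the test code in this commit constructs errorMsg.
    GenericTestUtils.waitFor(
        () -> reportedBlocks.get() == 4,
        1000, 10000,
        "Expected 4 reported blocks, got " + reportedBlocks.get());
  }
}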
TestDecommissioningStatus.java

@@ -27,7 +27,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -76,7 +79,8 @@ public class TestDecommissioningStatus {
   private FileSystem fileSys;
   private HostsFileWriter hostsFileWriter;
   private Configuration conf;
-  private Logger LOG;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDecommissioningStatus.class);

   final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);

@@ -110,7 +114,6 @@ public class TestDecommissioningStatus {
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
     GenericTestUtils.setLogLevel(
         LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG);
-    LOG = LoggerFactory.getLogger(TestDecommissioningStatus.class);
     return conf;
   }

@@ -165,17 +168,30 @@ public class TestDecommissioningStatus {

   protected void checkDecommissionStatus(DatanodeDescriptor decommNode,
       int expectedUnderRep, int expectedDecommissionOnly,
-      int expectedUnderRepInOpenFiles) {
-    assertEquals("Unexpected num under-replicated blocks",
-        expectedUnderRep,
-        decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
-    assertEquals("Unexpected number of decom-only replicas",
-        expectedDecommissionOnly,
-        decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
-    assertEquals(
-        "Unexpected number of replicas in under-replicated open files",
-        expectedUnderRepInOpenFiles,
-        decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
+      int expectedUnderRepInOpenFiles) throws TimeoutException,
+      InterruptedException {
+    String errorMsg;
+    errorMsg = "Under replicated blocks. Expected: "
+        + expectedUnderRep + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
+    GenericTestUtils.waitFor(
+        () -> expectedUnderRep == decommNode.getLeavingServiceStatus()
+            .getUnderReplicatedBlocks(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
+    errorMsg = "OutOfService only replicas. Expected: "
+        + expectedDecommissionOnly + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
+    GenericTestUtils.waitFor(
+        () -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus()
+            .getOutOfServiceOnlyReplicas(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
+    errorMsg = "UnderReplicated in open files. Expected: "
+        + expectedUnderRepInOpenFiles + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
+    GenericTestUtils.waitFor(
+        () -> expectedUnderRepInOpenFiles == decommNode
+            .getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
   }

   protected void checkDFSAdminDecommissionStatus(
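The hunk above is the core of the de-flake: counts that the DatanodeAdminManager updates asynchronously are now polled with a bounded timeout instead of asserted once. A hedged sketch of that before/after pattern applied to a generic int-valued getter; PollingAssertSketch, metricSource and assertEventually are placeholder names, not from the patch:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

import org.apache.hadoop.test.GenericTestUtils;

public final class PollingAssertSketch {

  private PollingAssertSketch() {
  }

  // Flaky style: read the value once and fail immediately if the background
  // work has not caught up yet.
  static void assertImmediately(IntSupplier metricSource, int expected) {
    if (metricSource.getAsInt() != expected) {
      throw new AssertionError("Expected " + expected
          + " but was " + metricSource.getAsInt());
    }
  }

  // De-flaked style, mirroring checkDecommissionStatus above: keep re-reading
  // the value until it matches or a bounded timeout expires, and attach a
  // descriptive message to the eventual TimeoutException.
  static void assertEventually(IntSupplier metricSource, int expected)
      throws TimeoutException, InterruptedException {
    String errorMsg = "Metric mismatch. Expected: " + expected
        + " , Actual: " + metricSource.getAsInt();
    GenericTestUtils.waitFor(
        () -> metricSource.getAsInt() == expected,
        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
  }
}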
@@ -270,6 +286,7 @@ public class TestDecommissioningStatus {

     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    verifyInitialState(fsn, dm);
     for (int iteration = 0; iteration < numDatanodes; iteration++) {
       String downnode = decommissionNode(client, iteration);
       dm.refreshNodes(conf);
@@ -278,14 +295,13 @@ public class TestDecommissioningStatus {
       // Block until the admin's monitor updates the number of tracked nodes.
       waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);
       final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
+      assertEquals(decommissioningNodes.size(), iteration + 1);
       if (iteration == 0) {
-        assertEquals(decommissioningNodes.size(), 1);
         DatanodeDescriptor decommNode = decommissioningNodes.get(0);
         checkDecommissionStatus(decommNode, 3, 0, 1);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
             fileSys, admin);
       } else {
-        assertEquals(decommissioningNodes.size(), 2);
         DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
         DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
         // This one is still 3,3,1 since it passed over the UC block
@@ -307,6 +323,69 @@ public class TestDecommissioningStatus {
     AdminStatesBaseTest.cleanupFile(fileSys, file2);
   }

+  // Why do we verify initial state of DataNodes here?
+  // Before we start actual decommission testing, we should ensure that
+  // total 8 blocks (original 4 blocks of 2 files and 4 replicas) are
+  // present over two Datanodes available. If we don't wait until all 8 blocks
+  // are reported live by BlockManager, we might get to a situation
+  // where one of the replicas might not yet been present on any of Datanodes
+  // and we start decommissioning process, and then it would result in
+  // flaky test because total (no of under replicated blocks, no of outOfService
+  // only replicas, no of under replicated in open files) counts would be
+  // incorrect.
+  protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
+      throws InterruptedException {
+    dm.getDatanodes().forEach(datanodeDescriptor -> {
+      try {
+        checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
+      } catch (TimeoutException | InterruptedException e) {
+        throw new AssertionError("Datanode not in good state.", e);
+      }
+    });
+    int c = 0;
+    int totalBlocks;
+    long totalReplicatedBlocks;
+    while (true) {
+      totalBlocks = fsn.getBlockManager().getTotalBlocks();
+      totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
+      if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + totalBlocks
+              + " and replicated blocks " + totalReplicatedBlocks);
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+    c = 0;
+    AtomicInteger total = new AtomicInteger(0);
+    AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
+    while (true) {
+      total.set(0);
+      sufficientBlocksSuccess.set(0);
+      dm.getDatanodes().forEach(
+          datanodeDescriptor -> {
+            total.addAndGet(datanodeDescriptor.numBlocks());
+            if (datanodeDescriptor.numBlocks() == 4) {
+              sufficientBlocksSuccess.incrementAndGet();
+            }
+          });
+      if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + total.get()
+              + " from Datanode Storage. 4 blocks per Datanode Storage"
+              + " expected from each DataNode");
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+  }
+
   /**
    * Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
    * as dead before decommission has completed. That will allow DN to resume
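verifyInitialState above hand-rolls two bounded retry loops (up to five attempts, three seconds apart) around the block and replica counts. For illustration only, a sketch of the same readiness checks expressed through the waitFor overload introduced in GenericTestUtils; this alternative is not what the commit ships, and the class name InitialStateWaitSketch and the 15-second budgets are assumptions:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.test.GenericTestUtils;

final class InitialStateWaitSketch {

  private InitialStateWaitSketch() {
  }

  // Same readiness conditions as the two manual loops: 4 fully replicated
  // blocks reported by the NameNode, and 4 replicas on each of the 2 DataNodes.
  static void awaitInitialState(FSNamesystem fsn, DatanodeManager dm)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> fsn.getBlockManager().getTotalBlocks() == 4
            && fsn.getBlockManager().getTotalReplicatedBlocks() == 4,
        3000, TimeUnit.SECONDS.toMillis(15),
        "NameNode never reported 4 fully replicated blocks");
    GenericTestUtils.waitFor(
        () -> {
          AtomicInteger nodesWithAllBlocks = new AtomicInteger(0);
          dm.getDatanodes().forEach(dn -> {
            if (dn.numBlocks() == 4) {
              nodesWithAllBlocks.incrementAndGet();
            }
          });
          return nodesWithAllBlocks.get() == 2;
        },
        3000, TimeUnit.SECONDS.toMillis(15),
        "Each of the 2 DataNodes was expected to hold 4 block replicas");
  }
}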
TestDecommissioningStatusWithBackoffMonitor.java

@@ -106,6 +106,7 @@ public class TestDecommissioningStatusWithBackoffMonitor

     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    verifyInitialState(fsn, dm);
     for (int iteration = 0; iteration < numDatanodes; iteration++) {
       String downnode = decommissionNode(client, iteration);
       dm.refreshNodes(conf);