Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
253883fd8d
commit
03c908fb04
|
@ -398,6 +398,28 @@ public abstract class GenericTestUtils {
|
|||
public static void waitFor(final Supplier<Boolean> check,
|
||||
final long checkEveryMillis, final long waitForMillis)
|
||||
throws TimeoutException, InterruptedException {
|
||||
waitFor(check, checkEveryMillis, waitForMillis, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the specified test to return true. The test will be performed
|
||||
* initially and then every {@code checkEveryMillis} until at least
|
||||
* {@code waitForMillis} time has expired. If {@code check} is null or
|
||||
* {@code waitForMillis} is less than {@code checkEveryMillis} this method
|
||||
* will throw an {@link IllegalArgumentException}.
|
||||
*
|
||||
* @param check the test to perform.
|
||||
* @param checkEveryMillis how often to perform the test.
|
||||
* @param waitForMillis the amount of time after which no more tests will be
|
||||
* performed.
|
||||
* @param errorMsg error message to provide in TimeoutException.
|
||||
* @throws TimeoutException if the test does not return true in the allotted
|
||||
* time.
|
||||
* @throws InterruptedException if the method is interrupted while waiting.
|
||||
*/
|
||||
public static void waitFor(final Supplier<Boolean> check,
|
||||
final long checkEveryMillis, final long waitForMillis,
|
||||
final String errorMsg) throws TimeoutException, InterruptedException {
|
||||
Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
|
||||
if (waitForMillis < checkEveryMillis) {
|
||||
throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
|
||||
|
@ -412,9 +434,12 @@ public abstract class GenericTestUtils {
|
|||
}
|
||||
|
||||
if (!result) {
|
||||
throw new TimeoutException("Timed out waiting for condition. " +
|
||||
"Thread diagnostics:\n" +
|
||||
TimedOutTestsListener.buildThreadDiagnosticString());
|
||||
final String exceptionErrorMsg = "Timed out waiting for condition. "
|
||||
+ (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
|
||||
? "Error Message: " + errorMsg : "")
|
||||
+ "\nThread diagnostics:\n" +
|
||||
TimedOutTestsListener.buildThreadDiagnosticString();
|
||||
throw new TimeoutException(exceptionErrorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,10 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
|
@ -57,11 +60,12 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* This class tests the decommissioning of nodes.
|
||||
|
@ -75,7 +79,8 @@ public class TestDecommissioningStatus {
|
|||
private static FileSystem fileSys;
|
||||
private static HostsFileWriter hostsFileWriter;
|
||||
private static Configuration conf;
|
||||
private Logger LOG;
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestDecommissioningStatus.class);
|
||||
|
||||
final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
|
||||
|
||||
|
@ -102,8 +107,8 @@ public class TestDecommissioningStatus {
|
|||
fileSys = cluster.getFileSystem();
|
||||
cluster.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setHeartbeatExpireInterval(3000);
|
||||
Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);
|
||||
LOG = Logger.getLogger(TestDecommissioningStatus.class);
|
||||
GenericTestUtils.setLogLevel(
|
||||
LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -142,17 +147,30 @@ public class TestDecommissioningStatus {
|
|||
|
||||
private void checkDecommissionStatus(DatanodeDescriptor decommNode,
|
||||
int expectedUnderRep, int expectedDecommissionOnly,
|
||||
int expectedUnderRepInOpenFiles) {
|
||||
assertEquals("Unexpected num under-replicated blocks",
|
||||
expectedUnderRep,
|
||||
decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
|
||||
assertEquals("Unexpected number of decom-only replicas",
|
||||
expectedDecommissionOnly,
|
||||
decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
|
||||
assertEquals(
|
||||
"Unexpected number of replicas in under-replicated open files",
|
||||
expectedUnderRepInOpenFiles,
|
||||
decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
|
||||
int expectedUnderRepInOpenFiles) throws TimeoutException,
|
||||
InterruptedException {
|
||||
String errorMsg;
|
||||
errorMsg = "Under replicated blocks. Expected: "
|
||||
+ expectedUnderRep + " , Actual: "
|
||||
+ decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
|
||||
GenericTestUtils.waitFor(
|
||||
() -> expectedUnderRep == decommNode.getLeavingServiceStatus()
|
||||
.getUnderReplicatedBlocks(),
|
||||
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
|
||||
errorMsg = "OutOfService only replicas. Expected: "
|
||||
+ expectedDecommissionOnly + " , Actual: "
|
||||
+ decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
|
||||
GenericTestUtils.waitFor(
|
||||
() -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus()
|
||||
.getOutOfServiceOnlyReplicas(),
|
||||
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
|
||||
errorMsg = "UnderReplicated in open files. Expected: "
|
||||
+ expectedUnderRepInOpenFiles + " , Actual: "
|
||||
+ decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
|
||||
GenericTestUtils.waitFor(
|
||||
() -> expectedUnderRepInOpenFiles == decommNode
|
||||
.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
|
||||
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
|
||||
}
|
||||
|
||||
private void checkDFSAdminDecommissionStatus(
|
||||
|
@ -247,6 +265,7 @@ public class TestDecommissioningStatus {
|
|||
|
||||
FSNamesystem fsn = cluster.getNamesystem();
|
||||
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
|
||||
verifyInitialState(fsn, dm);
|
||||
for (int iteration = 0; iteration < numDatanodes; iteration++) {
|
||||
String downnode = decommissionNode(client, iteration);
|
||||
dm.refreshNodes(conf);
|
||||
|
@ -255,14 +274,13 @@ public class TestDecommissioningStatus {
|
|||
// Block until the admin's monitor updates the number of tracked nodes.
|
||||
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);
|
||||
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
|
||||
assertEquals(decommissioningNodes.size(), iteration + 1);
|
||||
if (iteration == 0) {
|
||||
assertEquals(decommissioningNodes.size(), 1);
|
||||
DatanodeDescriptor decommNode = decommissioningNodes.get(0);
|
||||
checkDecommissionStatus(decommNode, 3, 0, 1);
|
||||
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
|
||||
fileSys, admin);
|
||||
} else {
|
||||
assertEquals(decommissioningNodes.size(), 2);
|
||||
DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
|
||||
DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
|
||||
// This one is still 3,3,1 since it passed over the UC block
|
||||
|
@ -284,6 +302,69 @@ public class TestDecommissioningStatus {
|
|||
AdminStatesBaseTest.cleanupFile(fileSys, file2);
|
||||
}
|
||||
|
||||
// Why do we verify initial state of DataNodes here?
|
||||
// Before we start actual decommission testing, we should ensure that
|
||||
// total 8 blocks (original 4 blocks of 2 files and 4 replicas) are
|
||||
// present over two Datanodes available. If we don't wait until all 8 blocks
|
||||
// are reported live by BlockManager, we might get to a situation
|
||||
// where one of the replicas might not yet been present on any of Datanodes
|
||||
// and we start decommissioning process, and then it would result in
|
||||
// flaky test because total (no of under replicated blocks, no of outOfService
|
||||
// only replicas, no of under replicated in open files) counts would be
|
||||
// incorrect.
|
||||
protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
|
||||
throws InterruptedException {
|
||||
dm.getDatanodes().forEach(datanodeDescriptor -> {
|
||||
try {
|
||||
checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new AssertionError("Datanode not in good state.", e);
|
||||
}
|
||||
});
|
||||
int c = 0;
|
||||
int totalBlocks;
|
||||
long totalReplicatedBlocks;
|
||||
while (true) {
|
||||
totalBlocks = fsn.getBlockManager().getTotalBlocks();
|
||||
totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
|
||||
if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
|
||||
break;
|
||||
} else {
|
||||
if (c == 4) {
|
||||
throw new AssertionError("Unexpected Total blocks " + totalBlocks
|
||||
+ " and replicated blocks " + totalReplicatedBlocks);
|
||||
}
|
||||
Thread.sleep(3000);
|
||||
}
|
||||
c++;
|
||||
}
|
||||
c = 0;
|
||||
AtomicInteger total = new AtomicInteger(0);
|
||||
AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
|
||||
while (true) {
|
||||
total.set(0);
|
||||
sufficientBlocksSuccess.set(0);
|
||||
dm.getDatanodes().forEach(
|
||||
datanodeDescriptor -> {
|
||||
total.addAndGet(datanodeDescriptor.numBlocks());
|
||||
if (datanodeDescriptor.numBlocks() == 4) {
|
||||
sufficientBlocksSuccess.incrementAndGet();
|
||||
}
|
||||
});
|
||||
if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
|
||||
break;
|
||||
} else {
|
||||
if (c == 4) {
|
||||
throw new AssertionError("Unexpected Total blocks " + total.get()
|
||||
+ " from Datanode Storage. 4 blocks per Datanode Storage"
|
||||
+ " expected from each DataNode");
|
||||
}
|
||||
Thread.sleep(3000);
|
||||
}
|
||||
c++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
|
||||
* as dead before decommission has completed. That will allow DN to resume
|
||||
|
@ -367,8 +448,8 @@ public class TestDecommissioningStatus {
|
|||
*/
|
||||
@Test(timeout=120000)
|
||||
public void testDecommissionDeadDN() throws Exception {
|
||||
Logger log = Logger.getLogger(DatanodeAdminManager.class);
|
||||
log.setLevel(Level.DEBUG);
|
||||
Logger log = LoggerFactory.getLogger(DatanodeAdminManager.class);
|
||||
GenericTestUtils.setLogLevel(log, Level.DEBUG);
|
||||
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
|
||||
String dnName = dnID.getXferAddr();
|
||||
DataNodeProperties stoppedDN = cluster.stopDataNode(0);
|
||||
|
|
Loading…
Reference in New Issue