YARN-2710. RM HA tests failed intermittently on trunk. Contributed by Ahmed
Hussein.
This commit is contained in:
parent
f197f05cff
commit
1975479285
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
|
@ -168,7 +170,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
|||
keepRunning = true;
|
||||
conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
|
||||
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 10);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
|
||||
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
|
||||
|
@ -222,22 +224,24 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
|||
verifyClientConnection();
|
||||
}
|
||||
|
||||
protected void verifyClientConnection() {
|
||||
int numRetries = 3;
|
||||
while(numRetries-- > 0) {
|
||||
Configuration conf = new YarnConfiguration(this.conf);
|
||||
YarnClient client = createAndStartYarnClient(conf);
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
client.getApplications();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
protected void verifyClientConnection() throws InterruptedException {
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
Configuration yarnConf = new YarnConfiguration(conf);
|
||||
YarnClient client = createAndStartYarnClient(yarnConf);
|
||||
try {
|
||||
client.getApplications();
|
||||
return true;
|
||||
} catch (YarnException | IOException ex) {
|
||||
LOG.error(ex.getMessage());
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
return false;
|
||||
}, 50, 500);
|
||||
} catch (TimeoutException e) {
|
||||
fail("Client couldn't connect to the Active RM");
|
||||
}
|
||||
fail("Client couldn't connect to the Active RM");
|
||||
}
|
||||
|
||||
protected Thread createAndStartFailoverThread() {
|
||||
|
@ -327,11 +331,11 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
|||
}
|
||||
|
||||
private boolean waittingForFailOver() {
|
||||
int maximumWaittingTime = 50;
|
||||
int maximumWaittingTime = 200;
|
||||
int count = 0;
|
||||
while (!failoverTriggered.get() && count <= maximumWaittingTime) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
Thread.sleep(25);
|
||||
} catch (InterruptedException e) {
|
||||
// DO NOTHING
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.client;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
|
@ -44,7 +45,9 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
||||
private YarnClient client = null;
|
||||
|
@ -63,7 +66,10 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
|
||||
|
||||
@Test
|
||||
public void testGetApplicationReportOnHA() throws Exception {
|
||||
ApplicationReport report =
|
||||
client.getApplicationReport(cluster.createFakeAppId());
|
||||
|
@ -71,7 +77,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
Assert.assertEquals(cluster.createFakeAppReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetNewApplicationOnHA() throws Exception {
|
||||
ApplicationId appId =
|
||||
client.createApplication().getApplicationSubmissionContext()
|
||||
|
@ -80,7 +86,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
Assert.assertEquals(cluster.createFakeAppId(), appId);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetClusterMetricsOnHA() throws Exception {
|
||||
YarnClusterMetrics clusterMetrics =
|
||||
client.getYarnClusterMetrics();
|
||||
|
@ -89,7 +95,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
clusterMetrics);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetApplicationsOnHA() throws Exception {
|
||||
List<ApplicationReport> reports =
|
||||
client.getApplications();
|
||||
|
@ -99,7 +105,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetClusterNodesOnHA() throws Exception {
|
||||
List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
|
||||
Assert.assertTrue(reports != null);
|
||||
|
@ -108,7 +114,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetQueueInfoOnHA() throws Exception {
|
||||
QueueInfo queueInfo = client.getQueueInfo("root");
|
||||
Assert.assertTrue(queueInfo != null);
|
||||
|
@ -116,7 +122,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
queueInfo);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetQueueUserAclsOnHA() throws Exception {
|
||||
List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
|
||||
Assert.assertTrue(queueUserAclsList != null);
|
||||
|
@ -125,7 +131,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
queueUserAclsList);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetApplicationAttemptReportOnHA() throws Exception {
|
||||
ApplicationAttemptReport report =
|
||||
client.getApplicationAttemptReport(cluster
|
||||
|
@ -134,7 +140,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetApplicationAttemptsOnHA() throws Exception {
|
||||
List<ApplicationAttemptReport> reports =
|
||||
client.getApplicationAttempts(cluster.createFakeAppId());
|
||||
|
@ -144,7 +150,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetContainerReportOnHA() throws Exception {
|
||||
ContainerReport report =
|
||||
client.getContainerReport(cluster.createFakeContainerId());
|
||||
|
@ -152,7 +158,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
Assert.assertEquals(cluster.createFakeContainerReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetContainersOnHA() throws Exception {
|
||||
List<ContainerReport> reports =
|
||||
client.getContainers(cluster.createFakeApplicationAttemptId());
|
||||
|
@ -162,7 +168,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testSubmitApplicationOnHA() throws Exception {
|
||||
ApplicationSubmissionContext appContext =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
|
@ -179,23 +185,23 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
.containsKey(appId));
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
|
||||
client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testForceKillApplicationOnHA() throws Exception {
|
||||
client.killApplication(cluster.createFakeAppId());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testGetDelegationTokenOnHA() throws Exception {
|
||||
Token token = client.getRMDelegationToken(new Text(" "));
|
||||
Assert.assertEquals(token, cluster.createFakeToken());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testRenewDelegationTokenOnHA() throws Exception {
|
||||
RenewDelegationTokenRequest request =
|
||||
RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
|
||||
|
@ -205,7 +211,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
|||
Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testCancelDelegationTokenOnHA() throws Exception {
|
||||
CancelDelegationTokenRequest request =
|
||||
CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/**
|
||||
* Tests Application Master Protocol with timeline service v2 enabled.
|
||||
|
@ -41,6 +43,8 @@ import org.junit.Test;
|
|||
public class TestApplicationMasterServiceProtocolForTimelineV2
|
||||
extends ApplicationMasterServiceProtoTestBase {
|
||||
|
||||
/**
 * Timeout applied to every test method in this class. The {@code @Rule}
 * annotation is required: JUnit 4 ignores rule fields that are not
 * annotated, and the test methods no longer carry {@code @Test(timeout = ...)}.
 */
@Rule
public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
|
||||
|
||||
@Before
|
||||
public void initialize() throws Exception {
|
||||
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
|
||||
|
@ -53,7 +57,7 @@ public class TestApplicationMasterServiceProtocolForTimelineV2
|
|||
super.startupHAAndSetupClient();
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testAllocateForTimelineV2OnHA()
|
||||
throws YarnException, IOException {
|
||||
AllocateRequest request = AllocateRequest.newInstance(0, 50f,
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
|
@ -35,18 +36,23 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
|
||||
public class TestApplicationMasterServiceProtocolOnHA
|
||||
extends ApplicationMasterServiceProtoTestBase {
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
|
||||
|
||||
@Before
|
||||
public void initialize() throws Exception {
|
||||
startHACluster(0, false, false, true);
|
||||
super.startupHAAndSetupClient();
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testRegisterApplicationMasterOnHA() throws YarnException,
|
||||
IOException {
|
||||
RegisterApplicationMasterRequest request =
|
||||
|
@ -57,7 +63,7 @@ public class TestApplicationMasterServiceProtocolOnHA
|
|||
this.cluster.createFakeRegisterApplicationMasterResponse());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testFinishApplicationMasterOnHA() throws YarnException,
|
||||
IOException {
|
||||
FinishApplicationMasterRequest request =
|
||||
|
@ -69,7 +75,7 @@ public class TestApplicationMasterServiceProtocolOnHA
|
|||
this.cluster.createFakeFinishApplicationMasterResponse());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testAllocateOnHA() throws YarnException, IOException {
|
||||
AllocateRequest request = AllocateRequest.newInstance(0, 50f,
|
||||
new ArrayList<ResourceRequest>(),
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
@ -33,12 +35,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
||||
public class TestResourceTrackerOnHA extends ProtocolHATestBase {
|
||||
|
||||
private ResourceTracker resourceTracker = null;
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
|
||||
|
||||
@Before
|
||||
public void initiate() throws Exception {
|
||||
startHACluster(0, false, true, false);
|
||||
|
@ -52,7 +59,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testResourceTrackerOnHA() throws Exception {
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 0);
|
||||
Resource resource = Resource.newInstance(2048, 4);
|
||||
|
@ -62,7 +69,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
|||
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
|
||||
YarnVersionInfo.getVersion(), null, null);
|
||||
resourceTracker.registerNodeManager(request);
|
||||
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
|
||||
Assert.assertTrue(waitForNodeManagerToConnect(200, nodeId));
|
||||
|
||||
// restart the failover thread, and make sure nodeHeartbeat works
|
||||
failoverThread = createAndStartFailoverThread();
|
||||
|
@ -78,14 +85,12 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
|||
return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
|
||||
}
|
||||
|
||||
private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
|
||||
private boolean waitForNodeManagerToConnect(final int maxTime,
|
||||
final NodeId nodeId)
|
||||
throws Exception {
|
||||
for (int i = 0; i < timeout / 100; i++) {
|
||||
if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
|
||||
return true;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
return false;
|
||||
GenericTestUtils.waitFor(
|
||||
() -> getActiveRM().getRMContext().getRMNodes().containsKey(nodeId), 20,
|
||||
maxTime);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue