YARN-11264. Upgrade JUnit from 4 to 5 in hadoop-yarn-server-tests (#4776)
Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
e04c9e810b
commit
cbe02c2e77
|
@ -126,6 +126,26 @@
|
||||||
<artifactId>bcprov-jdk15on</artifactId>
|
<artifactId>bcprov-jdk15on</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.platform</groupId>
|
||||||
|
<artifactId>junit-platform-launcher</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-auth</artifactId>
|
<artifactId>hadoop-auth</artifactId>
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -31,6 +29,11 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
|
import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
|
||||||
|
@ -75,18 +78,15 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
import org.junit.runners.Parameterized.Parameters;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
|
|
||||||
static Logger LOG = LoggerFactory.getLogger(TestContainerManagerSecurity.class);
|
static Logger LOG = LoggerFactory.getLogger(TestContainerManagerSecurity.class);
|
||||||
|
@ -94,29 +94,24 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
.getRecordFactory(null);
|
.getRecordFactory(null);
|
||||||
private static MiniYARNCluster yarnCluster;
|
private static MiniYARNCluster yarnCluster;
|
||||||
private static final File testRootDir = new File("target",
|
private static final File testRootDir = new File("target",
|
||||||
TestContainerManagerSecurity.class.getName() + "-root");
|
TestContainerManagerSecurity.class.getName() + "-root");
|
||||||
private static File httpSpnegoKeytabFile = new File(testRootDir,
|
private static File httpSpnegoKeytabFile = new File(testRootDir,
|
||||||
"httpSpnegoKeytabFile.keytab");
|
"httpSpnegoKeytabFile.keytab");
|
||||||
private static String httpSpnegoPrincipal = "HTTP/localhost@EXAMPLE.COM";
|
private static String httpSpnegoPrincipal = "HTTP/localhost@EXAMPLE.COM";
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setUp() throws Exception {
|
public void setup() throws Exception {
|
||||||
testRootDir.mkdirs();
|
testRootDir.mkdirs();
|
||||||
httpSpnegoKeytabFile.deleteOnExit();
|
httpSpnegoKeytabFile.deleteOnExit();
|
||||||
|
startMiniKdc();
|
||||||
getKdc().createPrincipal(httpSpnegoKeytabFile, httpSpnegoPrincipal);
|
getKdc().createPrincipal(httpSpnegoKeytabFile, httpSpnegoPrincipal);
|
||||||
UserGroupInformation.setConfiguration(conf);
|
|
||||||
|
|
||||||
yarnCluster =
|
|
||||||
new MiniYARNCluster(TestContainerManagerSecurity.class.getName(), 1, 1,
|
|
||||||
1);
|
|
||||||
yarnCluster.init(conf);
|
|
||||||
yarnCluster.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
|
stopMiniKdc();
|
||||||
if (yarnCluster != null) {
|
if (yarnCluster != null) {
|
||||||
yarnCluster.stop();
|
yarnCluster.stop();
|
||||||
yarnCluster = null;
|
yarnCluster = null;
|
||||||
|
@ -130,7 +125,6 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
* and to give details in say an IDE. The second is the configuraiton
|
* and to give details in say an IDE. The second is the configuraiton
|
||||||
* object to use.
|
* object to use.
|
||||||
*/
|
*/
|
||||||
@Parameters(name = "{0}")
|
|
||||||
public static Collection<Object[]> configs() {
|
public static Collection<Object[]> configs() {
|
||||||
Configuration configurationWithoutSecurity = new Configuration();
|
Configuration configurationWithoutSecurity = new Configuration();
|
||||||
configurationWithoutSecurity.set(
|
configurationWithoutSecurity.set(
|
||||||
|
@ -138,40 +132,49 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
|
|
||||||
Configuration configurationWithSecurity = new Configuration();
|
Configuration configurationWithSecurity = new Configuration();
|
||||||
configurationWithSecurity.set(
|
configurationWithSecurity.set(
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
configurationWithSecurity.set(
|
configurationWithSecurity.set(
|
||||||
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
|
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
|
||||||
configurationWithSecurity.set(
|
configurationWithSecurity.set(
|
||||||
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
|
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
|
||||||
httpSpnegoKeytabFile.getAbsolutePath());
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
configurationWithSecurity.set(
|
configurationWithSecurity.set(
|
||||||
YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
|
YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
|
||||||
configurationWithSecurity.set(
|
configurationWithSecurity.set(
|
||||||
YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
|
YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
|
||||||
httpSpnegoKeytabFile.getAbsolutePath());
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
|
|
||||||
return Arrays.asList(new Object[][] {
|
return Arrays.asList(new Object[][]{
|
||||||
{"Simple", configurationWithoutSecurity},
|
{"Simple", configurationWithoutSecurity},
|
||||||
{"Secure", configurationWithSecurity}});
|
{"Secure", configurationWithSecurity}});
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestContainerManagerSecurity(String name, Configuration conf) {
|
public void initTestContainerManagerSecurity(String name, Configuration conf) {
|
||||||
LOG.info("RUNNING TEST " + name);
|
LOG.info("RUNNING TEST " + name);
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
yarnCluster =
|
||||||
|
new MiniYARNCluster(TestContainerManagerSecurity.class.getName(), 1, 1,
|
||||||
|
1);
|
||||||
|
yarnCluster.init(conf);
|
||||||
|
yarnCluster.start();
|
||||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
|
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@MethodSource("configs")
|
||||||
public void testContainerManager() throws Exception {
|
@ParameterizedTest(name = "{0}")
|
||||||
|
void testContainerManager(String name, Configuration conf) throws Exception {
|
||||||
|
|
||||||
// TestNMTokens.
|
initTestContainerManagerSecurity(name, conf);
|
||||||
testNMTokens(conf);
|
|
||||||
|
|
||||||
// Testing for container token tampering
|
// TestNMTokens.
|
||||||
testContainerToken(conf);
|
testNMTokens(conf);
|
||||||
|
|
||||||
// Testing for container token tampering with epoch
|
// Testing for container token tampering
|
||||||
testContainerTokenWithEpoch(conf);
|
testContainerToken(conf);
|
||||||
|
|
||||||
|
// Testing for container token tampering with epoch
|
||||||
|
testContainerTokenWithEpoch(conf);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +185,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
private void testNMTokens(Configuration testConf) throws Exception {
|
private void testNMTokens(Configuration testConf) throws Exception {
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManagerRM =
|
NMTokenSecretManagerInRM nmTokenSecretManagerRM =
|
||||||
yarnCluster.getResourceManager().getRMContext()
|
yarnCluster.getResourceManager().getRMContext()
|
||||||
.getNMTokenSecretManager();
|
.getNMTokenSecretManager();
|
||||||
NMTokenSecretManagerInNM nmTokenSecretManagerNM =
|
NMTokenSecretManagerInNM nmTokenSecretManagerNM =
|
||||||
yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
|
yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
@ -194,7 +197,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM);
|
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM);
|
||||||
|
|
||||||
// Both id should be equal.
|
// Both id should be equal.
|
||||||
Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
|
assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
|
||||||
nmTokenSecretManagerRM.getCurrentKey().getKeyId());
|
nmTokenSecretManagerRM.getCurrentKey().getKeyId());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -241,8 +244,8 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
0, validNode, user, r, Priority.newInstance(10), 1234);
|
0, validNode, user, r, Priority.newInstance(10), 1234);
|
||||||
ContainerTokenIdentifier identifier =
|
ContainerTokenIdentifier identifier =
|
||||||
BuilderUtils.newContainerTokenIdentifier(validContainerToken);
|
BuilderUtils.newContainerTokenIdentifier(validContainerToken);
|
||||||
Assert.assertEquals(Priority.newInstance(10), identifier.getPriority());
|
assertEquals(Priority.newInstance(10), identifier.getPriority());
|
||||||
Assert.assertEquals(1234, identifier.getCreationTime());
|
assertEquals(1234, identifier.getCreationTime());
|
||||||
|
|
||||||
StringBuilder sb;
|
StringBuilder sb;
|
||||||
// testInvalidNMToken ... creating NMToken using different secret manager.
|
// testInvalidNMToken ... creating NMToken using different secret manager.
|
||||||
|
@ -266,16 +269,16 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
}
|
}
|
||||||
String errorMsg = testStartContainer(rpc, validAppAttemptId, validNode,
|
String errorMsg = testStartContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerToken, null, true);
|
validContainerToken, null, true);
|
||||||
Assert.assertTrue("In calling " + validNode + " exception was '"
|
assertTrue(errorMsg.contains(sb.toString()), "In calling " + validNode + " exception was '"
|
||||||
+ errorMsg + "' but doesn't contain '"
|
+ errorMsg + "' but doesn't contain '"
|
||||||
+ sb.toString() + "'", errorMsg.contains(sb.toString()));
|
+ sb.toString() + "'");
|
||||||
|
|
||||||
org.apache.hadoop.yarn.api.records.Token invalidNMToken =
|
org.apache.hadoop.yarn.api.records.Token invalidNMToken =
|
||||||
tempManager.createNMToken(validAppAttemptId, validNode, user);
|
tempManager.createNMToken(validAppAttemptId, validNode, user);
|
||||||
sb = new StringBuilder("Given NMToken for application : ");
|
sb = new StringBuilder("Given NMToken for application : ");
|
||||||
sb.append(validAppAttemptId.toString())
|
sb.append(validAppAttemptId.toString())
|
||||||
.append(" seems to have been generated illegally.");
|
.append(" seems to have been generated illegally.");
|
||||||
Assert.assertTrue(sb.toString().contains(
|
assertTrue(sb.toString().contains(
|
||||||
testStartContainer(rpc, validAppAttemptId, validNode,
|
testStartContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerToken, invalidNMToken, true)));
|
validContainerToken, invalidNMToken, true)));
|
||||||
|
|
||||||
|
@ -285,10 +288,10 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
user);
|
user);
|
||||||
sb = new StringBuilder("Given NMToken for application : ");
|
sb = new StringBuilder("Given NMToken for application : ");
|
||||||
sb.append(validAppAttemptId)
|
sb.append(validAppAttemptId)
|
||||||
.append(" is not valid for current node manager.expected : ")
|
.append(" is not valid for current node manager.expected : ")
|
||||||
.append(validNode.toString())
|
.append(validNode.toString())
|
||||||
.append(" found : ").append(invalidNode.toString());
|
.append(" found : ").append(invalidNode.toString());
|
||||||
Assert.assertTrue(sb.toString().contains(
|
assertTrue(sb.toString().contains(
|
||||||
testStartContainer(rpc, validAppAttemptId, validNode,
|
testStartContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerToken, invalidNMToken, true)));
|
validContainerToken, invalidNMToken, true)));
|
||||||
|
|
||||||
|
@ -298,9 +301,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
validContainerToken =
|
validContainerToken =
|
||||||
containerTokenSecretManager.createContainerToken(validContainerId,
|
containerTokenSecretManager.createContainerToken(validContainerId,
|
||||||
0, validNode, user, r, Priority.newInstance(0), 0);
|
0, validNode, user, r, Priority.newInstance(0), 0);
|
||||||
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
|
assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerToken, validNMToken, false).isEmpty());
|
validContainerToken, validNMToken, false).isEmpty());
|
||||||
Assert.assertTrue(nmTokenSecretManagerNM
|
assertTrue(nmTokenSecretManagerNM
|
||||||
.isAppAttemptNMTokenKeyPresent(validAppAttemptId));
|
.isAppAttemptNMTokenKeyPresent(validAppAttemptId));
|
||||||
|
|
||||||
// using a new compatible version nmtoken, expect container can be started
|
// using a new compatible version nmtoken, expect container can be started
|
||||||
|
@ -330,16 +333,15 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
new NMTokenIdentifierNewForTest(newIdentifier, "message");
|
new NMTokenIdentifierNewForTest(newIdentifier, "message");
|
||||||
|
|
||||||
// check new version NMTokenIdentifier has correct info.
|
// check new version NMTokenIdentifier has correct info.
|
||||||
Assert.assertEquals("The ApplicationAttemptId is changed after set to " +
|
assertEquals(validAppAttemptId2.getAttemptId(),
|
||||||
"newVersionIdentifier", validAppAttemptId2.getAttemptId(),
|
|
||||||
newVersionIdentifier.getApplicationAttemptId().getAttemptId()
|
newVersionIdentifier.getApplicationAttemptId().getAttemptId()
|
||||||
);
|
,
|
||||||
|
"The ApplicationAttemptId is changed after set to " +
|
||||||
|
"newVersionIdentifier");
|
||||||
|
|
||||||
Assert.assertEquals("The message is changed after set to newVersionIdentifier",
|
assertEquals("message", newVersionIdentifier.getMessage(), "The message is changed after set to newVersionIdentifier");
|
||||||
"message", newVersionIdentifier.getMessage());
|
|
||||||
|
|
||||||
Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier",
|
assertEquals(validNode, newVersionIdentifier.getNodeId(), "The NodeId is changed after set to newVersionIdentifier");
|
||||||
validNode, newVersionIdentifier.getNodeId());
|
|
||||||
|
|
||||||
// create new Token based on new version NMTokenIdentifier.
|
// create new Token based on new version NMTokenIdentifier.
|
||||||
org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
|
org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
|
||||||
|
@ -348,16 +350,16 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
newVersionIdentifier);
|
newVersionIdentifier);
|
||||||
|
|
||||||
// Verify startContainer is successful and no exception is thrown.
|
// Verify startContainer is successful and no exception is thrown.
|
||||||
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
|
assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
|
||||||
validContainerToken2, newVersionedNMToken, false).isEmpty());
|
validContainerToken2, newVersionedNMToken, false).isEmpty());
|
||||||
Assert.assertTrue(nmTokenSecretManagerNM
|
assertTrue(nmTokenSecretManagerNM
|
||||||
.isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
|
.isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
|
||||||
|
|
||||||
//Now lets wait till container finishes and is removed from node manager.
|
//Now lets wait till container finishes and is removed from node manager.
|
||||||
waitForContainerToFinishOnNM(validContainerId);
|
waitForContainerToFinishOnNM(validContainerId);
|
||||||
sb = new StringBuilder("Attempt to relaunch the same container with id ");
|
sb = new StringBuilder("Attempt to relaunch the same container with id ");
|
||||||
sb.append(validContainerId);
|
sb.append(validContainerId);
|
||||||
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
|
assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerToken, validNMToken, true).contains(sb.toString()));
|
validContainerToken, validNMToken, true).contains(sb.toString()));
|
||||||
|
|
||||||
// Container is removed from node manager's memory by this time.
|
// Container is removed from node manager's memory by this time.
|
||||||
|
@ -377,7 +379,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
sb = new StringBuilder("Container ");
|
sb = new StringBuilder("Container ");
|
||||||
sb.append(validContainerId)
|
sb.append(validContainerId)
|
||||||
.append(" was recently stopped on node manager");
|
.append(" was recently stopped on node manager");
|
||||||
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
|
assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerId, validNMToken, true).contains(sb.toString()));
|
validContainerId, validNMToken, true).contains(sb.toString()));
|
||||||
|
|
||||||
// Now lets remove the container from nm-memory
|
// Now lets remove the container from nm-memory
|
||||||
|
@ -388,7 +390,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
sb = new StringBuilder("Container ")
|
sb = new StringBuilder("Container ")
|
||||||
.append(validContainerId.toString())
|
.append(validContainerId.toString())
|
||||||
.append(" is not handled by this NodeManager");
|
.append(" is not handled by this NodeManager");
|
||||||
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
|
assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
|
||||||
validContainerId, validNMToken, false).contains(sb.toString()));
|
validContainerId, validNMToken, false).contains(sb.toString()));
|
||||||
|
|
||||||
// using appAttempt-1 NMtoken for launching container for appAttempt-2
|
// using appAttempt-1 NMtoken for launching container for appAttempt-2
|
||||||
|
@ -396,13 +398,13 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(appId, 2);
|
ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(appId, 2);
|
||||||
Token attempt1NMToken =
|
Token attempt1NMToken =
|
||||||
nmTokenSecretManagerRM
|
nmTokenSecretManagerRM
|
||||||
.createNMToken(validAppAttemptId, validNode, user);
|
.createNMToken(validAppAttemptId, validNode, user);
|
||||||
org.apache.hadoop.yarn.api.records.Token newContainerToken =
|
org.apache.hadoop.yarn.api.records.Token newContainerToken =
|
||||||
containerTokenSecretManager.createContainerToken(
|
containerTokenSecretManager.createContainerToken(
|
||||||
ContainerId.newContainerId(attempt2, 1), 0, validNode, user, r,
|
ContainerId.newContainerId(attempt2, 1), 0, validNode, user, r,
|
||||||
Priority.newInstance(0), 0);
|
Priority.newInstance(0), 0);
|
||||||
Assert.assertTrue(testStartContainer(rpc, attempt2, validNode,
|
assertTrue(testStartContainer(rpc, attempt2, validNode,
|
||||||
newContainerToken, attempt1NMToken, false).isEmpty());
|
newContainerToken, attempt1NMToken, false).isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForContainerToFinishOnNM(ContainerId containerId)
|
private void waitForContainerToFinishOnNM(ContainerId containerId)
|
||||||
|
@ -419,7 +421,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
LOG.info("Waiting for " + containerId + " to get to state " +
|
LOG.info("Waiting for " + containerId + " to get to state " +
|
||||||
ContainerState.COMPLETE);
|
ContainerState.COMPLETE);
|
||||||
GenericTestUtils.waitFor(() -> ContainerState.COMPLETE.equals(
|
GenericTestUtils.waitFor(() -> ContainerState.COMPLETE.equals(
|
||||||
waitContainer.cloneAndGetContainerStatus().getState()),
|
waitContainer.cloneAndGetContainerStatus().getState()),
|
||||||
500, timeout);
|
500, timeout);
|
||||||
} catch (TimeoutException te) {
|
} catch (TimeoutException te) {
|
||||||
LOG.error("TimeoutException", te);
|
LOG.error("TimeoutException", te);
|
||||||
|
@ -433,7 +435,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
// Normally, Containers will be removed from NM context after they are
|
// Normally, Containers will be removed from NM context after they are
|
||||||
// explicitly acked by RM. Now, manually remove it for testing.
|
// explicitly acked by RM. Now, manually remove it for testing.
|
||||||
yarnCluster.getNodeManager(0).getNodeStatusUpdater()
|
yarnCluster.getNodeManager(0).getNodeStatusUpdater()
|
||||||
.addCompletedContainer(containerId);
|
.addCompletedContainer(containerId);
|
||||||
LOG.info("Removing container from NMContext, containerID = " + containerId);
|
LOG.info("Removing container from NMContext, containerID = " + containerId);
|
||||||
nmContext.getContainers().remove(containerId);
|
nmContext.getContainers().remove(containerId);
|
||||||
}
|
}
|
||||||
|
@ -458,7 +460,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
nmTokenSecretManagerRM.activateNextMasterKey();
|
nmTokenSecretManagerRM.activateNextMasterKey();
|
||||||
Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
|
assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
|
||||||
== nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
|
== nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -467,7 +469,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
|
ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
|
||||||
try {
|
try {
|
||||||
stopContainer(rpc, nmToken,
|
stopContainer(rpc, nmToken,
|
||||||
Arrays.asList(new ContainerId[] {containerId}), appAttemptId,
|
Arrays.asList(new ContainerId[]{containerId}), appAttemptId,
|
||||||
nodeId);
|
nodeId);
|
||||||
if (isExceptionExpected) {
|
if (isExceptionExpected) {
|
||||||
fail("Exception was expected!!");
|
fail("Exception was expected!!");
|
||||||
|
@ -505,7 +507,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
try {
|
try {
|
||||||
startContainer(rpc, nmToken, containerToken, nodeId,
|
startContainer(rpc, nmToken, containerToken, nodeId,
|
||||||
appAttemptId.toString());
|
appAttemptId.toString());
|
||||||
if (isExceptionExpected){
|
if (isExceptionExpected) {
|
||||||
fail("Exception was expected!!");
|
fail("Exception was expected!!");
|
||||||
}
|
}
|
||||||
return "";
|
return "";
|
||||||
|
@ -538,12 +540,11 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void
|
private void getContainerStatus(YarnRPC rpc,
|
||||||
getContainerStatus(YarnRPC rpc,
|
org.apache.hadoop.yarn.api.records.Token nmToken,
|
||||||
org.apache.hadoop.yarn.api.records.Token nmToken,
|
ContainerId containerId,
|
||||||
ContainerId containerId,
|
ApplicationAttemptId appAttemptId, NodeId nodeId,
|
||||||
ApplicationAttemptId appAttemptId, NodeId nodeId,
|
boolean isExceptionExpected) throws Exception {
|
||||||
boolean isExceptionExpected) throws Exception {
|
|
||||||
List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
||||||
containerIds.add(containerId);
|
containerIds.add(containerId);
|
||||||
GetContainerStatusesRequest request =
|
GetContainerStatusesRequest request =
|
||||||
|
@ -558,7 +559,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
if (statuses.getFailedRequests() != null
|
if (statuses.getFailedRequests() != null
|
||||||
&& statuses.getFailedRequests().containsKey(containerId)) {
|
&& statuses.getFailedRequests().containsKey(containerId)) {
|
||||||
parseAndThrowException(statuses.getFailedRequests().get(containerId)
|
parseAndThrowException(statuses.getFailedRequests().get(containerId)
|
||||||
.deSerialize());
|
.deSerialize());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
|
@ -584,7 +585,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
try {
|
try {
|
||||||
proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
|
proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
|
||||||
StartContainersResponse response = proxy.startContainers(allRequests);
|
StartContainersResponse response = proxy.startContainers(allRequests);
|
||||||
for(SerializedException ex : response.getFailedRequests().values()){
|
for (SerializedException ex : response.getFailedRequests().values()) {
|
||||||
parseAndThrowException(ex.deSerialize());
|
parseAndThrowException(ex.deSerialize());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -617,7 +618,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
}
|
}
|
||||||
proxy =
|
proxy =
|
||||||
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||||
rpc, addr);
|
rpc, addr);
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +643,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
*/
|
*/
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
|
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
|
||||||
yarnCluster.getResourceManager().getRMContext()
|
yarnCluster.getResourceManager().getRMContext()
|
||||||
.getNMTokenSecretManager();
|
.getNMTokenSecretManager();
|
||||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 0);
|
ApplicationAttemptId.newInstance(appId, 0);
|
||||||
|
@ -657,7 +658,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
NodeId nodeId = nm.getNMContext().getNodeId();
|
NodeId nodeId = nm.getNMContext().getNodeId();
|
||||||
|
|
||||||
// Both id should be equal.
|
// Both id should be equal.
|
||||||
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
|
assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
|
||||||
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
|
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
|
||||||
|
|
||||||
|
|
||||||
|
@ -686,9 +687,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
nodeId, password, newVersionTokenIdentifier);
|
nodeId, password, newVersionTokenIdentifier);
|
||||||
|
|
||||||
Token nmToken =
|
Token nmToken =
|
||||||
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
|
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
|
assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
|
||||||
newContainerToken, nmToken, false).isEmpty());
|
newContainerToken, nmToken, false).isEmpty());
|
||||||
|
|
||||||
// Creating a tampered Container Token
|
// Creating a tampered Container Token
|
||||||
|
@ -710,7 +711,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
StringBuilder sb = new StringBuilder("Given Container ");
|
StringBuilder sb = new StringBuilder("Given Container ");
|
||||||
sb.append(cId2)
|
sb.append(cId2)
|
||||||
.append(" seems to have an illegally generated token.");
|
.append(" seems to have an illegally generated token.");
|
||||||
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
|
assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
|
||||||
containerToken2, nmToken, true).contains(sb.toString()));
|
containerToken2, nmToken, true).contains(sb.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -754,7 +755,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
NodeId nodeId = nm.getNMContext().getNodeId();
|
NodeId nodeId = nm.getNMContext().getNodeId();
|
||||||
|
|
||||||
// Both id should be equal.
|
// Both id should be equal.
|
||||||
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
|
assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
|
||||||
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
|
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
|
||||||
|
|
||||||
// Creating a normal Container Token
|
// Creating a normal Container Token
|
||||||
|
@ -774,8 +775,8 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
containerTokenIdentifier.readFields(dib);
|
containerTokenIdentifier.readFields(dib);
|
||||||
|
|
||||||
|
|
||||||
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
|
assertEquals(cId, containerTokenIdentifier.getContainerID());
|
||||||
Assert.assertEquals(
|
assertEquals(
|
||||||
cId.toString(), containerTokenIdentifier.getContainerID().toString());
|
cId.toString(), containerTokenIdentifier.getContainerID().toString());
|
||||||
|
|
||||||
Token nmToken =
|
Token nmToken =
|
||||||
|
@ -791,10 +792,10 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||||
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
|
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
|
||||||
GetContainerStatusesResponse res = proxy.getContainerStatuses(
|
GetContainerStatusesResponse res = proxy.getContainerStatuses(
|
||||||
GetContainerStatusesRequest.newInstance(containerIds));
|
GetContainerStatusesRequest.newInstance(containerIds));
|
||||||
Assert.assertNotNull(res.getContainerStatuses().get(0));
|
assertNotNull(res.getContainerStatuses().get(0));
|
||||||
Assert.assertEquals(
|
assertEquals(
|
||||||
cId, res.getContainerStatuses().get(0).getContainerId());
|
cId, res.getContainerStatuses().get(0).getContainerId());
|
||||||
Assert.assertEquals(cId.toString(),
|
assertEquals(cId.toString(),
|
||||||
res.getContainerStatuses().get(0).getContainerId().toString());
|
res.getContainerStatuses().get(0).getContainerId().toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -37,11 +39,10 @@ import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -73,7 +74,7 @@ public class TestDiskFailures {
|
||||||
private static MiniYARNCluster yarnCluster;
|
private static MiniYARNCluster yarnCluster;
|
||||||
LocalDirsHandlerService dirsHandler;
|
LocalDirsHandlerService dirsHandler;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setup() throws AccessControlException,
|
public static void setup() throws AccessControlException,
|
||||||
FileNotFoundException, UnsupportedFileSystemException, IOException {
|
FileNotFoundException, UnsupportedFileSystemException, IOException {
|
||||||
localFS = FileContext.getLocalFSFileContext();
|
localFS = FileContext.getLocalFSFileContext();
|
||||||
|
@ -82,7 +83,7 @@ public class TestDiskFailures {
|
||||||
// Do not start cluster here
|
// Do not start cluster here
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void teardown() {
|
public static void teardown() {
|
||||||
if (yarnCluster != null) {
|
if (yarnCluster != null) {
|
||||||
yarnCluster.stop();
|
yarnCluster.stop();
|
||||||
|
@ -99,7 +100,7 @@ public class TestDiskFailures {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLocalDirsFailures() throws IOException {
|
void testLocalDirsFailures() throws IOException {
|
||||||
testDirsFailures(true);
|
testDirsFailures(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +112,7 @@ public class TestDiskFailures {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLogDirsFailures() throws IOException {
|
void testLogDirsFailures() throws IOException {
|
||||||
testDirsFailures(false);
|
testDirsFailures(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +123,7 @@ public class TestDiskFailures {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDirFailuresOnStartup() throws IOException {
|
void testDirFailuresOnStartup() throws IOException {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
String localDir1 = new File(testDir, "localDir1").getPath();
|
String localDir1 = new File(testDir, "localDir1").getPath();
|
||||||
String localDir2 = new File(testDir, "localDir2").getPath();
|
String localDir2 = new File(testDir, "localDir2").getPath();
|
||||||
|
@ -137,11 +138,11 @@ public class TestDiskFailures {
|
||||||
LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
|
LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
|
||||||
dirSvc.init(conf);
|
dirSvc.init(conf);
|
||||||
List<String> localDirs = dirSvc.getLocalDirs();
|
List<String> localDirs = dirSvc.getLocalDirs();
|
||||||
Assert.assertEquals(1, localDirs.size());
|
assertEquals(1, localDirs.size());
|
||||||
Assert.assertEquals(new Path(localDir2).toString(), localDirs.get(0));
|
assertEquals(new Path(localDir2).toString(), localDirs.get(0));
|
||||||
List<String> logDirs = dirSvc.getLogDirs();
|
List<String> logDirs = dirSvc.getLogDirs();
|
||||||
Assert.assertEquals(1, logDirs.size());
|
assertEquals(1, logDirs.size());
|
||||||
Assert.assertEquals(new Path(logDir1).toString(), logDirs.get(0));
|
assertEquals(new Path(logDir1).toString(), logDirs.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testDirsFailures(boolean localORLogDirs) throws IOException {
|
private void testDirsFailures(boolean localORLogDirs) throws IOException {
|
||||||
|
@ -177,8 +178,7 @@ public class TestDiskFailures {
|
||||||
List<String> list = localORLogDirs ? dirsHandler.getLocalDirs()
|
List<String> list = localORLogDirs ? dirsHandler.getLocalDirs()
|
||||||
: dirsHandler.getLogDirs();
|
: dirsHandler.getLogDirs();
|
||||||
String[] dirs = list.toArray(new String[list.size()]);
|
String[] dirs = list.toArray(new String[list.size()]);
|
||||||
Assert.assertEquals("Number of nm-" + dirType + "-dirs is wrong.",
|
assertEquals(numLocalDirs, dirs.length, "Number of nm-" + dirType + "-dirs is wrong.");
|
||||||
numLocalDirs, dirs.length);
|
|
||||||
String expectedDirs = StringUtils.join(",", list);
|
String expectedDirs = StringUtils.join(",", list);
|
||||||
// validate the health of disks initially
|
// validate the health of disks initially
|
||||||
verifyDisksHealth(localORLogDirs, expectedDirs, true);
|
verifyDisksHealth(localORLogDirs, expectedDirs, true);
|
||||||
|
@ -225,11 +225,9 @@ public class TestDiskFailures {
|
||||||
String seenDirs = StringUtils.join(",", list);
|
String seenDirs = StringUtils.join(",", list);
|
||||||
LOG.info("ExpectedDirs=" + expectedDirs);
|
LOG.info("ExpectedDirs=" + expectedDirs);
|
||||||
LOG.info("SeenDirs=" + seenDirs);
|
LOG.info("SeenDirs=" + seenDirs);
|
||||||
Assert.assertTrue("NodeManager could not identify disk failure.",
|
assertEquals(expectedDirs, seenDirs);
|
||||||
expectedDirs.equals(seenDirs));
|
|
||||||
|
|
||||||
Assert.assertEquals("Node's health in terms of disks is wrong",
|
assertEquals(isHealthy, dirsHandler.areDisksHealthy(), "Node's health in terms of disks is wrong");
|
||||||
isHealthy, dirsHandler.areDisksHealthy());
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
|
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
|
||||||
.getRMNodes().values().iterator();
|
.getRMNodes().values().iterator();
|
||||||
|
@ -247,8 +245,7 @@ public class TestDiskFailures {
|
||||||
}
|
}
|
||||||
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
|
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
|
||||||
.getRMNodes().values().iterator();
|
.getRMNodes().values().iterator();
|
||||||
Assert.assertEquals("RM is not updated with the health status of a node",
|
assertEquals(isHealthy, iter.next().getState() != NodeState.UNHEALTHY, "RM is not updated with the health status of a node");
|
||||||
isHealthy, iter.next().getState() != NodeState.UNHEALTHY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,21 +18,21 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
public class TestMiniYARNClusterForHA {
|
public class TestMiniYARNClusterForHA {
|
||||||
MiniYARNCluster cluster;
|
MiniYARNCluster cluster;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws IOException, InterruptedException {
|
public void setup() throws IOException, InterruptedException {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
@ -43,12 +43,12 @@ public class TestMiniYARNClusterForHA {
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
|
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse(-1 == cluster.getActiveRMIndex(), "RM never turned active");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClusterWorks() throws YarnException, InterruptedException {
|
void testClusterWorks() throws YarnException, InterruptedException {
|
||||||
assertTrue("NMs fail to connect to the RM",
|
assertTrue(cluster.waitForNodeManagersToConnect(5000),
|
||||||
cluster.waitForNodeManagersToConnect(5000));
|
"NMs fail to connect to the RM");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,18 +18,23 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
public class TestMiniYarnCluster {
|
public class TestMiniYarnCluster {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimelineServiceStartInMiniCluster() throws Exception {
|
void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
int numNodeManagers = 1;
|
int numNodeManagers = 1;
|
||||||
int numLocalDirs = 1;
|
int numLocalDirs = 1;
|
||||||
|
@ -45,14 +50,14 @@ public class TestMiniYarnCluster {
|
||||||
try (MiniYARNCluster cluster =
|
try (MiniYARNCluster cluster =
|
||||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||||
enableAHS)) {
|
enableAHS)) {
|
||||||
|
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
|
|
||||||
//verify that the timeline service is not started.
|
//verify that the timeline service is not started.
|
||||||
Assert.assertNull("Timeline Service should not have been started",
|
assertNull(cluster.getApplicationHistoryServer(),
|
||||||
cluster.getApplicationHistoryServer());
|
"Timeline Service should not have been started");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -64,25 +69,25 @@ public class TestMiniYarnCluster {
|
||||||
try (MiniYARNCluster cluster =
|
try (MiniYARNCluster cluster =
|
||||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||||
enableAHS)) {
|
enableAHS)) {
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
|
|
||||||
// Verify that the timeline-service starts on ephemeral ports by default
|
// Verify that the timeline-service starts on ephemeral ports by default
|
||||||
String hostname = MiniYARNCluster.getHostname();
|
String hostname = MiniYARNCluster.getHostname();
|
||||||
Assert.assertEquals(hostname + ":0",
|
assertEquals(hostname + ":0",
|
||||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
|
conf.get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
|
||||||
|
|
||||||
cluster.start();
|
cluster.start();
|
||||||
|
|
||||||
//Timeline service may sometime take a while to get started
|
//Timeline service may sometime take a while to get started
|
||||||
int wait = 0;
|
int wait = 0;
|
||||||
while(cluster.getApplicationHistoryServer() == null && wait < 20) {
|
while (cluster.getApplicationHistoryServer() == null && wait < 20) {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
wait++;
|
wait++;
|
||||||
}
|
}
|
||||||
//verify that the timeline service is started.
|
//verify that the timeline service is started.
|
||||||
Assert.assertNotNull("Timeline Service should have been started",
|
assertNotNull(cluster.getApplicationHistoryServer(),
|
||||||
cluster.getApplicationHistoryServer());
|
"Timeline Service should have been started");
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
* Timeline service should start if TIMELINE_SERVICE_ENABLED == false
|
* Timeline service should start if TIMELINE_SERVICE_ENABLED == false
|
||||||
|
@ -93,24 +98,24 @@ public class TestMiniYarnCluster {
|
||||||
try (MiniYARNCluster cluster =
|
try (MiniYARNCluster cluster =
|
||||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||||
enableAHS)) {
|
enableAHS)) {
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
|
|
||||||
//Timeline service may sometime take a while to get started
|
//Timeline service may sometime take a while to get started
|
||||||
int wait = 0;
|
int wait = 0;
|
||||||
while(cluster.getApplicationHistoryServer() == null && wait < 20) {
|
while (cluster.getApplicationHistoryServer() == null && wait < 20) {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
wait++;
|
wait++;
|
||||||
}
|
}
|
||||||
//verify that the timeline service is started.
|
//verify that the timeline service is started.
|
||||||
Assert.assertNotNull("Timeline Service should have been started",
|
assertNotNull(cluster.getApplicationHistoryServer(),
|
||||||
cluster.getApplicationHistoryServer());
|
"Timeline Service should have been started");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiRMConf() throws IOException {
|
void testMultiRMConf() throws IOException {
|
||||||
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
|
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
|
||||||
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
|
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
|
@ -130,22 +135,22 @@ public class TestMiniYarnCluster {
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
Configuration conf1 = cluster.getResourceManager(0).getConfig(),
|
Configuration conf1 = cluster.getResourceManager(0).getConfig(),
|
||||||
conf2 = cluster.getResourceManager(1).getConfig();
|
conf2 = cluster.getResourceManager(1).getConfig();
|
||||||
Assert.assertFalse(conf1 == conf2);
|
assertFalse(conf1 == conf2);
|
||||||
Assert.assertEquals("0.0.0.0:18032",
|
assertEquals("0.0.0.0:18032",
|
||||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||||
RM1_NODE_ID)));
|
RM1_NODE_ID)));
|
||||||
Assert.assertEquals("0.0.0.0:28032",
|
assertEquals("0.0.0.0:28032",
|
||||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||||
RM2_NODE_ID)));
|
RM2_NODE_ID)));
|
||||||
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
|
assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
|
||||||
|
|
||||||
Assert.assertEquals("0.0.0.0:18032",
|
assertEquals("0.0.0.0:18032",
|
||||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||||
RM1_NODE_ID)));
|
RM1_NODE_ID)));
|
||||||
Assert.assertEquals("0.0.0.0:28032",
|
assertEquals("0.0.0.0:28032",
|
||||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||||
RM2_NODE_ID)));
|
RM2_NODE_ID)));
|
||||||
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
|
assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -39,8 +39,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
public class TestMiniYarnClusterNodeUtilization {
|
public class TestMiniYarnClusterNodeUtilization {
|
||||||
// Mini YARN cluster setup
|
// Mini YARN cluster setup
|
||||||
|
@ -72,7 +73,7 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
|
|
||||||
private NodeStatus nodeStatus;
|
private NodeStatus nodeStatus;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() {
|
public void setup() {
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
||||||
|
@ -81,7 +82,7 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
|
cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse(-1 == cluster.getActiveRMIndex(), "RM never turned active");
|
||||||
|
|
||||||
nm = (CustomNodeManager)cluster.getNodeManager(0);
|
nm = (CustomNodeManager)cluster.getNodeManager(0);
|
||||||
nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
|
nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
|
||||||
|
@ -95,11 +96,12 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
* both the RMNode and SchedulerNode have been updated with the new
|
* both the RMNode and SchedulerNode have been updated with the new
|
||||||
* utilization.
|
* utilization.
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test
|
||||||
public void testUpdateNodeUtilization()
|
@Timeout(60000)
|
||||||
|
void testUpdateNodeUtilization()
|
||||||
throws InterruptedException, IOException, YarnException {
|
throws InterruptedException, IOException, YarnException {
|
||||||
assertTrue("NMs fail to connect to the RM",
|
assertTrue(cluster.waitForNodeManagersToConnect(10000),
|
||||||
cluster.waitForNodeManagersToConnect(10000));
|
"NMs fail to connect to the RM");
|
||||||
|
|
||||||
// Give the heartbeat time to propagate to the RM
|
// Give the heartbeat time to propagate to the RM
|
||||||
verifySimulatedUtilization();
|
verifySimulatedUtilization();
|
||||||
|
@ -119,11 +121,12 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
* Verify both the RMNode and SchedulerNode have been updated with the new
|
* Verify both the RMNode and SchedulerNode have been updated with the new
|
||||||
* utilization.
|
* utilization.
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test
|
||||||
public void testMockNodeStatusHeartbeat()
|
@Timeout(60000)
|
||||||
|
void testMockNodeStatusHeartbeat()
|
||||||
throws InterruptedException, YarnException {
|
throws InterruptedException, YarnException {
|
||||||
assertTrue("NMs fail to connect to the RM",
|
assertTrue(cluster.waitForNodeManagersToConnect(10000),
|
||||||
cluster.waitForNodeManagersToConnect(10000));
|
"NMs fail to connect to the RM");
|
||||||
|
|
||||||
NodeStatusUpdater updater = nm.getNodeStatusUpdater();
|
NodeStatusUpdater updater = nm.getNodeStatusUpdater();
|
||||||
updater.sendOutofBandHeartBeat();
|
updater.sendOutofBandHeartBeat();
|
||||||
|
@ -196,12 +199,12 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
|
|
||||||
// Give the heartbeat time to propagate to the RM (max 10 seconds)
|
// Give the heartbeat time to propagate to the RM (max 10 seconds)
|
||||||
// We check if the nodeUtilization is up to date
|
// We check if the nodeUtilization is up to date
|
||||||
for (int i=0; i<100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
for (RMNode ni : rmContext.getRMNodes().values()) {
|
for (RMNode ni : rmContext.getRMNodes().values()) {
|
||||||
if (ni.getNodeUtilization() != null) {
|
if (ni.getNodeUtilization() != null) {
|
||||||
if (ni.getNodeUtilization().equals(nodeUtilization)) {
|
if (ni.getNodeUtilization().equals(nodeUtilization)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
@ -210,22 +213,18 @@ public class TestMiniYarnClusterNodeUtilization {
|
||||||
// Verify the data is readable from the RM and scheduler nodes
|
// Verify the data is readable from the RM and scheduler nodes
|
||||||
for (RMNode ni : rmContext.getRMNodes().values()) {
|
for (RMNode ni : rmContext.getRMNodes().values()) {
|
||||||
ResourceUtilization cu = ni.getAggregatedContainersUtilization();
|
ResourceUtilization cu = ni.getAggregatedContainersUtilization();
|
||||||
assertEquals("Containers Utillization not propagated to RMNode",
|
assertEquals(containersUtilization, cu, "Containers Utillization not propagated to RMNode");
|
||||||
containersUtilization, cu);
|
|
||||||
|
|
||||||
ResourceUtilization nu = ni.getNodeUtilization();
|
ResourceUtilization nu = ni.getNodeUtilization();
|
||||||
assertEquals("Node Utillization not propagated to RMNode",
|
assertEquals(nodeUtilization, nu, "Node Utillization not propagated to RMNode");
|
||||||
nodeUtilization, nu);
|
|
||||||
|
|
||||||
SchedulerNode scheduler =
|
SchedulerNode scheduler = rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
|
||||||
rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
|
|
||||||
cu = scheduler.getAggregatedContainersUtilization();
|
cu = scheduler.getAggregatedContainersUtilization();
|
||||||
assertEquals("Containers Utillization not propagated to SchedulerNode",
|
assertEquals(containersUtilization, cu,
|
||||||
containersUtilization, cu);
|
"Containers Utillization not propagated to SchedulerNode");
|
||||||
|
|
||||||
nu = scheduler.getNodeUtilization();
|
nu = scheduler.getNodeUtilization();
|
||||||
assertEquals("Node Utillization not propagated to SchedulerNode",
|
assertEquals(nodeUtilization, nu, "Node Utillization not propagated to SchedulerNode");
|
||||||
nodeUtilization, nu);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,12 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -36,7 +38,10 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.kerby.util.IOUtil;
|
import org.apache.kerby.util.IOUtil;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
public class TestRMNMSecretKeys {
|
public class TestRMNMSecretKeys {
|
||||||
private static final String KRB5_CONF = "java.security.krb5.conf";
|
private static final String KRB5_CONF = "java.security.krb5.conf";
|
||||||
|
@ -44,7 +49,7 @@ public class TestRMNMSecretKeys {
|
||||||
System.getProperty("test.build.dir", "target/test-dir"),
|
System.getProperty("test.build.dir", "target/test-dir"),
|
||||||
UUID.randomUUID().toString());
|
UUID.randomUUID().toString());
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setup() throws IOException {
|
public static void setup() throws IOException {
|
||||||
KRB5_CONF_ROOT_DIR.mkdir();
|
KRB5_CONF_ROOT_DIR.mkdir();
|
||||||
File krb5ConfFile = new File(KRB5_CONF_ROOT_DIR, "krb5.conf");
|
File krb5ConfFile = new File(KRB5_CONF_ROOT_DIR, "krb5.conf");
|
||||||
|
@ -63,13 +68,14 @@ public class TestRMNMSecretKeys {
|
||||||
System.setProperty(KRB5_CONF, krb5ConfFile.getAbsolutePath());
|
System.setProperty(KRB5_CONF, krb5ConfFile.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void tearDown() throws IOException {
|
public static void tearDown() throws IOException {
|
||||||
KRB5_CONF_ROOT_DIR.delete();
|
KRB5_CONF_ROOT_DIR.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 1000000)
|
@Test
|
||||||
public void testNMUpdation() throws Exception {
|
@Timeout(1000000)
|
||||||
|
void testNMUpdation() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
// validating RM NM keys for Unsecured environment
|
// validating RM NM keys for Unsecured environment
|
||||||
validateRMNMKeyExchange(conf);
|
validateRMNMKeyExchange(conf);
|
||||||
|
@ -113,30 +119,30 @@ public class TestRMNMSecretKeys {
|
||||||
|
|
||||||
MasterKey containerTokenMasterKey =
|
MasterKey containerTokenMasterKey =
|
||||||
registrationResponse.getContainerTokenMasterKey();
|
registrationResponse.getContainerTokenMasterKey();
|
||||||
Assert.assertNotNull(containerToken
|
assertNotNull(containerTokenMasterKey, containerToken
|
||||||
+ "Registration should cause a key-update!", containerTokenMasterKey);
|
+ "Registration should cause a key-update!");
|
||||||
MasterKey nmTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
MasterKey nmTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||||
Assert.assertNotNull(nmToken
|
assertNotNull(nmTokenMasterKey, nmToken
|
||||||
+ "Registration should cause a key-update!", nmTokenMasterKey);
|
+ "Registration should cause a key-update!");
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
|
NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNull(containerToken +
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
"First heartbeat after registration shouldn't get any key updates!",
|
containerToken +
|
||||||
response.getContainerTokenMasterKey());
|
"First heartbeat after registration shouldn't get any key updates!");
|
||||||
Assert.assertNull(nmToken +
|
assertNull(response.getNMTokenMasterKey(),
|
||||||
"First heartbeat after registration shouldn't get any key updates!",
|
nmToken +
|
||||||
response.getNMTokenMasterKey());
|
"First heartbeat after registration shouldn't get any key updates!");
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
response = nm.nodeHeartbeat(true);
|
response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNull(containerToken +
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
containerToken +
|
||||||
response.getContainerTokenMasterKey());
|
"Even second heartbeat after registration shouldn't get any key updates!");
|
||||||
Assert.assertNull(nmToken +
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
nmToken +
|
||||||
response.getContainerTokenMasterKey());
|
"Even second heartbeat after registration shouldn't get any key updates!");
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
|
@ -146,30 +152,30 @@ public class TestRMNMSecretKeys {
|
||||||
|
|
||||||
// Heartbeats after roll-over and before activation should be fine.
|
// Heartbeats after roll-over and before activation should be fine.
|
||||||
response = nm.nodeHeartbeat(true);
|
response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNotNull(containerToken +
|
assertNotNull(response.getContainerTokenMasterKey(),
|
||||||
"Heartbeats after roll-over and before activation should not err out.",
|
containerToken +
|
||||||
response.getContainerTokenMasterKey());
|
"Heartbeats after roll-over and before activation should not err out.");
|
||||||
Assert.assertNotNull(nmToken +
|
assertNotNull(response.getNMTokenMasterKey(),
|
||||||
"Heartbeats after roll-over and before activation should not err out.",
|
nmToken +
|
||||||
response.getNMTokenMasterKey());
|
"Heartbeats after roll-over and before activation should not err out.");
|
||||||
|
|
||||||
Assert.assertEquals(containerToken +
|
assertEquals(containerTokenMasterKey.getKeyId() + 1,
|
||||||
"Roll-over should have incremented the key-id only by one!",
|
response.getContainerTokenMasterKey().getKeyId(),
|
||||||
containerTokenMasterKey.getKeyId() + 1,
|
containerToken +
|
||||||
response.getContainerTokenMasterKey().getKeyId());
|
"Roll-over should have incremented the key-id only by one!");
|
||||||
Assert.assertEquals(nmToken +
|
assertEquals(nmTokenMasterKey.getKeyId() + 1,
|
||||||
"Roll-over should have incremented the key-id only by one!",
|
response.getNMTokenMasterKey().getKeyId(),
|
||||||
nmTokenMasterKey.getKeyId() + 1,
|
nmToken +
|
||||||
response.getNMTokenMasterKey().getKeyId());
|
"Roll-over should have incremented the key-id only by one!");
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
response = nm.nodeHeartbeat(true);
|
response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNull(containerToken +
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
"Second heartbeat after roll-over shouldn't get any key updates!",
|
containerToken +
|
||||||
response.getContainerTokenMasterKey());
|
"Second heartbeat after roll-over shouldn't get any key updates!");
|
||||||
Assert.assertNull(nmToken +
|
assertNull(response.getNMTokenMasterKey(),
|
||||||
"Second heartbeat after roll-over shouldn't get any key updates!",
|
nmToken +
|
||||||
response.getNMTokenMasterKey());
|
"Second heartbeat after roll-over shouldn't get any key updates!");
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
// Let's force activation
|
// Let's force activation
|
||||||
|
@ -177,21 +183,21 @@ public class TestRMNMSecretKeys {
|
||||||
rm.getRMContext().getNMTokenSecretManager().activateNextMasterKey();
|
rm.getRMContext().getNMTokenSecretManager().activateNextMasterKey();
|
||||||
|
|
||||||
response = nm.nodeHeartbeat(true);
|
response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNull(containerToken
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
+ "Activation shouldn't cause any key updates!",
|
containerToken
|
||||||
response.getContainerTokenMasterKey());
|
+ "Activation shouldn't cause any key updates!");
|
||||||
Assert.assertNull(nmToken
|
assertNull(response.getNMTokenMasterKey(),
|
||||||
+ "Activation shouldn't cause any key updates!",
|
nmToken
|
||||||
response.getNMTokenMasterKey());
|
+ "Activation shouldn't cause any key updates!");
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
response = nm.nodeHeartbeat(true);
|
response = nm.nodeHeartbeat(true);
|
||||||
Assert.assertNull(containerToken +
|
assertNull(response.getContainerTokenMasterKey(),
|
||||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
containerToken +
|
||||||
response.getContainerTokenMasterKey());
|
"Even second heartbeat after activation shouldn't get any key updates!");
|
||||||
Assert.assertNull(nmToken +
|
assertNull(response.getNMTokenMasterKey(),
|
||||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
nmToken +
|
||||||
response.getNMTokenMasterKey());
|
"Even second heartbeat after activation shouldn't get any key updates!");
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
|
|
|
@ -20,28 +20,13 @@ package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.junit.rules.TestWatcher;
|
|
||||||
import org.junit.runner.Description;
|
import org.junit.jupiter.api.extension.TestWatcher;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class TimelineVersionWatcher extends TestWatcher {
|
public class TimelineVersionWatcher implements TestWatcher {
|
||||||
static final float DEFAULT_TIMELINE_VERSION = 1.0f;
|
static final float DEFAULT_TIMELINE_VERSION = 1.0f;
|
||||||
private TimelineVersion version;
|
private TimelineVersion version;
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void starting(Description description) {
|
|
||||||
version = description.getAnnotation(TimelineVersion.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the version number of timeline server for the current test (using
|
|
||||||
* timeline server v1.0 by default)
|
|
||||||
*/
|
|
||||||
public float getTimelineVersion() {
|
|
||||||
if(version == null) {
|
|
||||||
return DEFAULT_TIMELINE_VERSION;
|
|
||||||
}
|
|
||||||
return version.value();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.timelineservice;
|
package org.apache.hadoop.yarn.server.timelineservice;
|
||||||
|
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -54,9 +54,9 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineColle
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
public class TestTimelineServiceClientIntegration {
|
public class TestTimelineServiceClientIntegration {
|
||||||
private static final String ROOT_DIR = new File("target",
|
private static final String ROOT_DIR = new File("target",
|
||||||
|
@ -66,7 +66,7 @@ public class TestTimelineServiceClientIntegration {
|
||||||
private static PerNodeTimelineCollectorsAuxService auxService;
|
private static PerNodeTimelineCollectorsAuxService auxService;
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setupClass() throws Exception {
|
public static void setupClass() throws Exception {
|
||||||
try {
|
try {
|
||||||
collectorManager = new MockNodeTimelineCollectorManager();
|
collectorManager = new MockNodeTimelineCollectorManager();
|
||||||
|
@ -88,7 +88,7 @@ public class TestTimelineServiceClientIntegration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void tearDownClass() throws Exception {
|
public static void tearDownClass() throws Exception {
|
||||||
if (auxService != null) {
|
if (auxService != null) {
|
||||||
auxService.stop();
|
auxService.stop();
|
||||||
|
@ -97,7 +97,7 @@ public class TestTimelineServiceClientIntegration {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutEntities() throws Exception {
|
void testPutEntities() throws Exception {
|
||||||
TimelineV2Client client =
|
TimelineV2Client client =
|
||||||
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
|
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
|
||||||
try {
|
try {
|
||||||
|
@ -123,7 +123,7 @@ public class TestTimelineServiceClientIntegration {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutExtendedEntities() throws Exception {
|
void testPutExtendedEntities() throws Exception {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
TimelineV2Client client =
|
TimelineV2Client client =
|
||||||
TimelineV2Client.createTimelineClient(appId);
|
TimelineV2Client.createTimelineClient(appId);
|
||||||
|
|
|
@ -18,11 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.security;
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
|
@ -75,18 +75,16 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineR
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_HTTP_AUTH_PREFIX;
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_HTTP_AUTH_PREFIX;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests timeline authentication filter based security for timeline service v2.
|
* Tests timeline authentication filter based security for timeline service v2.
|
||||||
*/
|
*/
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
public class TestTimelineAuthFilterForV2 {
|
public class TestTimelineAuthFilterForV2 {
|
||||||
|
|
||||||
private static final String FOO_USER = "foo";
|
private static final String FOO_USER = "foo";
|
||||||
|
@ -106,9 +104,8 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
|
|
||||||
// First param indicates whether HTTPS access or HTTP access and second param
|
// First param indicates whether HTTPS access or HTTP access and second param
|
||||||
// indicates whether it is kerberos access or token based access.
|
// indicates whether it is kerberos access or token based access.
|
||||||
@Parameterized.Parameters
|
|
||||||
public static Collection<Object[]> params() {
|
public static Collection<Object[]> params() {
|
||||||
return Arrays.asList(new Object[][] {{false, true}, {false, false},
|
return Arrays.asList(new Object[][]{{false, true}, {false, false},
|
||||||
{true, false}, {true, true}});
|
{true, false}, {true, true}});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,11 +114,14 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
private static String sslConfDir;
|
private static String sslConfDir;
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
private static UserGroupInformation nonKerberosUser;
|
private static UserGroupInformation nonKerberosUser;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
nonKerberosUser = UserGroupInformation.getCurrentUser();
|
nonKerberosUser = UserGroupInformation.getCurrentUser();
|
||||||
} catch (IOException e) {}
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Indicates whether HTTPS or HTTP access.
|
// Indicates whether HTTPS or HTTP access.
|
||||||
private boolean withSsl;
|
private boolean withSsl;
|
||||||
// Indicates whether Kerberos based login is used or token based access is
|
// Indicates whether Kerberos based login is used or token based access is
|
||||||
|
@ -129,13 +129,14 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
private boolean withKerberosLogin;
|
private boolean withKerberosLogin;
|
||||||
private NodeTimelineCollectorManager collectorManager;
|
private NodeTimelineCollectorManager collectorManager;
|
||||||
private PerNodeTimelineCollectorsAuxService auxService;
|
private PerNodeTimelineCollectorsAuxService auxService;
|
||||||
public TestTimelineAuthFilterForV2(boolean withSsl,
|
|
||||||
|
public void initTestTimelineAuthFilterForV2(boolean withSsl,
|
||||||
boolean withKerberosLogin) {
|
boolean withKerberosLogin) {
|
||||||
this.withSsl = withSsl;
|
this.withSsl = withSsl;
|
||||||
this.withKerberosLogin = withKerberosLogin;
|
this.withKerberosLogin = withKerberosLogin;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setup() {
|
public static void setup() {
|
||||||
try {
|
try {
|
||||||
testMiniKDC = new MiniKdc(MiniKdc.createConf(), TEST_ROOT_DIR);
|
testMiniKDC = new MiniKdc(MiniKdc.createConf(), TEST_ROOT_DIR);
|
||||||
|
@ -181,7 +182,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void initialize() throws Exception {
|
public void initialize() throws Exception {
|
||||||
if (withSsl) {
|
if (withSsl) {
|
||||||
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||||
|
@ -221,7 +222,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
appId, UserGroupInformation.getCurrentUser().getUserName());
|
appId, UserGroupInformation.getCurrentUser().getUserName());
|
||||||
if (!withKerberosLogin) {
|
if (!withKerberosLogin) {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
(AppLevelTimelineCollector)collectorManager.get(appId);
|
(AppLevelTimelineCollector) collectorManager.get(appId);
|
||||||
Token<TimelineDelegationTokenIdentifier> token =
|
Token<TimelineDelegationTokenIdentifier> token =
|
||||||
collector.getDelegationTokenForApp();
|
collector.getDelegationTokenForApp();
|
||||||
token.setService(new Text("localhost" + token.getService().toString().
|
token.setService(new Text("localhost" + token.getService().toString().
|
||||||
|
@ -243,7 +244,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
if (testMiniKDC != null) {
|
if (testMiniKDC != null) {
|
||||||
testMiniKDC.stop();
|
testMiniKDC.stop();
|
||||||
|
@ -251,7 +252,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
FileUtil.fullyDelete(TEST_ROOT_DIR);
|
FileUtil.fullyDelete(TEST_ROOT_DIR);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void destroy() throws Exception {
|
public void destroy() throws Exception {
|
||||||
if (auxService != null) {
|
if (auxService != null) {
|
||||||
auxService.stop();
|
auxService.stop();
|
||||||
|
@ -318,7 +319,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
String entityType, int numEntities) throws Exception {
|
String entityType, int numEntities) throws Exception {
|
||||||
TimelineV2Client client = createTimelineClientForUGI(appId);
|
TimelineV2Client client = createTimelineClientForUGI(appId);
|
||||||
try {
|
try {
|
||||||
// Sync call. Results available immediately.
|
// Sync call. Results available immediately.
|
||||||
client.putEntities(createEntity("entity1", entityType));
|
client.putEntities(createEntity("entity1", entityType));
|
||||||
assertEquals(numEntities, entityTypeDir.listFiles().length);
|
assertEquals(numEntities, entityTypeDir.listFiles().length);
|
||||||
verifyEntity(entityTypeDir, "entity1", entityType);
|
verifyEntity(entityTypeDir, "entity1", entityType);
|
||||||
|
@ -343,8 +344,10 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@MethodSource("params")
|
||||||
public void testPutTimelineEntities() throws Exception {
|
@ParameterizedTest
|
||||||
|
void testPutTimelineEntities(boolean withSsl, boolean withKerberosLogin) throws Exception {
|
||||||
|
initTestTimelineAuthFilterForV2(withSsl, withKerberosLogin);
|
||||||
final String entityType = ENTITY_TYPE +
|
final String entityType = ENTITY_TYPE +
|
||||||
ENTITY_TYPE_SUFFIX.getAndIncrement();
|
ENTITY_TYPE_SUFFIX.getAndIncrement();
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
@ -364,8 +367,8 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
assertTrue("Entities should have been published successfully.",
|
assertTrue(publishWithRetries(appId, entityTypeDir, entityType, 1),
|
||||||
publishWithRetries(appId, entityTypeDir, entityType, 1));
|
"Entities should have been published successfully.");
|
||||||
|
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
(AppLevelTimelineCollector) collectorManager.get(appId);
|
(AppLevelTimelineCollector) collectorManager.get(appId);
|
||||||
|
@ -377,12 +380,12 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
// published.
|
// published.
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
// Entities should publish successfully after renewal.
|
// Entities should publish successfully after renewal.
|
||||||
assertTrue("Entities should have been published successfully.",
|
assertTrue(publishWithRetries(appId, entityTypeDir, entityType, 2),
|
||||||
publishWithRetries(appId, entityTypeDir, entityType, 2));
|
"Entities should have been published successfully.");
|
||||||
assertNotNull(collector);
|
assertNotNull(collector);
|
||||||
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
|
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
|
||||||
renewToken(eq(collector.getDelegationTokenForApp()),
|
renewToken(eq(collector.getDelegationTokenForApp()),
|
||||||
any(String.class));
|
any(String.class));
|
||||||
|
|
||||||
// Wait to ensure lifetime of token expires and ensure its regenerated
|
// Wait to ensure lifetime of token expires and ensure its regenerated
|
||||||
// automatically.
|
// automatically.
|
||||||
|
@ -393,8 +396,9 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
}
|
}
|
||||||
assertNotEquals("Token should have been regenerated.", token,
|
assertNotEquals(token,
|
||||||
collector.getDelegationTokenForApp());
|
collector.getDelegationTokenForApp(),
|
||||||
|
"Token should have been regenerated.");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
// Try publishing with the old token in UGI. Publishing should fail due
|
// Try publishing with the old token in UGI. Publishing should fail due
|
||||||
// to invalid token.
|
// to invalid token.
|
||||||
|
@ -402,8 +406,8 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
||||||
fail("Exception should have been thrown due to Invalid Token.");
|
fail("Exception should have been thrown due to Invalid Token.");
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
assertTrue("Exception thrown should have been due to Invalid Token.",
|
assertTrue(e.getCause().getMessage().contains("InvalidToken"),
|
||||||
e.getCause().getMessage().contains("InvalidToken"));
|
"Exception thrown should have been due to Invalid Token.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the regenerated token in UGI and retry publishing entities.
|
// Update the regenerated token in UGI and retry publishing entities.
|
||||||
|
@ -411,10 +415,10 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
collector.getDelegationTokenForApp();
|
collector.getDelegationTokenForApp();
|
||||||
regeneratedToken.setService(new Text("localhost" +
|
regeneratedToken.setService(new Text("localhost" +
|
||||||
regeneratedToken.getService().toString().substring(
|
regeneratedToken.getService().toString().substring(
|
||||||
regeneratedToken.getService().toString().indexOf(":"))));
|
regeneratedToken.getService().toString().indexOf(":"))));
|
||||||
UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
|
UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
|
||||||
assertTrue("Entities should have been published successfully.",
|
assertTrue(publishWithRetries(appId, entityTypeDir, entityType, 2),
|
||||||
publishWithRetries(appId, entityTypeDir, entityType, 2));
|
"Entities should have been published successfully.");
|
||||||
// Token was generated twice, once when app collector was created and
|
// Token was generated twice, once when app collector was created and
|
||||||
// later after token lifetime expiry.
|
// later after token lifetime expiry.
|
||||||
verify(collectorManager.getTokenManagerService(), times(2)).
|
verify(collectorManager.getTokenManagerService(), times(2)).
|
||||||
|
@ -432,11 +436,11 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
}
|
}
|
||||||
assertNotNull("Error reading entityTypeDir", entities);
|
assertNotNull(entities, "Error reading entityTypeDir");
|
||||||
assertEquals(2, entities.length);
|
assertEquals(2, entities.length);
|
||||||
verifyEntity(entityTypeDir, "entity2", entityType);
|
verifyEntity(entityTypeDir, "entity2", entityType);
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
(AppLevelTimelineCollector)collectorManager.get(appId);
|
(AppLevelTimelineCollector) collectorManager.get(appId);
|
||||||
assertNotNull(collector);
|
assertNotNull(collector);
|
||||||
auxService.removeApplication(appId);
|
auxService.removeApplication(appId);
|
||||||
verify(collectorManager.getTokenManagerService()).cancelToken(
|
verify(collectorManager.getTokenManagerService()).cancelToken(
|
||||||
|
@ -446,24 +450,20 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
private static class DummyNodeTimelineCollectorManager extends
|
private static class DummyNodeTimelineCollectorManager extends
|
||||||
NodeTimelineCollectorManager {
|
NodeTimelineCollectorManager {
|
||||||
private volatile int tokenExpiredCnt = 0;
|
private volatile int tokenExpiredCnt = 0;
|
||||||
DummyNodeTimelineCollectorManager() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
private int getTokenExpiredCnt() {
|
private int getTokenExpiredCnt() {
|
||||||
return tokenExpiredCnt;
|
return tokenExpiredCnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TimelineV2DelegationTokenSecretManagerService
|
protected TimelineV2DelegationTokenSecretManagerService createTokenManagerService() {
|
||||||
createTokenManagerService() {
|
|
||||||
return spy(new TimelineV2DelegationTokenSecretManagerService() {
|
return spy(new TimelineV2DelegationTokenSecretManagerService() {
|
||||||
@Override
|
@Override
|
||||||
protected AbstractDelegationTokenSecretManager
|
protected AbstractDelegationTokenSecretManager
|
||||||
<TimelineDelegationTokenIdentifier>
|
<TimelineDelegationTokenIdentifier> createTimelineDelegationTokenSecretManager(long
|
||||||
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
secretKeyInterval,
|
||||||
long tokenMaxLifetime, long tokenRenewInterval,
|
long tokenMaxLifetime, long tokenRenewInterval,
|
||||||
long tokenRemovalScanInterval) {
|
long tokenRemovalScanInterval) {
|
||||||
return spy(new TimelineV2DelegationTokenSecretManager(
|
return spy(new TimelineV2DelegationTokenSecretManager(
|
||||||
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 2000L) {
|
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 2000L) {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue