YARN-5648. [ATSv2 Security] Client side changes for authentication. Contributed by Varun Saxena
This commit is contained in:
parent
d3f11e3f13
commit
ac7f52df83
|
@ -19,7 +19,10 @@
|
||||||
package org.apache.hadoop.yarn.client.api.impl;
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -69,6 +72,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
||||||
|
|
||||||
private ApplicationId contextAppId;
|
private ApplicationId contextAppId;
|
||||||
|
|
||||||
|
private UserGroupInformation authUgi;
|
||||||
|
|
||||||
public TimelineV2ClientImpl(ApplicationId appId) {
|
public TimelineV2ClientImpl(ApplicationId appId) {
|
||||||
super(TimelineV2ClientImpl.class.getName());
|
super(TimelineV2ClientImpl.class.getName());
|
||||||
this.contextAppId = appId;
|
this.contextAppId = appId;
|
||||||
|
@ -88,7 +93,6 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
UserGroupInformation realUgi = ugi.getRealUser();
|
UserGroupInformation realUgi = ugi.getRealUser();
|
||||||
String doAsUser = null;
|
String doAsUser = null;
|
||||||
UserGroupInformation authUgi = null;
|
|
||||||
if (realUgi != null) {
|
if (realUgi != null) {
|
||||||
authUgi = realUgi;
|
authUgi = realUgi;
|
||||||
doAsUser = ugi.getShortUserName();
|
doAsUser = ugi.getShortUserName();
|
||||||
|
@ -192,19 +196,33 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ClientResponse doPutObjects(URI base, String path,
|
||||||
|
MultivaluedMap<String, String> params, Object obj) {
|
||||||
|
return connector.getClient().resource(base).path(path).queryParams(params)
|
||||||
|
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, obj);
|
||||||
|
}
|
||||||
|
|
||||||
protected void putObjects(URI base, String path,
|
protected void putObjects(URI base, String path,
|
||||||
MultivaluedMap<String, String> params, Object obj)
|
MultivaluedMap<String, String> params, Object obj)
|
||||||
throws IOException, YarnException {
|
throws IOException, YarnException {
|
||||||
ClientResponse resp;
|
ClientResponse resp = null;
|
||||||
try {
|
try {
|
||||||
resp = connector.getClient().resource(base).path(path).queryParams(params)
|
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
|
||||||
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
|
@Override
|
||||||
.put(ClientResponse.class, obj);
|
public ClientResponse run() throws Exception {
|
||||||
} catch (RuntimeException re) {
|
return doPutObjects(base, path, params, obj);
|
||||||
// runtime exception is expected if the client cannot connect the server
|
}
|
||||||
String msg = "Failed to get the response from the timeline server.";
|
});
|
||||||
LOG.error(msg, re);
|
} catch (UndeclaredThrowableException ue) {
|
||||||
throw new IOException(re);
|
Throwable cause = ue.getCause();
|
||||||
|
if (cause instanceof IOException) {
|
||||||
|
throw (IOException)cause;
|
||||||
|
} else {
|
||||||
|
throw new IOException(cause);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
throw (IOException) new InterruptedIOException().initCause(ie);
|
||||||
}
|
}
|
||||||
if (resp == null || resp.getStatusInfo()
|
if (resp == null || resp.getStatusInfo()
|
||||||
.getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
|
.getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
|
||||||
|
|
|
@ -125,6 +125,17 @@
|
||||||
<groupId>commons-logging</groupId>
|
<groupId>commons-logging</groupId>
|
||||||
<artifactId>commons-logging</artifactId>
|
<artifactId>commons-logging</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.bouncycastle</groupId>
|
||||||
|
<artifactId>bcprov-jdk16</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-auth</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
<type>test-jar</type>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -18,10 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server;
|
package org.apache.hadoop.yarn.server;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -32,9 +35,38 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.kerby.util.IOUtil;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRMNMSecretKeys {
|
public class TestRMNMSecretKeys {
|
||||||
|
private static final String KRB5_CONF = "java.security.krb5.conf";
|
||||||
|
private static final File KRB5_CONF_ROOT_DIR = new File(
|
||||||
|
System.getProperty("test.build.dir", "target/test-dir"),
|
||||||
|
UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws IOException {
|
||||||
|
KRB5_CONF_ROOT_DIR.mkdir();
|
||||||
|
File krb5ConfFile = new File(KRB5_CONF_ROOT_DIR, "krb5.conf");
|
||||||
|
krb5ConfFile.createNewFile();
|
||||||
|
String content = "[libdefaults]\n" +
|
||||||
|
" default_realm = APACHE.ORG\n" +
|
||||||
|
" udp_preference_limit = 1\n"+
|
||||||
|
" extra_addresses = 127.0.0.1\n" +
|
||||||
|
"[realms]\n" +
|
||||||
|
" APACHE.ORG = {\n" +
|
||||||
|
" admin_server = localhost:88\n" +
|
||||||
|
" kdc = localhost:88\n}\n" +
|
||||||
|
"[domain_realm]\n" +
|
||||||
|
" localhost = APACHE.ORG";
|
||||||
|
IOUtil.writeFile(content, krb5ConfFile);
|
||||||
|
System.setProperty(KRB5_CONF, krb5ConfFile.getAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
KRB5_CONF_ROOT_DIR.delete();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 1000000)
|
@Test(timeout = 1000000)
|
||||||
public void testNMUpdation() throws Exception {
|
public void testNMUpdation() throws Exception {
|
||||||
|
|
|
@ -0,0 +1,309 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.KerberosTestUtils;
|
||||||
|
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||||
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests timeline authentication filter based security for timeline service v2.
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class TestTimelineAuthFilterForV2 {
|
||||||
|
|
||||||
|
private static final String FOO_USER = "foo";
|
||||||
|
private static final String HTTP_USER = "HTTP";
|
||||||
|
|
||||||
|
private static final File TEST_ROOT_DIR = new File(
|
||||||
|
System.getProperty("test.build.dir", "target" + File.separator +
|
||||||
|
"test-dir"), UUID.randomUUID().toString());
|
||||||
|
private static final String BASEDIR =
|
||||||
|
System.getProperty("test.build.dir", "target/test-dir") + "/"
|
||||||
|
+ TestTimelineAuthFilterForV2.class.getSimpleName();
|
||||||
|
private static File httpSpnegoKeytabFile = new File(KerberosTestUtils.
|
||||||
|
getKeytabFile());
|
||||||
|
private static String httpSpnegoPrincipal = KerberosTestUtils.
|
||||||
|
getServerPrincipal();
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<Object[]> withSsl() {
|
||||||
|
return Arrays.asList(new Object[][] {{false}, {true}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MiniKdc testMiniKDC;
|
||||||
|
private static String keystoresDir;
|
||||||
|
private static String sslConfDir;
|
||||||
|
private static Configuration conf;
|
||||||
|
private boolean withSsl;
|
||||||
|
private NodeTimelineCollectorManager collectorManager;
|
||||||
|
private PerNodeTimelineCollectorsAuxService auxService;
|
||||||
|
|
||||||
|
public TestTimelineAuthFilterForV2(boolean withSsl) {
|
||||||
|
this.withSsl = withSsl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
try {
|
||||||
|
testMiniKDC = new MiniKdc(MiniKdc.createConf(), TEST_ROOT_DIR);
|
||||||
|
testMiniKDC.start();
|
||||||
|
testMiniKDC.createPrincipal(
|
||||||
|
httpSpnegoKeytabFile, HTTP_USER + "/localhost");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Couldn't setup MiniKDC.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup timeline service v2.
|
||||||
|
try {
|
||||||
|
conf = new Configuration(false);
|
||||||
|
conf.setStrings(TimelineAuthenticationFilterInitializer.PREFIX + "type",
|
||||||
|
"kerberos");
|
||||||
|
conf.set(TimelineAuthenticationFilterInitializer.PREFIX +
|
||||||
|
KerberosAuthenticationHandler.PRINCIPAL, httpSpnegoPrincipal);
|
||||||
|
conf.set(TimelineAuthenticationFilterInitializer.PREFIX +
|
||||||
|
KerberosAuthenticationHandler.KEYTAB,
|
||||||
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL,
|
||||||
|
httpSpnegoPrincipal);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
||||||
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
|
// Enable timeline service v2
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, "localhost");
|
||||||
|
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||||
|
TEST_ROOT_DIR.getAbsolutePath());
|
||||||
|
conf.set("hadoop.proxyuser.HTTP.hosts", "*");
|
||||||
|
conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Couldn't setup TimelineServer V2.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initialize() throws Exception {
|
||||||
|
if (withSsl) {
|
||||||
|
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||||
|
HttpConfig.Policy.HTTPS_ONLY.name());
|
||||||
|
File base = new File(BASEDIR);
|
||||||
|
FileUtil.fullyDelete(base);
|
||||||
|
base.mkdirs();
|
||||||
|
keystoresDir = new File(BASEDIR).getAbsolutePath();
|
||||||
|
sslConfDir =
|
||||||
|
KeyStoreTestUtil.getClasspathDir(TestTimelineAuthFilterForV2.class);
|
||||||
|
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
||||||
|
} else {
|
||||||
|
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||||
|
HttpConfig.Policy.HTTP_ONLY.name());
|
||||||
|
}
|
||||||
|
collectorManager = new DummyNodeTimelineCollectorManager();
|
||||||
|
auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
||||||
|
collectorManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
|
||||||
|
TimelineV2Client client =
|
||||||
|
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
|
||||||
|
// set the timeline service address.
|
||||||
|
String restBindAddr = collectorManager.getRestServerBindAddress();
|
||||||
|
String addr =
|
||||||
|
"localhost" + restBindAddr.substring(restBindAddr.indexOf(":"));
|
||||||
|
client.setTimelineServiceAddress(addr);
|
||||||
|
client.init(conf);
|
||||||
|
client.start();
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
if (testMiniKDC != null) {
|
||||||
|
testMiniKDC.stop();
|
||||||
|
}
|
||||||
|
FileUtil.fullyDelete(TEST_ROOT_DIR);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
if (auxService != null) {
|
||||||
|
auxService.stop();
|
||||||
|
}
|
||||||
|
if (withSsl) {
|
||||||
|
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
|
||||||
|
File base = new File(BASEDIR);
|
||||||
|
FileUtil.fullyDelete(base);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimelineEntity createEntity(String id, String type) {
|
||||||
|
TimelineEntity entityToStore = new TimelineEntity();
|
||||||
|
entityToStore.setId(id);
|
||||||
|
entityToStore.setType(type);
|
||||||
|
entityToStore.setCreatedTime(0L);
|
||||||
|
return entityToStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyEntity(File entityTypeDir, String id, String type)
|
||||||
|
throws IOException {
|
||||||
|
File entityFile = new File(entityTypeDir, id +
|
||||||
|
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||||
|
assertTrue(entityFile.exists());
|
||||||
|
TimelineEntity entity = readEntityFile(entityFile);
|
||||||
|
assertNotNull(entity);
|
||||||
|
assertEquals(id, entity.getId());
|
||||||
|
assertEquals(type, entity.getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimelineEntity readEntityFile(File entityFile)
|
||||||
|
throws IOException {
|
||||||
|
BufferedReader reader = null;
|
||||||
|
String strLine;
|
||||||
|
try {
|
||||||
|
reader = new BufferedReader(new FileReader(entityFile));
|
||||||
|
while ((strLine = reader.readLine()) != null) {
|
||||||
|
if (strLine.trim().length() > 0) {
|
||||||
|
return FileSystemTimelineReaderImpl.
|
||||||
|
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutTimelineEntities() throws Exception {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
auxService.addApplication(appId);
|
||||||
|
final String entityType = "dummy_type";
|
||||||
|
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
|
||||||
|
File.separator + "entities" + File.separator +
|
||||||
|
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" +
|
||||||
|
File.separator + "test_flow_name" + File.separator +
|
||||||
|
"test_flow_version" + File.separator + "1" + File.separator +
|
||||||
|
appId.toString() + File.separator + entityType);
|
||||||
|
try {
|
||||||
|
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
TimelineV2Client client = createTimelineClientForUGI(appId);
|
||||||
|
try {
|
||||||
|
// Sync call. Results available immediately.
|
||||||
|
client.putEntities(createEntity("entity1", entityType));
|
||||||
|
assertEquals(1, entityTypeDir.listFiles().length);
|
||||||
|
verifyEntity(entityTypeDir, "entity1", entityType);
|
||||||
|
// Async call.
|
||||||
|
client.putEntitiesAsync(createEntity("entity2", entityType));
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Wait for async entity to be published.
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
if (entityTypeDir.listFiles().length == 2) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
assertEquals(2, entityTypeDir.listFiles().length);
|
||||||
|
verifyEntity(entityTypeDir, "entity2", entityType);
|
||||||
|
} finally {
|
||||||
|
FileUtils.deleteQuietly(entityTypeDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DummyNodeTimelineCollectorManager extends
|
||||||
|
NodeTimelineCollectorManager {
|
||||||
|
DummyNodeTimelineCollectorManager() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CollectorNodemanagerProtocol getNMCollectorService() {
|
||||||
|
CollectorNodemanagerProtocol protocol =
|
||||||
|
mock(CollectorNodemanagerProtocol.class);
|
||||||
|
try {
|
||||||
|
GetTimelineCollectorContextResponse response =
|
||||||
|
GetTimelineCollectorContextResponse.newInstance("test_user",
|
||||||
|
"test_flow_name", "test_flow_version", 1L);
|
||||||
|
when(protocol.getTimelineCollectorContext(any(
|
||||||
|
GetTimelineCollectorContextRequest.class))).thenReturn(response);
|
||||||
|
} catch (YarnException | IOException e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
return protocol;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,28 +0,0 @@
|
||||||
#
|
|
||||||
# Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
# or more contributor license agreements. See the NOTICE file
|
|
||||||
# distributed with this work for additional information
|
|
||||||
# regarding copyright ownership. The ASF licenses this file
|
|
||||||
# to you under the Apache License, Version 2.0 (the
|
|
||||||
# "License"); you may not use this file except in compliance
|
|
||||||
# with the License. You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
#
|
|
||||||
[libdefaults]
|
|
||||||
default_realm = APACHE.ORG
|
|
||||||
udp_preference_limit = 1
|
|
||||||
extra_addresses = 127.0.0.1
|
|
||||||
[realms]
|
|
||||||
APACHE.ORG = {
|
|
||||||
admin_server = localhost:88
|
|
||||||
kdc = localhost:88
|
|
||||||
}
|
|
||||||
[domain_realm]
|
|
||||||
localhost = APACHE.ORG
|
|
Loading…
Reference in New Issue