YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs on Timeline service event data. Contributed by Zhijie Shen.
(cherry picked from commit d78b452a4f
)
This commit is contained in:
parent
d5d9fd3255
commit
5d251d99d6
|
@ -67,6 +67,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2569. Added the log handling APIs for the long running services. (Xuan
|
YARN-2569. Added the log handling APIs for the long running services. (Xuan
|
||||||
Gong via zjshen)
|
Gong via zjshen)
|
||||||
|
|
||||||
|
YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs
|
||||||
|
on Timeline service event data. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.records.timeline;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* This class contains the information about a timeline domain, which is used
|
||||||
|
* to a user to host a number of timeline entities, isolating them from others'.
|
||||||
|
* The user can also define the reader and writer users/groups for the the
|
||||||
|
* domain, which is used to control the access to its entities.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* The reader and writer users/groups pattern that the user can supply is the
|
||||||
|
* same as what <code>AccessControlList</code> takes.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@XmlRootElement(name = "domain")
|
||||||
|
@XmlAccessorType(XmlAccessType.NONE)
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class TimelineDomain {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private String description;
|
||||||
|
private String owner;
|
||||||
|
private String readers;
|
||||||
|
private String writers;
|
||||||
|
private Long createdTime;
|
||||||
|
private Long modifiedTime;
|
||||||
|
|
||||||
|
public TimelineDomain() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the domain ID
|
||||||
|
*
|
||||||
|
* @return the domain ID
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "id")
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the domain ID
|
||||||
|
*
|
||||||
|
* @param id the domain ID
|
||||||
|
*/
|
||||||
|
public void setId(String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the domain description
|
||||||
|
*
|
||||||
|
* @return the domain description
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "description")
|
||||||
|
public String getDescription() {
|
||||||
|
return description;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the domain description
|
||||||
|
*
|
||||||
|
* @param description the domain description
|
||||||
|
*/
|
||||||
|
public void setDescription(String description) {
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the domain owner
|
||||||
|
*
|
||||||
|
* @return the domain owner
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "owner")
|
||||||
|
public String getOwner() {
|
||||||
|
return owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the domain owner. The user doesn't need to set it, which will
|
||||||
|
* automatically set to the user who puts the domain.
|
||||||
|
*
|
||||||
|
* @param owner the domain owner
|
||||||
|
*/
|
||||||
|
public void setOwner(String owner) {
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the reader (and/or reader group) list string
|
||||||
|
*
|
||||||
|
* @return the reader (and/or reader group) list string
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "readers")
|
||||||
|
public String getReaders() {
|
||||||
|
return readers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the reader (and/or reader group) list string
|
||||||
|
*
|
||||||
|
* @param readers the reader (and/or reader group) list string
|
||||||
|
*/
|
||||||
|
public void setReaders(String readers) {
|
||||||
|
this.readers = readers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the writer (and/or writer group) list string
|
||||||
|
*
|
||||||
|
* @return the writer (and/or writer group) list string
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "writers")
|
||||||
|
public String getWriters() {
|
||||||
|
return writers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the writer (and/or writer group) list string
|
||||||
|
*
|
||||||
|
* @param writers the writer (and/or writer group) list string
|
||||||
|
*/
|
||||||
|
public void setWriters(String writers) {
|
||||||
|
this.writers = writers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the created time of the domain
|
||||||
|
*
|
||||||
|
* @return the created time of the domain
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "createdtime")
|
||||||
|
public Long getCreatedTime() {
|
||||||
|
return createdTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the created time of the domain
|
||||||
|
*
|
||||||
|
* @param createdTime the created time of the domain
|
||||||
|
*/
|
||||||
|
public void setCreatedTime(Long createdTime) {
|
||||||
|
this.createdTime = createdTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the modified time of the domain
|
||||||
|
*
|
||||||
|
* @return the modified time of the domain
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "modifiedtime")
|
||||||
|
public Long getModifiedTime() {
|
||||||
|
return modifiedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the modified time of the domain
|
||||||
|
*
|
||||||
|
* @param modifiedTime the modified time of the domain
|
||||||
|
*/
|
||||||
|
public void setModifiedTime(Long modifiedTime) {
|
||||||
|
this.modifiedTime = modifiedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.records.timeline;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The class that hosts a list of timeline domains.
|
||||||
|
*/
|
||||||
|
@XmlRootElement(name = "domains")
|
||||||
|
@XmlAccessorType(XmlAccessType.NONE)
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class TimelineDomains {
|
||||||
|
|
||||||
|
private List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
||||||
|
|
||||||
|
public TimelineDomains() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of domains
|
||||||
|
*
|
||||||
|
* @return a list of domains
|
||||||
|
*/
|
||||||
|
@XmlElement(name = "domains")
|
||||||
|
public List<TimelineDomain> getDomains() {
|
||||||
|
return domains;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a single domain into the existing domain list
|
||||||
|
*
|
||||||
|
* @param domain
|
||||||
|
* a single domain
|
||||||
|
*/
|
||||||
|
public void addDomain(TimelineDomain domain) {
|
||||||
|
domains.add(domain);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All a list of domains into the existing domain list
|
||||||
|
*
|
||||||
|
* @param domains
|
||||||
|
* a list of domains
|
||||||
|
*/
|
||||||
|
public void addDomains(List<TimelineDomain> domains) {
|
||||||
|
this.domains.addAll(domains);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the domain list to the given list of domains
|
||||||
|
*
|
||||||
|
* @param domains
|
||||||
|
* a list of domains
|
||||||
|
*/
|
||||||
|
public void setDomains(List<TimelineDomain> domains) {
|
||||||
|
this.domains = domains;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -67,6 +68,22 @@ public abstract class TimelineClient extends AbstractService {
|
||||||
public abstract TimelinePutResponse putEntities(
|
public abstract TimelinePutResponse putEntities(
|
||||||
TimelineEntity... entities) throws IOException, YarnException;
|
TimelineEntity... entities) throws IOException, YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Send the information of a domain to the timeline server. It is a
|
||||||
|
* blocking API. The method will not return until it gets the response from
|
||||||
|
* the timeline server.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param domain
|
||||||
|
* an {@link TimelineDomain} object
|
||||||
|
* @throws IOException
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
public abstract void putDomain(
|
||||||
|
TimelineDomain domain) throws IOException, YarnException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Get a delegation token so as to be able to talk to the timeline server in a
|
* Get a delegation token so as to be able to talk to the timeline server in a
|
||||||
|
|
|
@ -50,6 +50,8 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
@ -84,11 +86,15 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
||||||
|
|
||||||
private static Options opts;
|
private static Options opts;
|
||||||
|
private static final String ENTITY_DATA_TYPE = "entity";
|
||||||
|
private static final String DOMAIN_DATA_TYPE = "domain";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
opts = new Options();
|
opts = new Options();
|
||||||
opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
|
opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
|
||||||
opts.getOption("put").setArgName("Path to the JSON file");
|
opts.getOption("put").setArgName("Path to the JSON file");
|
||||||
|
opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
|
||||||
|
opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
|
||||||
opts.addOption("help", false, "Print usage");
|
opts.addOption("help", false, "Print usage");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,9 +156,27 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
}
|
}
|
||||||
TimelineEntities entitiesContainer = new TimelineEntities();
|
TimelineEntities entitiesContainer = new TimelineEntities();
|
||||||
entitiesContainer.addEntities(Arrays.asList(entities));
|
entitiesContainer.addEntities(Arrays.asList(entities));
|
||||||
|
ClientResponse resp = doPosting(entitiesContainer, null);
|
||||||
|
return resp.getEntity(TimelinePutResponse.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putDomain(TimelineDomain domain) throws IOException,
|
||||||
|
YarnException {
|
||||||
|
if (!isEnabled) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Nothing will be put because timeline service is not enabled");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
doPosting(domain, "domain");
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException {
|
||||||
ClientResponse resp;
|
ClientResponse resp;
|
||||||
try {
|
try {
|
||||||
resp = doPostingEntities(entitiesContainer);
|
resp = doPostingObject(obj, path);
|
||||||
} catch (RuntimeException re) {
|
} catch (RuntimeException re) {
|
||||||
// runtime exception is expected if the client cannot connect the server
|
// runtime exception is expected if the client cannot connect the server
|
||||||
String msg =
|
String msg =
|
||||||
|
@ -172,7 +196,7 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
}
|
}
|
||||||
throw new YarnException(msg);
|
throw new YarnException(msg);
|
||||||
}
|
}
|
||||||
return resp.getEntity(TimelinePutResponse.class);
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -184,11 +208,22 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ClientResponse doPostingEntities(TimelineEntities entities) {
|
public ClientResponse doPostingObject(Object object, String path) {
|
||||||
WebResource webResource = client.resource(resURI);
|
WebResource webResource = client.resource(resURI);
|
||||||
return webResource.accept(MediaType.APPLICATION_JSON)
|
if (path != null) {
|
||||||
.type(MediaType.APPLICATION_JSON)
|
webResource.path(path);
|
||||||
.post(ClientResponse.class, entities);
|
}
|
||||||
|
if (path == null) {
|
||||||
|
return webResource.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.post(ClientResponse.class, object);
|
||||||
|
} else if (path.equals("domain")) {
|
||||||
|
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, object);
|
||||||
|
} else {
|
||||||
|
throw new YarnRuntimeException("Unknown resource type");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class PseudoAuthenticatedURLConnectionFactory
|
private static class PseudoAuthenticatedURLConnectionFactory
|
||||||
|
@ -334,8 +369,13 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
if (cliParser.hasOption("put")) {
|
if (cliParser.hasOption("put")) {
|
||||||
String path = cliParser.getOptionValue("put");
|
String path = cliParser.getOptionValue("put");
|
||||||
if (path != null && path.length() > 0) {
|
if (path != null && path.length() > 0) {
|
||||||
putTimelineEntitiesInJSONFile(path);
|
if (cliParser.hasOption(ENTITY_DATA_TYPE)) {
|
||||||
return;
|
putTimelineDataInJSONFile(path, ENTITY_DATA_TYPE);
|
||||||
|
return;
|
||||||
|
} else if (cliParser.hasOption(DOMAIN_DATA_TYPE)) {
|
||||||
|
putTimelineDataInJSONFile(path, DOMAIN_DATA_TYPE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printUsage();
|
printUsage();
|
||||||
|
@ -345,22 +385,28 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
* Put timeline data in a JSON file via command line.
|
* Put timeline data in a JSON file via command line.
|
||||||
*
|
*
|
||||||
* @param path
|
* @param path
|
||||||
* path to the {@link TimelineEntities} JSON file
|
* path to the timeline data JSON file
|
||||||
|
* @param type
|
||||||
|
* the type of the timeline data in the JSON file
|
||||||
*/
|
*/
|
||||||
private static void putTimelineEntitiesInJSONFile(String path) {
|
private static void putTimelineDataInJSONFile(String path, String type) {
|
||||||
File jsonFile = new File(path);
|
File jsonFile = new File(path);
|
||||||
if (!jsonFile.exists()) {
|
if (!jsonFile.exists()) {
|
||||||
System.out.println("Error: File [" + jsonFile.getAbsolutePath()
|
LOG.error("File [" + jsonFile.getAbsolutePath() + "] doesn't exist");
|
||||||
+ "] doesn't exist");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
|
YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
|
||||||
TimelineEntities entities = null;
|
TimelineEntities entities = null;
|
||||||
|
TimelineDomains domains = null;
|
||||||
try {
|
try {
|
||||||
entities = mapper.readValue(jsonFile, TimelineEntities.class);
|
if (type.equals(ENTITY_DATA_TYPE)) {
|
||||||
|
entities = mapper.readValue(jsonFile, TimelineEntities.class);
|
||||||
|
} else if (type.equals(DOMAIN_DATA_TYPE)){
|
||||||
|
domains = mapper.readValue(jsonFile, TimelineDomains.class);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
System.err.println("Error: " + e.getMessage());
|
LOG.error("Error when reading " + e.getMessage());
|
||||||
e.printStackTrace(System.err);
|
e.printStackTrace(System.err);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -376,21 +422,37 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
UserGroupInformation.getCurrentUser().getUserName());
|
UserGroupInformation.getCurrentUser().getUserName());
|
||||||
UserGroupInformation.getCurrentUser().addToken(token);
|
UserGroupInformation.getCurrentUser().addToken(token);
|
||||||
}
|
}
|
||||||
TimelinePutResponse response = client.putEntities(
|
if (type.equals(ENTITY_DATA_TYPE)) {
|
||||||
entities.getEntities().toArray(
|
TimelinePutResponse response = client.putEntities(
|
||||||
new TimelineEntity[entities.getEntities().size()]));
|
entities.getEntities().toArray(
|
||||||
if (response.getErrors().size() == 0) {
|
new TimelineEntity[entities.getEntities().size()]));
|
||||||
System.out.println("Timeline data is successfully put");
|
if (response.getErrors().size() == 0) {
|
||||||
} else {
|
LOG.info("Timeline entities are successfully put");
|
||||||
for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
|
} else {
|
||||||
System.out.println("TimelineEntity [" + error.getEntityType() + ":" +
|
for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
|
||||||
error.getEntityId() + "] is not successfully put. Error code: " +
|
LOG.error("TimelineEntity [" + error.getEntityType() + ":" +
|
||||||
error.getErrorCode());
|
error.getEntityId() + "] is not successfully put. Error code: " +
|
||||||
|
error.getErrorCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (type.equals(DOMAIN_DATA_TYPE)) {
|
||||||
|
boolean hasError = false;
|
||||||
|
for (TimelineDomain domain : domains.getDomains()) {
|
||||||
|
try {
|
||||||
|
client.putDomain(domain);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when putting domain " + domain.getId(), e);
|
||||||
|
hasError = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!hasError) {
|
||||||
|
LOG.info("Timeline domains are successfully put");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch(RuntimeException e) {
|
||||||
|
LOG.error("Error when putting the timeline data", e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
System.err.println("Error: " + e.getMessage());
|
LOG.error("Error when putting the timeline data", e);
|
||||||
e.printStackTrace(System.err);
|
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,4 +161,42 @@ public class TestTimelineRecords {
|
||||||
Assert.assertEquals(error2.getErrorCode(), e.getErrorCode());
|
Assert.assertEquals(error2.getErrorCode(), e.getErrorCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimelineDomain() throws Exception {
|
||||||
|
TimelineDomains domains = new TimelineDomains();
|
||||||
|
|
||||||
|
TimelineDomain domain = null;
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
domain = new TimelineDomain();
|
||||||
|
domain.setId("test id " + (i + 1));
|
||||||
|
domain.setDescription("test description " + (i + 1));
|
||||||
|
domain.setOwner("test owner " + (i + 1));
|
||||||
|
domain.setReaders("test_reader_user_" + (i + 1) +
|
||||||
|
" test_reader_group+" + (i + 1));
|
||||||
|
domain.setWriters("test_writer_user_" + (i + 1) +
|
||||||
|
" test_writer_group+" + (i + 1));
|
||||||
|
domain.setCreatedTime(0L);
|
||||||
|
domain.setModifiedTime(1L);
|
||||||
|
domains.addDomain(domain);
|
||||||
|
}
|
||||||
|
LOG.info("Domain in JSON:");
|
||||||
|
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(domains, true));
|
||||||
|
|
||||||
|
Assert.assertEquals(2, domains.getDomains().size());
|
||||||
|
|
||||||
|
for (int i = 0; i < domains.getDomains().size(); ++i) {
|
||||||
|
domain = domains.getDomains().get(i);
|
||||||
|
Assert.assertEquals("test id " + (i + 1), domain.getId());
|
||||||
|
Assert.assertEquals("test description " + (i + 1),
|
||||||
|
domain.getDescription());
|
||||||
|
Assert.assertEquals("test owner " + (i + 1), domain.getOwner());
|
||||||
|
Assert.assertEquals("test_reader_user_" + (i + 1) +
|
||||||
|
" test_reader_group+" + (i + 1), domain.getReaders());
|
||||||
|
Assert.assertEquals("test_writer_user_" + (i + 1) +
|
||||||
|
" test_writer_group+" + (i + 1), domain.getWriters());
|
||||||
|
Assert.assertEquals(new Long(0L), domain.getCreatedTime());
|
||||||
|
Assert.assertEquals(new Long(1L), domain.getModifiedTime());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,16 +27,16 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ public class TestTimelineClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostEntities() throws Exception {
|
public void testPostEntities() throws Exception {
|
||||||
mockClientResponse(client, ClientResponse.Status.OK, false, false);
|
mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
|
||||||
try {
|
try {
|
||||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||||
Assert.assertEquals(0, response.getErrors().size());
|
Assert.assertEquals(0, response.getErrors().size());
|
||||||
|
@ -74,7 +74,7 @@ public class TestTimelineClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostEntitiesWithError() throws Exception {
|
public void testPostEntitiesWithError() throws Exception {
|
||||||
mockClientResponse(client, ClientResponse.Status.OK, true, false);
|
mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
|
||||||
try {
|
try {
|
||||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||||
Assert.assertEquals(1, response.getErrors().size());
|
Assert.assertEquals(1, response.getErrors().size());
|
||||||
|
@ -91,7 +91,7 @@ public class TestTimelineClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostEntitiesNoResponse() throws Exception {
|
public void testPostEntitiesNoResponse() throws Exception {
|
||||||
mockClientResponse(
|
mockEntityClientResponse(
|
||||||
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
||||||
try {
|
try {
|
||||||
client.putEntities(generateEntity());
|
client.putEntities(generateEntity());
|
||||||
|
@ -104,7 +104,7 @@ public class TestTimelineClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostEntitiesConnectionRefused() throws Exception {
|
public void testPostEntitiesConnectionRefused() throws Exception {
|
||||||
mockClientResponse(client, null, false, true);
|
mockEntityClientResponse(client, null, false, true);
|
||||||
try {
|
try {
|
||||||
client.putEntities(generateEntity());
|
client.putEntities(generateEntity());
|
||||||
Assert.fail("RuntimeException is expected");
|
Assert.fail("RuntimeException is expected");
|
||||||
|
@ -118,7 +118,7 @@ public class TestTimelineClient {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
|
||||||
TimelineClientImpl client = createTimelineClient(conf);
|
TimelineClientImpl client = createTimelineClient(conf);
|
||||||
mockClientResponse(
|
mockEntityClientResponse(
|
||||||
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
||||||
try {
|
try {
|
||||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||||
|
@ -137,7 +137,7 @@ public class TestTimelineClient {
|
||||||
// Make sure default value is pickup up
|
// Make sure default value is pickup up
|
||||||
conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
|
conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
|
||||||
TimelineClientImpl client = createTimelineClient(conf);
|
TimelineClientImpl client = createTimelineClient(conf);
|
||||||
mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
|
mockEntityClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
|
||||||
false, false);
|
false, false);
|
||||||
try {
|
try {
|
||||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||||
|
@ -148,16 +148,50 @@ public class TestTimelineClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ClientResponse mockClientResponse(TimelineClientImpl client,
|
@Test
|
||||||
ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) {
|
public void testPutDomain() throws Exception {
|
||||||
|
mockDomainClientResponse(client, ClientResponse.Status.OK, false);
|
||||||
|
try {
|
||||||
|
client.putDomain(generateDomain());
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutDomainNoResponse() throws Exception {
|
||||||
|
mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false);
|
||||||
|
try {
|
||||||
|
client.putDomain(generateDomain());
|
||||||
|
Assert.fail("Exception is expected");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Failed to get the response from the timeline server."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutDomainConnectionRefused() throws Exception {
|
||||||
|
mockDomainClientResponse(client, null, true);
|
||||||
|
try {
|
||||||
|
client.putDomain(generateDomain());
|
||||||
|
Assert.fail("RuntimeException is expected");
|
||||||
|
} catch (RuntimeException re) {
|
||||||
|
Assert.assertTrue(re instanceof ClientHandlerException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ClientResponse mockEntityClientResponse(
|
||||||
|
TimelineClientImpl client, ClientResponse.Status status,
|
||||||
|
boolean hasError, boolean hasRuntimeError) {
|
||||||
ClientResponse response = mock(ClientResponse.class);
|
ClientResponse response = mock(ClientResponse.class);
|
||||||
if (hasRuntimeError) {
|
if (hasRuntimeError) {
|
||||||
doThrow(new ClientHandlerException(new ConnectException())).when(client)
|
doThrow(new ClientHandlerException(new ConnectException())).when(client)
|
||||||
.doPostingEntities(any(TimelineEntities.class));
|
.doPostingObject(any(TimelineEntities.class), any(String.class));
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
doReturn(response).when(client)
|
doReturn(response).when(client)
|
||||||
.doPostingEntities(any(TimelineEntities.class));
|
.doPostingObject(any(TimelineEntities.class), any(String.class));
|
||||||
when(response.getClientResponseStatus()).thenReturn(status);
|
when(response.getClientResponseStatus()).thenReturn(status);
|
||||||
TimelinePutResponse.TimelinePutError error =
|
TimelinePutResponse.TimelinePutError error =
|
||||||
new TimelinePutResponse.TimelinePutError();
|
new TimelinePutResponse.TimelinePutError();
|
||||||
|
@ -172,6 +206,21 @@ public class TestTimelineClient {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ClientResponse mockDomainClientResponse(
|
||||||
|
TimelineClientImpl client, ClientResponse.Status status,
|
||||||
|
boolean hasRuntimeError) {
|
||||||
|
ClientResponse response = mock(ClientResponse.class);
|
||||||
|
if (hasRuntimeError) {
|
||||||
|
doThrow(new ClientHandlerException(new ConnectException())).when(client)
|
||||||
|
.doPostingObject(any(TimelineDomain.class), any(String.class));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
doReturn(response).when(client)
|
||||||
|
.doPostingObject(any(TimelineDomain.class), any(String.class));
|
||||||
|
when(response.getClientResponseStatus()).thenReturn(status);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
private static TimelineEntity generateEntity() {
|
private static TimelineEntity generateEntity() {
|
||||||
TimelineEntity entity = new TimelineEntity();
|
TimelineEntity entity = new TimelineEntity();
|
||||||
entity.setEntityId("entity id");
|
entity.setEntityId("entity id");
|
||||||
|
@ -194,6 +243,18 @@ public class TestTimelineClient {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static TimelineDomain generateDomain() {
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setId("namesapce id");
|
||||||
|
domain.setDescription("domain description");
|
||||||
|
domain.setOwner("domain owner");
|
||||||
|
domain.setReaders("domain_reader");
|
||||||
|
domain.setWriters("domain_writer");
|
||||||
|
domain.setCreatedTime(0L);
|
||||||
|
domain.setModifiedTime(1L);
|
||||||
|
return domain;
|
||||||
|
}
|
||||||
|
|
||||||
private static TimelineClientImpl createTimelineClient(
|
private static TimelineClientImpl createTimelineClient(
|
||||||
YarnConfiguration conf) {
|
YarnConfiguration conf) {
|
||||||
TimelineClientImpl client =
|
TimelineClientImpl client =
|
||||||
|
|
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -145,6 +147,14 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
|
private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
|
||||||
"z".getBytes();
|
"z".getBytes();
|
||||||
|
|
||||||
|
private static final byte[] DOMAIN_ENTRY_PREFIX = "d".getBytes();
|
||||||
|
private static final byte[] OWNER_LOOKUP_PREFIX = "o".getBytes();
|
||||||
|
private static final byte[] DESCRIPTION_COLUMN = "d".getBytes();
|
||||||
|
private static final byte[] OWNER_COLUMN = "o".getBytes();
|
||||||
|
private static final byte[] READER_COLUMN = "r".getBytes();
|
||||||
|
private static final byte[] WRITER_COLUMN = "w".getBytes();
|
||||||
|
private static final byte[] TIMESTAMP_COLUMN = "t".getBytes();
|
||||||
|
|
||||||
private static final byte[] EMPTY_BYTES = new byte[0];
|
private static final byte[] EMPTY_BYTES = new byte[0];
|
||||||
|
|
||||||
private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
|
private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
|
||||||
|
@ -1558,5 +1568,209 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
throw new IOException(incompatibleMessage);
|
throw new IOException(incompatibleMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: make data retention work with the domain data as well
|
||||||
|
@Override
|
||||||
|
public void put(TimelineDomain domain) throws IOException {
|
||||||
|
WriteBatch writeBatch = null;
|
||||||
|
try {
|
||||||
|
writeBatch = db.createWriteBatch();
|
||||||
|
if (domain.getId() == null || domain.getId().length() == 0) {
|
||||||
|
throw new IllegalArgumentException("Domain doesn't have an ID");
|
||||||
|
}
|
||||||
|
if (domain.getOwner() == null || domain.getOwner().length() == 0) {
|
||||||
|
throw new IllegalArgumentException("Domain doesn't have an owner.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write description
|
||||||
|
byte[] domainEntryKey = createDomainEntryKey(
|
||||||
|
domain.getId(), DESCRIPTION_COLUMN);
|
||||||
|
byte[] ownerLookupEntryKey = createOwnerLookupKey(
|
||||||
|
domain.getOwner(), domain.getId(), DESCRIPTION_COLUMN);
|
||||||
|
if (domain.getDescription() != null) {
|
||||||
|
writeBatch.put(domainEntryKey, domain.getDescription().getBytes());
|
||||||
|
writeBatch.put(ownerLookupEntryKey, domain.getDescription().getBytes());
|
||||||
|
} else {
|
||||||
|
writeBatch.put(domainEntryKey, EMPTY_BYTES);
|
||||||
|
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write owner
|
||||||
|
domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
|
||||||
|
ownerLookupEntryKey = createOwnerLookupKey(
|
||||||
|
domain.getOwner(), domain.getId(), OWNER_COLUMN);
|
||||||
|
// Null check for owner is done before
|
||||||
|
writeBatch.put(domainEntryKey, domain.getOwner().getBytes());
|
||||||
|
writeBatch.put(ownerLookupEntryKey, domain.getOwner().getBytes());
|
||||||
|
|
||||||
|
// Write readers
|
||||||
|
domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
|
||||||
|
ownerLookupEntryKey = createOwnerLookupKey(
|
||||||
|
domain.getOwner(), domain.getId(), READER_COLUMN);
|
||||||
|
if (domain.getReaders() != null && domain.getReaders().length() > 0) {
|
||||||
|
writeBatch.put(domainEntryKey, domain.getReaders().getBytes());
|
||||||
|
writeBatch.put(ownerLookupEntryKey, domain.getReaders().getBytes());
|
||||||
|
} else {
|
||||||
|
writeBatch.put(domainEntryKey, EMPTY_BYTES);
|
||||||
|
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writers
|
||||||
|
domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
|
||||||
|
ownerLookupEntryKey = createOwnerLookupKey(
|
||||||
|
domain.getOwner(), domain.getId(), WRITER_COLUMN);
|
||||||
|
if (domain.getWriters() != null && domain.getWriters().length() > 0) {
|
||||||
|
writeBatch.put(domainEntryKey, domain.getWriters().getBytes());
|
||||||
|
writeBatch.put(ownerLookupEntryKey, domain.getWriters().getBytes());
|
||||||
|
} else {
|
||||||
|
writeBatch.put(domainEntryKey, EMPTY_BYTES);
|
||||||
|
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write creation time and modification time
|
||||||
|
// We put both timestamps together because they are always retrieved
|
||||||
|
// together, and store them in the same way as we did for the entity's
|
||||||
|
// start time and insert time.
|
||||||
|
domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
|
||||||
|
ownerLookupEntryKey = createOwnerLookupKey(
|
||||||
|
domain.getOwner(), domain.getId(), TIMESTAMP_COLUMN);
|
||||||
|
long currentTimestamp = System.currentTimeMillis();
|
||||||
|
byte[] timestamps = db.get(domainEntryKey);
|
||||||
|
if (timestamps == null) {
|
||||||
|
timestamps = new byte[16];
|
||||||
|
writeReverseOrderedLong(currentTimestamp, timestamps, 0);
|
||||||
|
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
|
||||||
|
} else {
|
||||||
|
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
|
||||||
|
}
|
||||||
|
writeBatch.put(domainEntryKey, timestamps);
|
||||||
|
writeBatch.put(ownerLookupEntryKey, timestamps);
|
||||||
|
db.write(writeBatch);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, writeBatch);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a domain entity key with column name suffix,
|
||||||
|
* of the form DOMAIN_ENTRY_PREFIX + domain id + column name.
|
||||||
|
*/
|
||||||
|
private static byte[] createDomainEntryKey(String domainId,
|
||||||
|
byte[] columnName) throws IOException {
|
||||||
|
return KeyBuilder.newInstance().add(DOMAIN_ENTRY_PREFIX)
|
||||||
|
.add(domainId).add(columnName).getBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an owner lookup key with column name suffix,
|
||||||
|
* of the form OWNER_LOOKUP_PREFIX + owner + domain id + column name.
|
||||||
|
*/
|
||||||
|
private static byte[] createOwnerLookupKey(
|
||||||
|
String owner, String domainId, byte[] columnName) throws IOException {
|
||||||
|
return KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
|
||||||
|
.add(owner).add(domainId).add(columnName).getBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomain getDomain(String domainId)
|
||||||
|
throws IOException {
|
||||||
|
DBIterator iterator = null;
|
||||||
|
try {
|
||||||
|
byte[] prefix = KeyBuilder.newInstance()
|
||||||
|
.add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
|
||||||
|
iterator = db.iterator();
|
||||||
|
iterator.seek(prefix);
|
||||||
|
return getTimelineDomain(iterator, domainId, prefix);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, iterator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomains getDomains(String owner)
|
||||||
|
throws IOException {
|
||||||
|
DBIterator iterator = null;
|
||||||
|
try {
|
||||||
|
byte[] prefix = KeyBuilder.newInstance()
|
||||||
|
.add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
|
||||||
|
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
||||||
|
for (iterator = db.iterator(), iterator.seek(prefix);
|
||||||
|
iterator.hasNext();) {
|
||||||
|
byte[] key = iterator.peekNext().getKey();
|
||||||
|
if (!prefixMatches(prefix, prefix.length, key)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Iterator to parse the rows of an individual domain
|
||||||
|
KeyParser kp = new KeyParser(key, prefix.length);
|
||||||
|
String domainId = kp.getNextString();
|
||||||
|
byte[] prefixExt = KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
|
||||||
|
.add(owner).add(domainId).getBytesForLookup();
|
||||||
|
TimelineDomain domainToReturn =
|
||||||
|
getTimelineDomain(iterator, domainId, prefixExt);
|
||||||
|
if (domainToReturn != null) {
|
||||||
|
domains.add(domainToReturn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Sort the domains to return
|
||||||
|
Collections.sort(domains, new Comparator<TimelineDomain>() {
|
||||||
|
@Override
|
||||||
|
public int compare(
|
||||||
|
TimelineDomain domain1, TimelineDomain domain2) {
|
||||||
|
int result = domain2.getCreatedTime().compareTo(
|
||||||
|
domain1.getCreatedTime());
|
||||||
|
if (result == 0) {
|
||||||
|
return domain2.getModifiedTime().compareTo(
|
||||||
|
domain1.getModifiedTime());
|
||||||
|
} else {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
TimelineDomains domainsToReturn = new TimelineDomains();
|
||||||
|
domainsToReturn.addDomains(domains);
|
||||||
|
return domainsToReturn;
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, iterator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimelineDomain getTimelineDomain(
|
||||||
|
DBIterator iterator, String domainId, byte[] prefix) throws IOException {
|
||||||
|
// Iterate over all the rows whose key starts with prefix to retrieve the
|
||||||
|
// domain information.
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setId(domainId);
|
||||||
|
boolean noRows = true;
|
||||||
|
for (; iterator.hasNext(); iterator.next()) {
|
||||||
|
byte[] key = iterator.peekNext().getKey();
|
||||||
|
if (!prefixMatches(prefix, prefix.length, key)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (noRows) {
|
||||||
|
noRows = false;
|
||||||
|
}
|
||||||
|
byte[] value = iterator.peekNext().getValue();
|
||||||
|
if (value != null && value.length > 0) {
|
||||||
|
if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
|
||||||
|
domain.setDescription(new String(value));
|
||||||
|
} else if (key[prefix.length] == OWNER_COLUMN[0]) {
|
||||||
|
domain.setOwner(new String(value));
|
||||||
|
} else if (key[prefix.length] == READER_COLUMN[0]) {
|
||||||
|
domain.setReaders(new String(value));
|
||||||
|
} else if (key[prefix.length] == WRITER_COLUMN[0]) {
|
||||||
|
domain.setWriters(new String(value));
|
||||||
|
} else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
|
||||||
|
domain.setCreatedTime(readReverseOrderedLong(value, 0));
|
||||||
|
domain.setModifiedTime(readReverseOrderedLong(value, 8));
|
||||||
|
} else {
|
||||||
|
LOG.error("Unrecognized domain column: " + key[prefix.length]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (noRows) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return domain;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,14 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timeline;
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
|
|
||||||
|
@ -61,6 +66,10 @@ public class MemoryTimelineStore
|
||||||
new HashMap<EntityIdentifier, TimelineEntity>();
|
new HashMap<EntityIdentifier, TimelineEntity>();
|
||||||
private Map<EntityIdentifier, Long> entityInsertTimes =
|
private Map<EntityIdentifier, Long> entityInsertTimes =
|
||||||
new HashMap<EntityIdentifier, Long>();
|
new HashMap<EntityIdentifier, Long>();
|
||||||
|
private Map<String, TimelineDomain> domainsById =
|
||||||
|
new HashMap<String, TimelineDomain>();
|
||||||
|
private Map<String, Set<TimelineDomain>> domainsByOwner =
|
||||||
|
new HashMap<String, Set<TimelineDomain>>();
|
||||||
|
|
||||||
public MemoryTimelineStore() {
|
public MemoryTimelineStore() {
|
||||||
super(MemoryTimelineStore.class.getName());
|
super(MemoryTimelineStore.class.getName());
|
||||||
|
@ -210,6 +219,58 @@ public class MemoryTimelineStore
|
||||||
return allEvents;
|
return allEvents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomain getDomain(String domainId)
|
||||||
|
throws IOException {
|
||||||
|
TimelineDomain domain = domainsById.get(domainId);
|
||||||
|
if (domain == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return createTimelineDomain(
|
||||||
|
domain.getId(),
|
||||||
|
domain.getDescription(),
|
||||||
|
domain.getOwner(),
|
||||||
|
domain.getReaders(),
|
||||||
|
domain.getWriters(),
|
||||||
|
domain.getCreatedTime(),
|
||||||
|
domain.getModifiedTime());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomains getDomains(String owner)
|
||||||
|
throws IOException {
|
||||||
|
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
||||||
|
for (TimelineDomain domain : domainsByOwner.get(owner)) {
|
||||||
|
TimelineDomain domainToReturn = createTimelineDomain(
|
||||||
|
domain.getId(),
|
||||||
|
domain.getDescription(),
|
||||||
|
domain.getOwner(),
|
||||||
|
domain.getReaders(),
|
||||||
|
domain.getWriters(),
|
||||||
|
domain.getCreatedTime(),
|
||||||
|
domain.getModifiedTime());
|
||||||
|
domains.add(domainToReturn);
|
||||||
|
}
|
||||||
|
Collections.sort(domains, new Comparator<TimelineDomain>() {
|
||||||
|
@Override
|
||||||
|
public int compare(
|
||||||
|
TimelineDomain domain1, TimelineDomain domain2) {
|
||||||
|
int result = domain2.getCreatedTime().compareTo(
|
||||||
|
domain1.getCreatedTime());
|
||||||
|
if (result == 0) {
|
||||||
|
return domain2.getModifiedTime().compareTo(
|
||||||
|
domain1.getModifiedTime());
|
||||||
|
} else {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
TimelineDomains domainsToReturn = new TimelineDomains();
|
||||||
|
domainsToReturn.addDomains(domains);
|
||||||
|
return domainsToReturn;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized TimelinePutResponse put(TimelineEntities data) {
|
public synchronized TimelinePutResponse put(TimelineEntities data) {
|
||||||
TimelinePutResponse response = new TimelinePutResponse();
|
TimelinePutResponse response = new TimelinePutResponse();
|
||||||
|
@ -308,6 +369,44 @@ public class MemoryTimelineStore
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void put(TimelineDomain domain) throws IOException {
|
||||||
|
TimelineDomain domainToReplace =
|
||||||
|
domainsById.get(domain.getId());
|
||||||
|
long currentTimestamp = System.currentTimeMillis();
|
||||||
|
TimelineDomain domainToStore = createTimelineDomain(
|
||||||
|
domain.getId(), domain.getDescription(), domain.getOwner(),
|
||||||
|
domain.getReaders(), domain.getWriters(),
|
||||||
|
(domainToReplace == null ?
|
||||||
|
currentTimestamp : domainToReplace.getCreatedTime()),
|
||||||
|
currentTimestamp);
|
||||||
|
domainsById.put(domainToStore.getId(), domainToStore);
|
||||||
|
Set<TimelineDomain> domainsByOneOwner =
|
||||||
|
domainsByOwner.get(domainToStore.getOwner());
|
||||||
|
if (domainsByOneOwner == null) {
|
||||||
|
domainsByOneOwner = new HashSet<TimelineDomain>();
|
||||||
|
domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
|
||||||
|
}
|
||||||
|
if (domainToReplace != null) {
|
||||||
|
domainsByOneOwner.remove(domainToReplace);
|
||||||
|
}
|
||||||
|
domainsByOneOwner.add(domainToStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimelineDomain createTimelineDomain(
|
||||||
|
String id, String description, String owner,
|
||||||
|
String readers, String writers,
|
||||||
|
Long createdTime, Long modifiedTime) {
|
||||||
|
TimelineDomain domainToStore = new TimelineDomain();
|
||||||
|
domainToStore.setId(id);
|
||||||
|
domainToStore.setDescription(description);
|
||||||
|
domainToStore.setOwner(owner);
|
||||||
|
domainToStore.setReaders(readers);
|
||||||
|
domainToStore.setWriters(writers);
|
||||||
|
domainToStore.setCreatedTime(createdTime);
|
||||||
|
domainToStore.setModifiedTime(modifiedTime);
|
||||||
|
return domainToStore;
|
||||||
|
}
|
||||||
|
|
||||||
private static TimelineEntity maskFields(
|
private static TimelineEntity maskFields(
|
||||||
TimelineEntity entity, EnumSet<Field> fields) {
|
TimelineEntity entity, EnumSet<Field> fields) {
|
||||||
// Conceal the fields that are not going to be exposed
|
// Conceal the fields that are not going to be exposed
|
||||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||||
|
@ -286,6 +288,78 @@ public class TimelineDataManager {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add or update an domain. If the domain already exists, only the owner
|
||||||
|
* and the admin can update it.
|
||||||
|
*/
|
||||||
|
public void putDomain(TimelineDomain domain,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineDomain existingDomain =
|
||||||
|
store.getDomain(domain.getId());
|
||||||
|
if (existingDomain != null) {
|
||||||
|
if (!timelineACLsManager.checkAccess(callerUGI, existingDomain)) {
|
||||||
|
throw new YarnException(callerUGI.getShortUserName() +
|
||||||
|
" is not allowed to override an existing domain " +
|
||||||
|
existingDomain.getId());
|
||||||
|
}
|
||||||
|
// Set it again in case ACLs are not enabled: The domain can be
|
||||||
|
// modified by every body, but the owner is not changed.
|
||||||
|
domain.setOwner(existingDomain.getOwner());
|
||||||
|
}
|
||||||
|
store.put(domain);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a single domain of the particular ID. If callerUGI is not the owner
|
||||||
|
* or the admin of the domain, we need to hide the details from him, and
|
||||||
|
* only allow him to see the ID.
|
||||||
|
*/
|
||||||
|
public TimelineDomain getDomain(String domainId,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineDomain domain = store.getDomain(domainId);
|
||||||
|
if (domain != null) {
|
||||||
|
if (timelineACLsManager.checkAccess(callerUGI, domain)) {
|
||||||
|
return domain;
|
||||||
|
} else {
|
||||||
|
hideDomainDetails(domain);
|
||||||
|
return domain;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the domains that belong to the given owner. If callerUGI is not
|
||||||
|
* the owner or the admin of the domain, we need to hide the details from
|
||||||
|
* him, and only allow him to see the ID.
|
||||||
|
*/
|
||||||
|
public TimelineDomains getDomains(String owner,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineDomains domains = store.getDomains(owner);
|
||||||
|
boolean hasAccess = true;
|
||||||
|
boolean isChecked = false;
|
||||||
|
for (TimelineDomain domain : domains.getDomains()) {
|
||||||
|
// The owner for each domain is the same, just need to check on
|
||||||
|
if (!isChecked) {
|
||||||
|
hasAccess = timelineACLsManager.checkAccess(callerUGI, domain);
|
||||||
|
isChecked = true;
|
||||||
|
}
|
||||||
|
if (!hasAccess) {
|
||||||
|
hideDomainDetails(domain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return domains;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void hideDomainDetails(TimelineDomain domain) {
|
||||||
|
domain.setDescription(null);
|
||||||
|
domain.setOwner(null);
|
||||||
|
domain.setReaders(null);
|
||||||
|
domain.setWriters(null);
|
||||||
|
domain.setCreatedTime(null);
|
||||||
|
domain.setModifiedTime(null);
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean extendFields(EnumSet<Field> fieldEnums) {
|
private static boolean extendFields(EnumSet<Field> fieldEnums) {
|
||||||
boolean modified = false;
|
boolean modified = false;
|
||||||
if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
|
if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
|
||||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface is for retrieving timeline information.
|
* This interface is for retrieving timeline information.
|
||||||
|
@ -152,4 +154,25 @@ public interface TimelineReader {
|
||||||
TimelineEvents getEntityTimelines(String entityType,
|
TimelineEvents getEntityTimelines(String entityType,
|
||||||
SortedSet<String> entityIds, Long limit, Long windowStart,
|
SortedSet<String> entityIds, Long limit, Long windowStart,
|
||||||
Long windowEnd, Set<String> eventTypes) throws IOException;
|
Long windowEnd, Set<String> eventTypes) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method retrieves the domain information for a given ID.
|
||||||
|
*
|
||||||
|
* @return a {@link TimelineDomain} object.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
TimelineDomain getDomain(
|
||||||
|
String domainId) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method retrieves all the domains that belong to a given owner.
|
||||||
|
* The domains are sorted according to the created time firstly and the
|
||||||
|
* modified time secondly in descending order.
|
||||||
|
*
|
||||||
|
* @param owner
|
||||||
|
* the domain owner
|
||||||
|
* @return an {@link TimelineDomains} object.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
TimelineDomains getDomains(String owner) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,13 +18,14 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timeline;
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface is for storing timeline information.
|
* This interface is for storing timeline information.
|
||||||
*/
|
*/
|
||||||
|
@ -37,10 +38,21 @@ public interface TimelineWriter {
|
||||||
* individual put request objects will be reported in the response.
|
* individual put request objects will be reported in the response.
|
||||||
*
|
*
|
||||||
* @param data
|
* @param data
|
||||||
* An {@link TimelineEntities} object.
|
* a {@link TimelineEntities} object.
|
||||||
* @return An {@link TimelinePutResponse} object.
|
* @return a {@link TimelinePutResponse} object.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
TimelinePutResponse put(TimelineEntities data) throws IOException;
|
TimelinePutResponse put(TimelineEntities data) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store domain information to the timeline store. If A domain of the
|
||||||
|
* same ID already exists in the timeline store, it will be COMPLETELY updated
|
||||||
|
* with the given domain.
|
||||||
|
*
|
||||||
|
* @param domain
|
||||||
|
* a {@link TimelineDomain} object
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void put(TimelineDomain domain) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AdminACLsManager;
|
import org.apache.hadoop.yarn.security.AdminACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
||||||
|
@ -81,6 +82,31 @@ public class TimelineACLsManager {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||||
|
TimelineDomain domain) throws YarnException, IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Verifying the access of "
|
||||||
|
+ (callerUGI == null ? null : callerUGI.getShortUserName())
|
||||||
|
+ " on the timeline domain " + domain);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!adminAclsManager.areACLsEnabled()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
String owner = domain.getOwner();
|
||||||
|
if (owner == null || owner.length() == 0) {
|
||||||
|
throw new YarnException("Owner information of the timeline domain "
|
||||||
|
+ domain.getId() + " is corrupted.");
|
||||||
|
}
|
||||||
|
if (callerUGI != null
|
||||||
|
&& (adminAclsManager.isAdmin(callerUGI) ||
|
||||||
|
callerUGI.getShortUserName().equals(owner))) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public AdminACLsManager
|
public AdminACLsManager
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timeline.webapp;
|
package org.apache.hadoop.yarn.server.timeline.webapp;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -32,6 +33,7 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.GET;
|
import javax.ws.rs.GET;
|
||||||
import javax.ws.rs.POST;
|
import javax.ws.rs.POST;
|
||||||
|
import javax.ws.rs.PUT;
|
||||||
import javax.ws.rs.Path;
|
import javax.ws.rs.Path;
|
||||||
import javax.ws.rs.PathParam;
|
import javax.ws.rs.PathParam;
|
||||||
import javax.ws.rs.Produces;
|
import javax.ws.rs.Produces;
|
||||||
|
@ -40,6 +42,7 @@ import javax.ws.rs.WebApplicationException;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.Response.Status;
|
||||||
import javax.xml.bind.annotation.XmlAccessType;
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
import javax.xml.bind.annotation.XmlAccessorType;
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
import javax.xml.bind.annotation.XmlElement;
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
@ -53,7 +56,10 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
||||||
|
@ -259,6 +265,100 @@ public class TimelineWebServices {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the given domain into the timeline store, and return the errors
|
||||||
|
* that happen during storing.
|
||||||
|
*/
|
||||||
|
@PUT
|
||||||
|
@Path("/domain")
|
||||||
|
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
|
||||||
|
public Response putDomain(
|
||||||
|
@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
TimelineDomain domain) {
|
||||||
|
init(res);
|
||||||
|
UserGroupInformation callerUGI = getUser(req);
|
||||||
|
if (callerUGI == null) {
|
||||||
|
String msg = "The owner of the posted timeline domain is not set";
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new ForbiddenException(msg);
|
||||||
|
}
|
||||||
|
domain.setOwner(callerUGI.getShortUserName());
|
||||||
|
try {
|
||||||
|
timelineDataManager.putDomain(domain, callerUGI);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// The user doesn't have the access to override the existing domain.
|
||||||
|
LOG.error(e.getMessage(), e);
|
||||||
|
throw new ForbiddenException(e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error putting domain", e);
|
||||||
|
throw new WebApplicationException(e,
|
||||||
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
|
}
|
||||||
|
return Response.status(Status.OK).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a single domain of the given domain Id.
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
@Path("/domain/{domainId}")
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
|
||||||
|
public TimelineDomain getDomain(
|
||||||
|
@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@PathParam("domainId") String domainId) {
|
||||||
|
init(res);
|
||||||
|
domainId = parseStr(domainId);
|
||||||
|
if (domainId == null || domainId.length() == 0) {
|
||||||
|
throw new BadRequestException("Domain ID is not specified.");
|
||||||
|
}
|
||||||
|
TimelineDomain domain = null;
|
||||||
|
try {
|
||||||
|
domain = timelineDataManager.getDomain(
|
||||||
|
parseStr(domainId), getUser(req));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error getting domain", e);
|
||||||
|
throw new WebApplicationException(e,
|
||||||
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
|
}
|
||||||
|
if (domain == null) {
|
||||||
|
throw new NotFoundException("Timeline domain ["
|
||||||
|
+ domainId + "] is not found");
|
||||||
|
}
|
||||||
|
return domain;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a list of domains of the given owner.
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
@Path("/domain")
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
|
||||||
|
public TimelineDomains getDomains(
|
||||||
|
@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@QueryParam("owner") String owner) {
|
||||||
|
init(res);
|
||||||
|
owner = parseStr(owner);
|
||||||
|
UserGroupInformation callerUGI = getUser(req);
|
||||||
|
if (owner == null || owner.length() == 0) {
|
||||||
|
if (callerUGI == null) {
|
||||||
|
throw new BadRequestException("Domain owner is not specified.");
|
||||||
|
} else {
|
||||||
|
// By default it's going to list the caller's domains
|
||||||
|
owner = callerUGI.getShortUserName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return timelineDataManager.getDomains(owner, callerUGI);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error getting domains", e);
|
||||||
|
throw new WebApplicationException(e,
|
||||||
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void init(HttpServletResponse response) {
|
private void init(HttpServletResponse response) {
|
||||||
response.setContentType(null);
|
response.setContentType(null);
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,8 +69,9 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
store = new LeveldbTimelineStore();
|
store = new LeveldbTimelineStore();
|
||||||
store.init(config);
|
store.init(config);
|
||||||
store.start();
|
store.start();
|
||||||
loadTestData();
|
loadTestEntityData();
|
||||||
loadVerificationData();
|
loadVerificationEntityData();
|
||||||
|
loadTestDomainData();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -93,7 +94,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
super.testGetSingleEntity();
|
super.testGetSingleEntity();
|
||||||
((LeveldbTimelineStore)store).clearStartTimeCache();
|
((LeveldbTimelineStore)store).clearStartTimeCache();
|
||||||
super.testGetSingleEntity();
|
super.testGetSingleEntity();
|
||||||
loadTestData();
|
loadTestEntityData();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -257,7 +258,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
assertEquals(0, getEntities("type_2").size());
|
assertEquals(0, getEntities("type_2").size());
|
||||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||||
l).size());
|
l).size());
|
||||||
loadTestData();
|
loadTestEntityData();
|
||||||
assertEquals(0, getEntitiesFromTs("type_1", l).size());
|
assertEquals(0, getEntitiesFromTs("type_1", l).size());
|
||||||
assertEquals(0, getEntitiesFromTs("type_2", l).size());
|
assertEquals(0, getEntitiesFromTs("type_2", l).size());
|
||||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||||
|
@ -309,4 +310,14 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
store.start();
|
store.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomain() throws IOException {
|
||||||
|
super.testGetDomain();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomains() throws IOException {
|
||||||
|
super.testGetDomains();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,9 @@ public class TestMemoryTimelineStore extends TimelineStoreTestUtils {
|
||||||
store = new MemoryTimelineStore();
|
store = new MemoryTimelineStore();
|
||||||
store.init(new YarnConfiguration());
|
store.init(new YarnConfiguration());
|
||||||
store.start();
|
store.start();
|
||||||
loadTestData();
|
loadTestEntityData();
|
||||||
loadVerificationData();
|
loadVerificationEntityData();
|
||||||
|
loadTestDomainData();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -82,4 +83,14 @@ public class TestMemoryTimelineStore extends TimelineStoreTestUtils {
|
||||||
super.testGetEvents();
|
super.testGetEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomain() throws IOException {
|
||||||
|
super.testGetDomain();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomains() throws IOException {
|
||||||
|
super.testGetDomains();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,11 +38,11 @@ import java.util.TreeSet;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||||
|
|
||||||
public class TimelineStoreTestUtils {
|
public class TimelineStoreTestUtils {
|
||||||
|
@ -88,9 +88,9 @@ public class TimelineStoreTestUtils {
|
||||||
protected long beforeTs;
|
protected long beforeTs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load test data into the given store
|
* Load test entity data into the given store
|
||||||
*/
|
*/
|
||||||
protected void loadTestData() throws IOException {
|
protected void loadTestEntityData() throws IOException {
|
||||||
beforeTs = System.currentTimeMillis()-1;
|
beforeTs = System.currentTimeMillis()-1;
|
||||||
TimelineEntities entities = new TimelineEntities();
|
TimelineEntities entities = new TimelineEntities();
|
||||||
Map<String, Set<Object>> primaryFilters =
|
Map<String, Set<Object>> primaryFilters =
|
||||||
|
@ -184,9 +184,9 @@ public class TimelineStoreTestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load verification data
|
* Load verification entity data
|
||||||
*/
|
*/
|
||||||
protected void loadVerificationData() throws Exception {
|
protected void loadVerificationEntityData() throws Exception {
|
||||||
userFilter = new NameValuePair("user", "username");
|
userFilter = new NameValuePair("user", "username");
|
||||||
numericFilter1 = new NameValuePair("appname", Integer.MAX_VALUE);
|
numericFilter1 = new NameValuePair("appname", Integer.MAX_VALUE);
|
||||||
numericFilter2 = new NameValuePair("long", (long)Integer.MAX_VALUE + 1l);
|
numericFilter2 = new NameValuePair("long", (long)Integer.MAX_VALUE + 1l);
|
||||||
|
@ -263,6 +263,51 @@ public class TimelineStoreTestUtils {
|
||||||
events2.add(ev4);
|
events2.add(ev4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TimelineDomain domain1;
|
||||||
|
private TimelineDomain domain2;
|
||||||
|
private TimelineDomain domain3;
|
||||||
|
private long elapsedTime;
|
||||||
|
|
||||||
|
protected void loadTestDomainData() throws IOException {
|
||||||
|
domain1 = new TimelineDomain();
|
||||||
|
domain1.setId("domain_id_1");
|
||||||
|
domain1.setDescription("description_1");
|
||||||
|
domain1.setOwner("owner_1");
|
||||||
|
domain1.setReaders("reader_user_1 reader_group_1");
|
||||||
|
domain1.setWriters("writer_user_1 writer_group_1");
|
||||||
|
store.put(domain1);
|
||||||
|
|
||||||
|
domain2 = new TimelineDomain();
|
||||||
|
domain2.setId("domain_id_2");
|
||||||
|
domain2.setDescription("description_2");
|
||||||
|
domain2.setOwner("owner_2");
|
||||||
|
domain2.setReaders("reader_user_2 reader_group_2");
|
||||||
|
domain2.setWriters("writer_user_2writer_group_2");
|
||||||
|
store.put(domain2);
|
||||||
|
|
||||||
|
// Wait a second before updating the domain information
|
||||||
|
elapsedTime = 1000;
|
||||||
|
try {
|
||||||
|
Thread.sleep(elapsedTime);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
domain2.setDescription("description_3");
|
||||||
|
domain2.setOwner("owner_3");
|
||||||
|
domain2.setReaders("reader_user_3 reader_group_3");
|
||||||
|
domain2.setWriters("writer_user_3 writer_group_3");
|
||||||
|
store.put(domain2);
|
||||||
|
|
||||||
|
domain3 = new TimelineDomain();
|
||||||
|
domain3.setId("domain_id_4");
|
||||||
|
domain3.setDescription("description_4");
|
||||||
|
domain3.setOwner("owner_1");
|
||||||
|
domain3.setReaders("reader_user_4 reader_group_4");
|
||||||
|
domain3.setWriters("writer_user_4 writer_group_4");
|
||||||
|
store.put(domain3);
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetSingleEntity() throws IOException {
|
public void testGetSingleEntity() throws IOException {
|
||||||
// test getting entity info
|
// test getting entity info
|
||||||
verifyEntityInfo(null, null, null, null, null, null,
|
verifyEntityInfo(null, null, null, null, null, null,
|
||||||
|
@ -519,7 +564,7 @@ public class TimelineStoreTestUtils {
|
||||||
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||||
// check insert time is not overwritten
|
// check insert time is not overwritten
|
||||||
long beforeTs = this.beforeTs;
|
long beforeTs = this.beforeTs;
|
||||||
loadTestData();
|
loadTestEntityData();
|
||||||
assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
|
assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
|
||||||
assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
|
assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
|
||||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||||
|
@ -788,4 +833,39 @@ public class TimelineStoreTestUtils {
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetDomain() throws IOException {
|
||||||
|
TimelineDomain actualDomain1 =
|
||||||
|
store.getDomain(domain1.getId());
|
||||||
|
verifyDomainInfo(domain1, actualDomain1);
|
||||||
|
assertTrue(actualDomain1.getCreatedTime() > 0);
|
||||||
|
assertTrue(actualDomain1.getModifiedTime() > 0);
|
||||||
|
assertEquals(
|
||||||
|
actualDomain1.getCreatedTime(), actualDomain1.getModifiedTime());
|
||||||
|
|
||||||
|
TimelineDomain actualDomain2 =
|
||||||
|
store.getDomain(domain2.getId());
|
||||||
|
verifyDomainInfo(domain2, actualDomain2);
|
||||||
|
assertEquals("domain_id_2", actualDomain2.getId());
|
||||||
|
assertTrue(actualDomain2.getCreatedTime() > 0);
|
||||||
|
assertTrue(actualDomain2.getModifiedTime() > 0);
|
||||||
|
assertTrue(
|
||||||
|
actualDomain2.getCreatedTime() < actualDomain2.getModifiedTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetDomains() throws IOException {
|
||||||
|
TimelineDomains actualDomains =
|
||||||
|
store.getDomains("owner_1");
|
||||||
|
assertEquals(2, actualDomains.getDomains().size());
|
||||||
|
verifyDomainInfo(domain3, actualDomains.getDomains().get(0));
|
||||||
|
verifyDomainInfo(domain1, actualDomains.getDomains().get(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyDomainInfo(
|
||||||
|
TimelineDomain expected, TimelineDomain actual) {
|
||||||
|
assertEquals(expected.getId(), actual.getId());
|
||||||
|
assertEquals(expected.getDescription(), actual.getDescription());
|
||||||
|
assertEquals(expected.getOwner(), actual.getOwner());
|
||||||
|
assertEquals(expected.getReaders(), actual.getReaders());
|
||||||
|
assertEquals(expected.getWriters(), actual.getWriters());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,17 +21,17 @@ package org.apache.hadoop.yarn.server.timeline.security;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestTimelineACLsManager {
|
public class TestTimelineACLsManager {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testYarnACLsNotEnabled() throws Exception {
|
public void testYarnACLsNotEnabledForEntity() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
||||||
TimelineACLsManager timelineACLsManager =
|
TimelineACLsManager timelineACLsManager =
|
||||||
|
@ -47,7 +47,7 @@ public class TestTimelineACLsManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testYarnACLsEnabled() throws Exception {
|
public void testYarnACLsEnabledForEntity() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||||
|
@ -72,7 +72,7 @@ public class TestTimelineACLsManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorruptedOwnerInfo() throws Exception {
|
public void testCorruptedOwnerInfoForEntity() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "owner");
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "owner");
|
||||||
|
@ -89,4 +89,59 @@ public class TestTimelineACLsManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testYarnACLsNotEnabledForDomain() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
||||||
|
TimelineACLsManager timelineACLsManager =
|
||||||
|
new TimelineACLsManager(conf);
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setOwner("owner");
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Always true when ACLs are not enabled",
|
||||||
|
timelineACLsManager.checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser("user"), domain));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testYarnACLsEnabledForDomain() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||||
|
TimelineACLsManager timelineACLsManager =
|
||||||
|
new TimelineACLsManager(conf);
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setOwner("owner");
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Owner should be allowed to access",
|
||||||
|
timelineACLsManager.checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser("owner"), domain));
|
||||||
|
Assert.assertFalse(
|
||||||
|
"Other shouldn't be allowed to access",
|
||||||
|
timelineACLsManager.checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser("other"), domain));
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Admin should be allowed to access",
|
||||||
|
timelineACLsManager.checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser("admin"), domain));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorruptedOwnerInfoForDomain() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "owner");
|
||||||
|
TimelineACLsManager timelineACLsManager =
|
||||||
|
new TimelineACLsManager(conf);
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
try {
|
||||||
|
timelineACLsManager.checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser("owner"), domain);
|
||||||
|
Assert.fail("Exception is expected");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.assertTrue("It's not the exact expected exception", e.getMessage()
|
||||||
|
.contains("is corrupted."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import javax.servlet.FilterConfig;
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||||
|
@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -69,7 +72,6 @@ import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||||
import com.sun.jersey.test.framework.JerseyTest;
|
import com.sun.jersey.test.framework.JerseyTest;
|
||||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||||
|
|
||||||
|
|
||||||
public class TestTimelineWebServices extends JerseyTest {
|
public class TestTimelineWebServices extends JerseyTest {
|
||||||
|
|
||||||
private static TimelineStore store;
|
private static TimelineStore store;
|
||||||
|
@ -85,7 +87,7 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
bind(YarnJacksonJaxbJsonProvider.class);
|
bind(YarnJacksonJaxbJsonProvider.class);
|
||||||
bind(TimelineWebServices.class);
|
bind(TimelineWebServices.class);
|
||||||
bind(GenericExceptionHandler.class);
|
bind(GenericExceptionHandler.class);
|
||||||
try{
|
try {
|
||||||
store = mockTimelineStore();
|
store = mockTimelineStore();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
|
@ -100,7 +102,8 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
new TimelineDataManager(store, timelineACLsManager);
|
new TimelineDataManager(store, timelineACLsManager);
|
||||||
bind(TimelineDataManager.class).toInstance(timelineDataManager);
|
bind(TimelineDataManager.class).toInstance(timelineDataManager);
|
||||||
serve("/*").with(GuiceContainer.class);
|
serve("/*").with(GuiceContainer.class);
|
||||||
TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
|
TimelineAuthenticationFilter taFilter =
|
||||||
|
new TimelineAuthenticationFilter();
|
||||||
FilterConfig filterConfig = mock(FilterConfig.class);
|
FilterConfig filterConfig = mock(FilterConfig.class);
|
||||||
when(filterConfig.getInitParameter(AuthenticationFilter.CONFIG_PREFIX))
|
when(filterConfig.getInitParameter(AuthenticationFilter.CONFIG_PREFIX))
|
||||||
.thenReturn(null);
|
.thenReturn(null);
|
||||||
|
@ -159,7 +162,8 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
.filterClass(com.google.inject.servlet.GuiceFilter.class)
|
.filterClass(com.google.inject.servlet.GuiceFilter.class)
|
||||||
.contextPath("jersey-guice-filter")
|
.contextPath("jersey-guice-filter")
|
||||||
.servletPath("/")
|
.servletPath("/")
|
||||||
.clientConfig(new DefaultClientConfig(YarnJacksonJaxbJsonProvider.class))
|
.clientConfig(
|
||||||
|
new DefaultClientConfig(YarnJacksonJaxbJsonProvider.class))
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,7 +281,7 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
WebResource r = resource();
|
WebResource r = resource();
|
||||||
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||||
.path("type_1").queryParam("primaryFilter",
|
.path("type_1").queryParam("primaryFilter",
|
||||||
"long:" + Long.toString((long)Integer.MAX_VALUE + 1l))
|
"long:" + Long.toString((long) Integer.MAX_VALUE + 1l))
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.get(ClientResponse.class);
|
.get(ClientResponse.class);
|
||||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
@ -406,7 +410,8 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
TimelineEntities entities = new TimelineEntities();
|
TimelineEntities entities = new TimelineEntities();
|
||||||
TimelineEntity entity = new TimelineEntity();
|
TimelineEntity entity = new TimelineEntity();
|
||||||
Map<String, Set<Object>> filters = new HashMap<String, Set<Object>>();
|
Map<String, Set<Object>> filters = new HashMap<String, Set<Object>>();
|
||||||
filters.put(TimelineStore.SystemFilter.ENTITY_OWNER.toString(), new HashSet<Object>());
|
filters.put(TimelineStore.SystemFilter.ENTITY_OWNER.toString(),
|
||||||
|
new HashSet<Object>());
|
||||||
entity.setPrimaryFilters(filters);
|
entity.setPrimaryFilters(filters);
|
||||||
entity.setEntityId("test id 6");
|
entity.setEntityId("test id 6");
|
||||||
entity.setEntityType("test type 6");
|
entity.setEntityType("test type 6");
|
||||||
|
@ -418,13 +423,15 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.type(MediaType.APPLICATION_JSON)
|
.type(MediaType.APPLICATION_JSON)
|
||||||
.post(ClientResponse.class, entities);
|
.post(ClientResponse.class, entities);
|
||||||
TimelinePutResponse putResposne = response.getEntity(TimelinePutResponse.class);
|
TimelinePutResponse putResposne =
|
||||||
|
response.getEntity(TimelinePutResponse.class);
|
||||||
Assert.assertEquals(1, putResposne.getErrors().size());
|
Assert.assertEquals(1, putResposne.getErrors().size());
|
||||||
List<TimelinePutError> errors = putResposne.getErrors();
|
List<TimelinePutError> errors = putResposne.getErrors();
|
||||||
Assert.assertEquals(TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT,
|
Assert.assertEquals(
|
||||||
errors.get(0).getErrorCode());
|
TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT,
|
||||||
|
errors.get(0).getErrorCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostEntities() throws Exception {
|
public void testPostEntities() throws Exception {
|
||||||
TimelineEntities entities = new TimelineEntities();
|
TimelineEntities entities = new TimelineEntities();
|
||||||
|
@ -449,7 +456,8 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
.type(MediaType.APPLICATION_JSON)
|
.type(MediaType.APPLICATION_JSON)
|
||||||
.post(ClientResponse.class, entities);
|
.post(ClientResponse.class, entities);
|
||||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
TimelinePutResponse putResposne = response.getEntity(TimelinePutResponse.class);
|
TimelinePutResponse putResposne =
|
||||||
|
response.getEntity(TimelinePutResponse.class);
|
||||||
Assert.assertNotNull(putResposne);
|
Assert.assertNotNull(putResposne);
|
||||||
Assert.assertEquals(0, putResposne.getErrors().size());
|
Assert.assertEquals(0, putResposne.getErrors().size());
|
||||||
// verify the entity exists in the store
|
// verify the entity exists in the store
|
||||||
|
@ -482,7 +490,8 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
.type(MediaType.APPLICATION_JSON)
|
.type(MediaType.APPLICATION_JSON)
|
||||||
.post(ClientResponse.class, entities);
|
.post(ClientResponse.class, entities);
|
||||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
TimelinePutResponse putResponse = response.getEntity(TimelinePutResponse.class);
|
TimelinePutResponse putResponse =
|
||||||
|
response.getEntity(TimelinePutResponse.class);
|
||||||
Assert.assertNotNull(putResponse);
|
Assert.assertNotNull(putResponse);
|
||||||
Assert.assertEquals(0, putResponse.getErrors().size());
|
Assert.assertEquals(0, putResponse.getErrors().size());
|
||||||
|
|
||||||
|
@ -668,4 +677,202 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomain() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain").path("domain_id_1")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
TimelineDomain domain = response.getEntity(TimelineDomain.class);
|
||||||
|
verifyDomain(domain, "domain_id_1", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomainYarnACLsEnabled() {
|
||||||
|
AdminACLsManager oldAdminACLsManager =
|
||||||
|
timelineACLsManager.setAdminACLsManager(adminACLsManager);
|
||||||
|
try {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain").path("domain_id_1")
|
||||||
|
.queryParam("user.name", "owner_1")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
TimelineDomain domain = response.getEntity(TimelineDomain.class);
|
||||||
|
verifyDomain(domain, "domain_id_1", true);
|
||||||
|
|
||||||
|
response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain").path("domain_id_1")
|
||||||
|
.queryParam("user.name", "tester")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
domain = response.getEntity(TimelineDomain.class);
|
||||||
|
verifyDomain(domain, "domain_id_1", false);
|
||||||
|
} finally {
|
||||||
|
timelineACLsManager.setAdminACLsManager(oldAdminACLsManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomains() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain")
|
||||||
|
.queryParam("owner", "owner_1")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
TimelineDomains domains = response.getEntity(TimelineDomains.class);
|
||||||
|
Assert.assertEquals(2, domains.getDomains().size());
|
||||||
|
for (int i = 0; i < domains.getDomains().size(); ++i) {
|
||||||
|
verifyDomain(domains.getDomains().get(i),
|
||||||
|
i == 0 ? "domain_id_4" : "domain_id_1", true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomainsYarnACLsEnabled() throws Exception {
|
||||||
|
AdminACLsManager oldAdminACLsManager =
|
||||||
|
timelineACLsManager.setAdminACLsManager(adminACLsManager);
|
||||||
|
try {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain")
|
||||||
|
.queryParam("user.name", "owner_1")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
TimelineDomains domains = response.getEntity(TimelineDomains.class);
|
||||||
|
Assert.assertEquals(2, domains.getDomains().size());
|
||||||
|
for (int i = 0; i < domains.getDomains().size(); ++i) {
|
||||||
|
verifyDomain(domains.getDomains().get(i),
|
||||||
|
i == 0 ? "domain_id_4" : "domain_id_1", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain")
|
||||||
|
.queryParam("owner", "owner_1")
|
||||||
|
.queryParam("user.name", "tester")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
domains = response.getEntity(TimelineDomains.class);
|
||||||
|
Assert.assertEquals(2, domains.getDomains().size());
|
||||||
|
for (int i = 0; i < domains.getDomains().size(); ++i) {
|
||||||
|
verifyDomain(domains.getDomains().get(i),
|
||||||
|
i == 0 ? "domain_id_4" : "domain_id_1", false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
timelineACLsManager.setAdminACLsManager(oldAdminACLsManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutDomain() throws Exception {
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setId("test_domain_id");
|
||||||
|
WebResource r = resource();
|
||||||
|
// No owner, will be rejected
|
||||||
|
ClientResponse response = r.path("ws").path("v1")
|
||||||
|
.path("timeline").path("domain")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, domain);
|
||||||
|
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
assertEquals(ClientResponse.Status.FORBIDDEN,
|
||||||
|
response.getClientResponseStatus());
|
||||||
|
|
||||||
|
response = r.path("ws").path("v1")
|
||||||
|
.path("timeline").path("domain")
|
||||||
|
.queryParam("user.name", "tester")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, domain);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
|
||||||
|
// Verify the domain exists
|
||||||
|
response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain").path("test_domain_id")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
domain = response.getEntity(TimelineDomain.class);
|
||||||
|
Assert.assertNotNull(domain);
|
||||||
|
Assert.assertEquals("test_domain_id", domain.getId());
|
||||||
|
Assert.assertEquals("tester", domain.getOwner());
|
||||||
|
Assert.assertEquals(null, domain.getDescription());
|
||||||
|
|
||||||
|
// Update the domain
|
||||||
|
domain.setDescription("test_description");
|
||||||
|
response = r.path("ws").path("v1")
|
||||||
|
.path("timeline").path("domain")
|
||||||
|
.queryParam("user.name", "tester")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, domain);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
|
||||||
|
// Verify the domain is updated
|
||||||
|
response = r.path("ws").path("v1").path("timeline")
|
||||||
|
.path("domain").path("test_domain_id")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
domain = response.getEntity(TimelineDomain.class);
|
||||||
|
Assert.assertNotNull(domain);
|
||||||
|
Assert.assertEquals("test_domain_id", domain.getId());
|
||||||
|
Assert.assertEquals("test_description", domain.getDescription());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutDomainYarnACLsEnabled() throws Exception {
|
||||||
|
AdminACLsManager oldAdminACLsManager =
|
||||||
|
timelineACLsManager.setAdminACLsManager(adminACLsManager);
|
||||||
|
try {
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setId("test_domain_id_acl");
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1")
|
||||||
|
.path("timeline").path("domain")
|
||||||
|
.queryParam("user.name", "tester")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, domain);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
|
||||||
|
// Update the domain by another user
|
||||||
|
response = r.path("ws").path("v1")
|
||||||
|
.path("timeline").path("domain")
|
||||||
|
.queryParam("user.name", "other")
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class, domain);
|
||||||
|
assertEquals(Status.FORBIDDEN.getStatusCode(), response.getStatus());
|
||||||
|
} finally {
|
||||||
|
timelineACLsManager.setAdminACLsManager(oldAdminACLsManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyDomain(TimelineDomain domain,
|
||||||
|
String domainId, boolean hasAccess) {
|
||||||
|
Assert.assertNotNull(domain);
|
||||||
|
Assert.assertEquals(domainId, domain.getId());
|
||||||
|
// The specific values have been verified in TestMemoryTimelineStore
|
||||||
|
Assert.assertTrue(hasAccess && domain.getDescription() != null ||
|
||||||
|
!hasAccess && domain.getDescription() == null);
|
||||||
|
Assert.assertTrue(hasAccess && domain.getOwner() != null ||
|
||||||
|
!hasAccess && domain.getOwner() == null);
|
||||||
|
Assert.assertTrue(hasAccess && domain.getReaders() != null ||
|
||||||
|
!hasAccess && domain.getReaders() == null);
|
||||||
|
Assert.assertTrue(hasAccess && domain.getWriters() != null ||
|
||||||
|
!hasAccess && domain.getWriters() == null);
|
||||||
|
Assert.assertTrue(hasAccess && domain.getCreatedTime() != null ||
|
||||||
|
!hasAccess && domain.getCreatedTime() == null);
|
||||||
|
Assert.assertTrue(hasAccess && domain.getModifiedTime() != null ||
|
||||||
|
!hasAccess && domain.getModifiedTime() == null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.EnumSet;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
@ -124,8 +123,8 @@ public class TestTimelineWebServicesWithSSL {
|
||||||
private ClientResponse resp;
|
private ClientResponse resp;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientResponse doPostingEntities(TimelineEntities entities) {
|
public ClientResponse doPostingObject(Object obj, String path) {
|
||||||
resp = super.doPostingEntities(entities);
|
resp = super.doPostingObject(obj, path);
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue