YARN-7933. [atsv2 read acls] Add TimelineWriter#writeDomain. (Rohith Sharma K S via Haibo Chen)
This commit is contained in:
parent
0fc988e6a3
commit
e3b7d7ac16
|
@ -0,0 +1,178 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.timelineservice;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This class contains the information about a timeline service domain, which is
|
||||
* used to a user to host a number of timeline entities, isolating them from
|
||||
* others'. The user can also define the reader and writer users/groups for
|
||||
* the domain, which is used to control the access to its entities.
|
||||
* </p>
|
||||
* <p>
|
||||
* The reader and writer users/groups pattern that the user can supply is the
|
||||
* same as what <code>AccessControlList</code> takes.
|
||||
* </p>
|
||||
*/
|
||||
@XmlRootElement(name = "domain")
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
@Public
|
||||
@Evolving
|
||||
public class TimelineDomain {
|
||||
|
||||
private String id;
|
||||
private String description;
|
||||
private String owner;
|
||||
private String readers;
|
||||
private String writers;
|
||||
private Long createdTime;
|
||||
private Long modifiedTime;
|
||||
|
||||
public TimelineDomain() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the domain ID.
|
||||
* @return the domain ID
|
||||
*/
|
||||
@XmlElement(name = "id")
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the domain ID.
|
||||
* @param id the domain ID
|
||||
*/
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the domain description.
|
||||
* @return the domain description
|
||||
*/
|
||||
@XmlElement(name = "description")
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the domain description.
|
||||
* @param description the domain description
|
||||
*/
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the domain owner.
|
||||
* @return the domain owner
|
||||
*/
|
||||
@XmlElement(name = "owner")
|
||||
public String getOwner() {
|
||||
return owner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the domain owner. The user doesn't need to set it, which will
|
||||
* automatically set to the user who puts the domain.
|
||||
* @param owner the domain owner
|
||||
*/
|
||||
public void setOwner(String owner) {
|
||||
this.owner = owner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the reader (and/or reader group) list string.
|
||||
* @return the reader (and/or reader group) list string
|
||||
*/
|
||||
@XmlElement(name = "readers")
|
||||
public String getReaders() {
|
||||
return readers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the reader (and/or reader group) list string.
|
||||
* @param readers the reader (and/or reader group) list string
|
||||
*/
|
||||
public void setReaders(String readers) {
|
||||
this.readers = readers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the writer (and/or writer group) list string.
|
||||
* @return the writer (and/or writer group) list string
|
||||
*/
|
||||
@XmlElement(name = "writers")
|
||||
public String getWriters() {
|
||||
return writers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the writer (and/or writer group) list string.
|
||||
* @param writers the writer (and/or writer group) list string
|
||||
*/
|
||||
public void setWriters(String writers) {
|
||||
this.writers = writers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the created time of the domain.
|
||||
* @return the created time of the domain
|
||||
*/
|
||||
@XmlElement(name = "createdtime")
|
||||
public Long getCreatedTime() {
|
||||
return createdTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the created time of the domain.
|
||||
* @param createdTime the created time of the domain
|
||||
*/
|
||||
public void setCreatedTime(Long createdTime) {
|
||||
this.createdTime = createdTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the modified time of the domain.
|
||||
* @return the modified time of the domain
|
||||
*/
|
||||
@XmlElement(name = "modifiedtime")
|
||||
public Long getModifiedTime() {
|
||||
return modifiedTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the modified time of the domain.
|
||||
* @param modifiedTime the modified time of the domain
|
||||
*/
|
||||
public void setModifiedTime(Long modifiedTime) {
|
||||
this.modifiedTime = modifiedTime;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test for timeline domain.
|
||||
*/
|
||||
public class TestHBaseTimelineStorageDomain {
|
||||
|
||||
private static HBaseTestingUtility util;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDomainIdTable() throws Exception {
|
||||
long l = System.currentTimeMillis();
|
||||
HBaseTimelineWriterImpl hbi = null;
|
||||
Configuration c1 = util.getConfiguration();
|
||||
String clusterId = "yarn-cluster";
|
||||
TimelineCollectorContext context =
|
||||
new TimelineCollectorContext(clusterId, null, null, null, null, null);
|
||||
TimelineDomain domain2;
|
||||
try {
|
||||
hbi = new HBaseTimelineWriterImpl();
|
||||
hbi.init(c1);
|
||||
|
||||
// write empty domain
|
||||
domain2 = new TimelineDomain();
|
||||
domain2.setCreatedTime(l);
|
||||
domain2.setDescription("domain-2");
|
||||
domain2.setId("domain-2");
|
||||
domain2.setModifiedTime(l);
|
||||
domain2.setOwner("owner1");
|
||||
domain2.setReaders("user1,user2 group1,group2");
|
||||
domain2.setWriters("writer1,writer2");
|
||||
hbi.write(context, domain2);
|
||||
|
||||
// flush everything to hbase
|
||||
hbi.flush();
|
||||
} finally {
|
||||
if (hbi != null) {
|
||||
hbi.close();
|
||||
}
|
||||
}
|
||||
|
||||
Connection conn = ConnectionFactory.createConnection(c1);
|
||||
Table table1 = conn.getTable(BaseTableRW
|
||||
.getTableName(c1, DomainTableRW.TABLE_NAME_CONF_NAME,
|
||||
DomainTableRW.DEFAULT_TABLE_NAME));
|
||||
|
||||
byte[] startRow = new DomainRowKey(clusterId, domain2.getId()).getRowKey();
|
||||
Get g = new Get(startRow);
|
||||
Result result = table1.get(g);
|
||||
assertNotNull(result);
|
||||
assertTrue(!result.isEmpty());
|
||||
|
||||
byte[] row = result.getRow();
|
||||
DomainRowKey domainRowKey = DomainRowKey.parseRowKey(row);
|
||||
assertEquals(domain2.getId(), domainRowKey.getDomainId());
|
||||
assertEquals(clusterId, domainRowKey.getClusterId());
|
||||
|
||||
Long cTime =
|
||||
(Long) ColumnRWHelper.readResult(result, DomainColumn.CREATED_TIME);
|
||||
String description =
|
||||
(String) ColumnRWHelper.readResult(result, DomainColumn.DESCRIPTION);
|
||||
Long mTime = (Long) ColumnRWHelper
|
||||
.readResult(result, DomainColumn.MODIFICATION_TIME);
|
||||
String owners =
|
||||
(String) ColumnRWHelper.readResult(result, DomainColumn.OWNER);
|
||||
String readers =
|
||||
(String) ColumnRWHelper.readResult(result, DomainColumn.READERS);
|
||||
String writers =
|
||||
(String) ColumnRWHelper.readResult(result, DomainColumn.WRITERS);
|
||||
|
||||
assertEquals(l, cTime.longValue());
|
||||
assertEquals(l, mTime.longValue());
|
||||
assertEquals("domain-2", description);
|
||||
assertEquals("owner1", owners);
|
||||
assertEquals("user1,user2 group1,group2", readers);
|
||||
assertEquals("writer1,writer2", writers);
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
|
@ -56,6 +57,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConve
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
||||
|
@ -101,6 +106,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|||
private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
|
||||
private TypedBufferedMutator<FlowRunTable> flowRunTable;
|
||||
private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
|
||||
private TypedBufferedMutator<DomainTable> domainTable;
|
||||
|
||||
/**
|
||||
* Used to convert strings key components to and from storage format.
|
||||
|
@ -139,6 +145,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|||
new FlowActivityTableRW().getTableMutator(hbaseConf, conn);
|
||||
subApplicationTable =
|
||||
new SubApplicationTableRW().getTableMutator(hbaseConf, conn);
|
||||
domainTable = new DomainTableRW().getTableMutator(hbaseConf, conn);
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
|
||||
UserGroupInformation.getLoginUser() :
|
||||
|
@ -231,6 +238,41 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|||
return putStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineWriteResponse write(TimelineCollectorContext context,
|
||||
TimelineDomain domain)
|
||||
throws IOException {
|
||||
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
||||
|
||||
String clusterId = context.getClusterId();
|
||||
String domainId = domain.getId();
|
||||
|
||||
// defensive coding to avoid NPE during row key construction
|
||||
if (clusterId == null) {
|
||||
LOG.warn(
|
||||
"Found null for clusterId. Not proceeding with writing to hbase");
|
||||
return putStatus;
|
||||
}
|
||||
|
||||
DomainRowKey domainRowKey = new DomainRowKey(clusterId, domainId);
|
||||
byte[] rowKey = domainRowKey.getRowKey();
|
||||
|
||||
ColumnRWHelper.store(rowKey, domainTable, DomainColumn.CREATED_TIME, null,
|
||||
domain.getCreatedTime());
|
||||
ColumnRWHelper.store(rowKey, domainTable, DomainColumn.DESCRIPTION, null,
|
||||
domain.getDescription());
|
||||
ColumnRWHelper
|
||||
.store(rowKey, domainTable, DomainColumn.MODIFICATION_TIME, null,
|
||||
domain.getModifiedTime());
|
||||
ColumnRWHelper.store(rowKey, domainTable, DomainColumn.OWNER, null,
|
||||
domain.getOwner());
|
||||
ColumnRWHelper.store(rowKey, domainTable, DomainColumn.READERS, null,
|
||||
domain.getReaders());
|
||||
ColumnRWHelper.store(rowKey, domainTable, DomainColumn.WRITERS, null,
|
||||
domain.getWriters());
|
||||
return putStatus;
|
||||
}
|
||||
|
||||
private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
|
||||
String clusterId, String appId, String userId, String flowVersion,
|
||||
TimelineEntity te, long appCreatedTimeStamp)
|
||||
|
@ -568,6 +610,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|||
flowRunTable.flush();
|
||||
flowActivityTable.flush();
|
||||
subApplicationTable.flush();
|
||||
domainTable.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -603,6 +646,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|||
if (subApplicationTable != null) {
|
||||
subApplicationTable.close();
|
||||
}
|
||||
if (domainTable != null) {
|
||||
domainTable.close();
|
||||
}
|
||||
if (conn != null) {
|
||||
LOG.info("closing the hbase Connection");
|
||||
conn.close();
|
||||
|
|
|
@ -44,7 +44,7 @@ public class DomainTableRW extends BaseTableRW<DomainTable> {
|
|||
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
|
||||
|
||||
/** default value for domain table name. */
|
||||
private static final String DEFAULT_TABLE_NAME = "timelineservice.domain";
|
||||
public static final String DEFAULT_TABLE_NAME = "timelineservice.domain";
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DomainTableRW.class);
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
|
@ -153,6 +154,33 @@ public abstract class TimelineCollector extends CompositeService {
|
|||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add or update an domain. If the domain already exists, only the owner
|
||||
* and the admin can update it.
|
||||
*
|
||||
* @param domain domain to post
|
||||
* @param callerUgi the caller UGI
|
||||
* @return the response that contains the result of the post.
|
||||
* @throws IOException if there is any exception encountered while putting
|
||||
* domain.
|
||||
*/
|
||||
public TimelineWriteResponse putDomain(TimelineDomain domain,
|
||||
UserGroupInformation callerUgi) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"putDomain(domain=" + domain + ", callerUgi=" + callerUgi + ")");
|
||||
}
|
||||
|
||||
TimelineWriteResponse response;
|
||||
synchronized (writer) {
|
||||
final TimelineCollectorContext context = getTimelineEntityContext();
|
||||
response = writer.write(context, domain);
|
||||
flushBufferedTimelineEntities();
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private TimelineWriteResponse writeTimelineEntities(
|
||||
TimelineEntities entities, UserGroupInformation callerUgi)
|
||||
throws IOException {
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
|
@ -185,6 +186,55 @@ public class TimelineCollectorWebService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param req Servlet request.
|
||||
* @param res Servlet response.
|
||||
* @param domain timeline domain to be put.
|
||||
* @param appId Application Id to which the domain to be put belong to. If
|
||||
* appId is not there or it cannot be parsed, HTTP 400 will be sent back.
|
||||
* @return a Response with appropriate HTTP status.
|
||||
*/
|
||||
@PUT
|
||||
@Path("/domain")
|
||||
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */ })
|
||||
public Response putDomain(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@QueryParam("appid") String appId,
|
||||
TimelineDomain domain) {
|
||||
init(res);
|
||||
UserGroupInformation callerUgi = getUser(req);
|
||||
if (callerUgi == null) {
|
||||
String msg = "The owner of the posted timeline entities is not set";
|
||||
LOG.error(msg);
|
||||
throw new ForbiddenException(msg);
|
||||
}
|
||||
|
||||
try {
|
||||
ApplicationId appID = parseApplicationId(appId);
|
||||
if (appID == null) {
|
||||
return Response.status(Response.Status.BAD_REQUEST).build();
|
||||
}
|
||||
NodeTimelineCollectorManager collectorManager =
|
||||
(NodeTimelineCollectorManager) context.getAttribute(
|
||||
NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
|
||||
TimelineCollector collector = collectorManager.get(appID);
|
||||
if (collector == null) {
|
||||
LOG.error("Application: " + appId + " is not found");
|
||||
throw new NotFoundException(); // different exception?
|
||||
}
|
||||
|
||||
domain.setOwner(callerUgi.getShortUserName());
|
||||
collector.putDomain(domain, callerUgi);
|
||||
|
||||
return Response.ok().build();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error putting entities", e);
|
||||
throw new WebApplicationException(e,
|
||||
Response.Status.INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
private static ApplicationId parseApplicationId(String appId) {
|
||||
try {
|
||||
if (appId != null) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||
|
@ -88,6 +89,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
|||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineWriteResponse write(TimelineCollectorContext context,
|
||||
TimelineDomain domain)
|
||||
throws IOException {
|
||||
// TODO implementation for storing domain into FileSystem
|
||||
return null;
|
||||
}
|
||||
|
||||
private synchronized void write(String clusterId, String userId,
|
||||
String flowName, String flowVersion, long flowRun, String appId,
|
||||
TimelineEntity entity, TimelineWriteResponse response)
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||
|
@ -50,6 +51,20 @@ public interface TimelineWriter extends Service {
|
|||
TimelineWriteResponse write(TimelineCollectorContext context,
|
||||
TimelineEntities data, UserGroupInformation callerUgi) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores {@link TimelineDomain} object to the timeline
|
||||
* store. Any errors occurring for individual write request objects will be
|
||||
* reported in the response.
|
||||
*
|
||||
* @param context a {@link TimelineCollectorContext}
|
||||
* @param domain a {@link TimelineDomain} object.
|
||||
* @return a {@link TimelineWriteResponse} object.
|
||||
* @throws IOException if there is any exception encountered while storing or
|
||||
* writing entities to the back end storage.
|
||||
*/
|
||||
TimelineWriteResponse write(TimelineCollectorContext context,
|
||||
TimelineDomain domain) throws IOException;
|
||||
|
||||
/**
|
||||
* Aggregates the entity information to the timeline store based on which
|
||||
* track this entity is to be rolled up to The tracks along which aggregations
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
|
@ -177,6 +178,37 @@ public class TestTimelineCollector {
|
|||
verify(writer, never()).flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test TimelineCollector's interaction with TimelineWriter upon
|
||||
* putDomain() calls.
|
||||
*/
|
||||
@Test public void testPutDomain() throws IOException {
|
||||
TimelineWriter writer = mock(TimelineWriter.class);
|
||||
TimelineCollector collector = new TimelineCollectorForTest(writer);
|
||||
|
||||
TimelineDomain domain =
|
||||
generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L,
|
||||
1L);
|
||||
collector.putDomain(domain, UserGroupInformation.createRemoteUser("owner"));
|
||||
|
||||
verify(writer, times(1))
|
||||
.write(any(TimelineCollectorContext.class), any(TimelineDomain.class));
|
||||
verify(writer, times(1)).flush();
|
||||
}
|
||||
|
||||
private static TimelineDomain generateDomain(String id, String desc,
|
||||
String owner, String reader, String writer, Long cTime, Long mTime) {
|
||||
TimelineDomain domain = new TimelineDomain();
|
||||
domain.setId(id);
|
||||
domain.setDescription(desc);
|
||||
domain.setOwner(owner);
|
||||
domain.setReaders(reader);
|
||||
domain.setWriters(writer);
|
||||
domain.setCreatedTime(cTime);
|
||||
domain.setModifiedTime(mTime);
|
||||
return domain;
|
||||
}
|
||||
|
||||
private static class TimelineCollectorForTest extends TimelineCollector {
|
||||
private final TimelineCollectorContext context =
|
||||
new TimelineCollectorContext();
|
||||
|
|
Loading…
Reference in New Issue