From e3b7d7ac1694b8766ae11bc7e8ecf09763bb26db Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Wed, 16 May 2018 10:29:38 -0700 Subject: [PATCH] YARN-7933. [atsv2 read acls] Add TimelineWriter#writeDomain. (Rohith Sharma K S via Haibo Chen) --- .../timelineservice/TimelineDomain.java | 178 ++++++++++++++++++ .../TestHBaseTimelineStorageDomain.java | 126 +++++++++++++ .../storage/HBaseTimelineWriterImpl.java | 46 +++++ .../storage/domain/DomainTableRW.java | 2 +- .../collector/TimelineCollector.java | 28 +++ .../TimelineCollectorWebService.java | 50 +++++ .../storage/FileSystemTimelineWriterImpl.java | 9 + .../storage/TimelineWriter.java | 15 ++ .../collector/TestTimelineCollector.java | 32 ++++ 9 files changed, 485 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java new file mode 100644 index 00000000000..15db9a1185c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * This class contains the information about a timeline service domain, which is + * used to a user to host a number of timeline entities, isolating them from + * others'. The user can also define the reader and writer users/groups for + * the domain, which is used to control the access to its entities. + *

+ *

+ * The reader and writer users/groups pattern that the user can supply is the + * same as what AccessControlList takes. + *

+ */ +@XmlRootElement(name = "domain") +@XmlAccessorType(XmlAccessType.NONE) +@Public +@Evolving +public class TimelineDomain { + + private String id; + private String description; + private String owner; + private String readers; + private String writers; + private Long createdTime; + private Long modifiedTime; + + public TimelineDomain() { + } + + /** + * Get the domain ID. + * @return the domain ID + */ + @XmlElement(name = "id") + public String getId() { + return id; + } + + /** + * Set the domain ID. + * @param id the domain ID + */ + public void setId(String id) { + this.id = id; + } + + /** + * Get the domain description. + * @return the domain description + */ + @XmlElement(name = "description") + public String getDescription() { + return description; + } + + /** + * Set the domain description. + * @param description the domain description + */ + public void setDescription(String description) { + this.description = description; + } + + /** + * Get the domain owner. + * @return the domain owner + */ + @XmlElement(name = "owner") + public String getOwner() { + return owner; + } + + /** + * Set the domain owner. The user doesn't need to set it, which will + * automatically set to the user who puts the domain. + * @param owner the domain owner + */ + public void setOwner(String owner) { + this.owner = owner; + } + + /** + * Get the reader (and/or reader group) list string. + * @return the reader (and/or reader group) list string + */ + @XmlElement(name = "readers") + public String getReaders() { + return readers; + } + + /** + * Set the reader (and/or reader group) list string. + * @param readers the reader (and/or reader group) list string + */ + public void setReaders(String readers) { + this.readers = readers; + } + + /** + * Get the writer (and/or writer group) list string. + * @return the writer (and/or writer group) list string + */ + @XmlElement(name = "writers") + public String getWriters() { + return writers; + } + + /** + * Set the writer (and/or writer group) list string. + * @param writers the writer (and/or writer group) list string + */ + public void setWriters(String writers) { + this.writers = writers; + } + + /** + * Get the created time of the domain. + * @return the created time of the domain + */ + @XmlElement(name = "createdtime") + public Long getCreatedTime() { + return createdTime; + } + + /** + * Set the created time of the domain. + * @param createdTime the created time of the domain + */ + public void setCreatedTime(Long createdTime) { + this.createdTime = createdTime; + } + + /** + * Get the modified time of the domain. + * @return the modified time of the domain + */ + @XmlElement(name = "modifiedtime") + public Long getModifiedTime() { + return modifiedTime; + } + + /** + * Set the modified time of the domain. + * @param modifiedTime the modified time of the domain + */ + public void setModifiedTime(Long modifiedTime) { + this.modifiedTime = modifiedTime; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java new file mode 100644 index 00000000000..2932e0c5fb1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test for timeline domain. + */ +public class TestHBaseTimelineStorageDomain { + + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + } + + @Test + public void testDomainIdTable() throws Exception { + long l = System.currentTimeMillis(); + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String clusterId = "yarn-cluster"; + TimelineCollectorContext context = + new TimelineCollectorContext(clusterId, null, null, null, null, null); + TimelineDomain domain2; + try { + hbi = new HBaseTimelineWriterImpl(); + hbi.init(c1); + + // write empty domain + domain2 = new TimelineDomain(); + domain2.setCreatedTime(l); + domain2.setDescription("domain-2"); + domain2.setId("domain-2"); + domain2.setModifiedTime(l); + domain2.setOwner("owner1"); + domain2.setReaders("user1,user2 group1,group2"); + domain2.setWriters("writer1,writer2"); + hbi.write(context, domain2); + + // flush everything to hbase + hbi.flush(); + } finally { + if (hbi != null) { + hbi.close(); + } + } + + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(BaseTableRW + .getTableName(c1, DomainTableRW.TABLE_NAME_CONF_NAME, + DomainTableRW.DEFAULT_TABLE_NAME)); + + byte[] startRow = new DomainRowKey(clusterId, domain2.getId()).getRowKey(); + Get g = new Get(startRow); + Result result = table1.get(g); + assertNotNull(result); + assertTrue(!result.isEmpty()); + + byte[] row = result.getRow(); + DomainRowKey domainRowKey = DomainRowKey.parseRowKey(row); + assertEquals(domain2.getId(), domainRowKey.getDomainId()); + assertEquals(clusterId, domainRowKey.getClusterId()); + + Long cTime = + (Long) ColumnRWHelper.readResult(result, DomainColumn.CREATED_TIME); + String description = + (String) ColumnRWHelper.readResult(result, DomainColumn.DESCRIPTION); + Long mTime = (Long) ColumnRWHelper + .readResult(result, DomainColumn.MODIFICATION_TIME); + String owners = + (String) ColumnRWHelper.readResult(result, DomainColumn.OWNER); + String readers = + (String) ColumnRWHelper.readResult(result, DomainColumn.READERS); + String writers = + (String) ColumnRWHelper.readResult(result, DomainColumn.WRITERS); + + assertEquals(l, cTime.longValue()); + assertEquals(l, mTime.longValue()); + assertEquals("domain-2", description); + assertEquals("owner1", owners); + assertEquals("user1,user2 group1,group2", readers); + assertEquals("writer1,writer2", writers); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 29390c52475..3414a56ef35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @@ -56,6 +57,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConve import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; @@ -101,6 +106,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private TypedBufferedMutator flowActivityTable; private TypedBufferedMutator flowRunTable; private TypedBufferedMutator subApplicationTable; + private TypedBufferedMutator domainTable; /** * Used to convert strings key components to and from storage format. @@ -139,6 +145,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements new FlowActivityTableRW().getTableMutator(hbaseConf, conn); subApplicationTable = new SubApplicationTableRW().getTableMutator(hbaseConf, conn); + domainTable = new DomainTableRW().getTableMutator(hbaseConf, conn); UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ? UserGroupInformation.getLoginUser() : @@ -231,6 +238,41 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return putStatus; } + @Override + public TimelineWriteResponse write(TimelineCollectorContext context, + TimelineDomain domain) + throws IOException { + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + + String clusterId = context.getClusterId(); + String domainId = domain.getId(); + + // defensive coding to avoid NPE during row key construction + if (clusterId == null) { + LOG.warn( + "Found null for clusterId. Not proceeding with writing to hbase"); + return putStatus; + } + + DomainRowKey domainRowKey = new DomainRowKey(clusterId, domainId); + byte[] rowKey = domainRowKey.getRowKey(); + + ColumnRWHelper.store(rowKey, domainTable, DomainColumn.CREATED_TIME, null, + domain.getCreatedTime()); + ColumnRWHelper.store(rowKey, domainTable, DomainColumn.DESCRIPTION, null, + domain.getDescription()); + ColumnRWHelper + .store(rowKey, domainTable, DomainColumn.MODIFICATION_TIME, null, + domain.getModifiedTime()); + ColumnRWHelper.store(rowKey, domainTable, DomainColumn.OWNER, null, + domain.getOwner()); + ColumnRWHelper.store(rowKey, domainTable, DomainColumn.READERS, null, + domain.getReaders()); + ColumnRWHelper.store(rowKey, domainTable, DomainColumn.WRITERS, null, + domain.getWriters()); + return putStatus; + } + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, String clusterId, String appId, String userId, String flowVersion, TimelineEntity te, long appCreatedTimeStamp) @@ -568,6 +610,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements flowRunTable.flush(); flowActivityTable.flush(); subApplicationTable.flush(); + domainTable.flush(); } /** @@ -603,6 +646,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (subApplicationTable != null) { subApplicationTable.close(); } + if (domainTable != null) { + domainTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java index 1d58e40245e..d2ae18ec181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java @@ -44,7 +44,7 @@ public class DomainTableRW extends BaseTableRW { public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; /** default value for domain table name. */ - private static final String DEFAULT_TABLE_NAME = "timelineservice.domain"; + public static final String DEFAULT_TABLE_NAME = "timelineservice.domain"; private static final Logger LOG = LoggerFactory.getLogger(DomainTableRW.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8202431459d..e9eeb436ed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -153,6 +154,33 @@ public abstract class TimelineCollector extends CompositeService { return response; } + /** + * Add or update an domain. If the domain already exists, only the owner + * and the admin can update it. + * + * @param domain domain to post + * @param callerUgi the caller UGI + * @return the response that contains the result of the post. + * @throws IOException if there is any exception encountered while putting + * domain. + */ + public TimelineWriteResponse putDomain(TimelineDomain domain, + UserGroupInformation callerUgi) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug( + "putDomain(domain=" + domain + ", callerUgi=" + callerUgi + ")"); + } + + TimelineWriteResponse response; + synchronized (writer) { + final TimelineCollectorContext context = getTimelineEntityContext(); + response = writer.write(context, domain); + flushBufferedTimelineEntities(); + } + + return response; + } + private TimelineWriteResponse writeTimelineEntities( TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 272b478c7be..61dcf9972ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; @@ -185,6 +186,55 @@ public class TimelineCollectorWebService { } } + /** + * @param req Servlet request. + * @param res Servlet response. + * @param domain timeline domain to be put. + * @param appId Application Id to which the domain to be put belong to. If + * appId is not there or it cannot be parsed, HTTP 400 will be sent back. + * @return a Response with appropriate HTTP status. + */ + @PUT + @Path("/domain") + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */ }) + public Response putDomain( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("appid") String appId, + TimelineDomain domain) { + init(res); + UserGroupInformation callerUgi = getUser(req); + if (callerUgi == null) { + String msg = "The owner of the posted timeline entities is not set"; + LOG.error(msg); + throw new ForbiddenException(msg); + } + + try { + ApplicationId appID = parseApplicationId(appId); + if (appID == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + NodeTimelineCollectorManager collectorManager = + (NodeTimelineCollectorManager) context.getAttribute( + NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); + TimelineCollector collector = collectorManager.get(appID); + if (collector == null) { + LOG.error("Application: " + appId + " is not found"); + throw new NotFoundException(); // different exception? + } + + domain.setOwner(callerUgi.getShortUserName()); + collector.putDomain(domain, callerUgi); + + return Response.ok().build(); + } catch (Exception e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + private static ApplicationId parseApplicationId(String appId) { try { if (appId != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index ee4197000bf..ac0902fc2b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; @@ -88,6 +89,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService return response; } + @Override + public TimelineWriteResponse write(TimelineCollectorContext context, + TimelineDomain domain) + throws IOException { + // TODO implementation for storing domain into FileSystem + return null; + } + private synchronized void write(String clusterId, String userId, String flowName, String flowVersion, long flowRun, String appId, TimelineEntity entity, TimelineWriteResponse response) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 12bc1cb3f0e..08cfc8becb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; @@ -50,6 +51,20 @@ public interface TimelineWriter extends Service { TimelineWriteResponse write(TimelineCollectorContext context, TimelineEntities data, UserGroupInformation callerUgi) throws IOException; + /** + * Stores {@link TimelineDomain} object to the timeline + * store. Any errors occurring for individual write request objects will be + * reported in the response. + * + * @param context a {@link TimelineCollectorContext} + * @param domain a {@link TimelineDomain} object. + * @return a {@link TimelineWriteResponse} object. + * @throws IOException if there is any exception encountered while storing or + * writing entities to the back end storage. + */ + TimelineWriteResponse write(TimelineCollectorContext context, + TimelineDomain domain) throws IOException; + /** * Aggregates the entity information to the timeline store based on which * track this entity is to be rolled up to The tracks along which aggregations diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index ec454284dab..88e4f25f720 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -177,6 +178,37 @@ public class TestTimelineCollector { verify(writer, never()).flush(); } + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putDomain() calls. + */ + @Test public void testPutDomain() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineDomain domain = + generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L, + 1L); + collector.putDomain(domain, UserGroupInformation.createRemoteUser("owner")); + + verify(writer, times(1)) + .write(any(TimelineCollectorContext.class), any(TimelineDomain.class)); + verify(writer, times(1)).flush(); + } + + private static TimelineDomain generateDomain(String id, String desc, + String owner, String reader, String writer, Long cTime, Long mTime) { + TimelineDomain domain = new TimelineDomain(); + domain.setId(id); + domain.setDescription(desc); + domain.setOwner(owner); + domain.setReaders(reader); + domain.setWriters(writer); + domain.setCreatedTime(cTime); + domain.setModifiedTime(mTime); + return domain; + } + private static class TimelineCollectorForTest extends TimelineCollector { private final TimelineCollectorContext context = new TimelineCollectorContext();