diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java
new file mode 100644
index 00000000000..15db9a1185c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineDomain.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * This class contains the information about a timeline service domain, which is
+ * used to a user to host a number of timeline entities, isolating them from
+ * others'. The user can also define the reader and writer users/groups for
+ * the domain, which is used to control the access to its entities.
+ *
+ *
+ * The reader and writer users/groups pattern that the user can supply is the
+ * same as what AccessControlList
takes.
+ *
+ */
+@XmlRootElement(name = "domain")
+@XmlAccessorType(XmlAccessType.NONE)
+@Public
+@Evolving
+public class TimelineDomain {
+
+ private String id;
+ private String description;
+ private String owner;
+ private String readers;
+ private String writers;
+ private Long createdTime;
+ private Long modifiedTime;
+
+ public TimelineDomain() {
+ }
+
+ /**
+ * Get the domain ID.
+ * @return the domain ID
+ */
+ @XmlElement(name = "id")
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Set the domain ID.
+ * @param id the domain ID
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Get the domain description.
+ * @return the domain description
+ */
+ @XmlElement(name = "description")
+ public String getDescription() {
+ return description;
+ }
+
+ /**
+ * Set the domain description.
+ * @param description the domain description
+ */
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ /**
+ * Get the domain owner.
+ * @return the domain owner
+ */
+ @XmlElement(name = "owner")
+ public String getOwner() {
+ return owner;
+ }
+
+ /**
+ * Set the domain owner. The user doesn't need to set it, which will
+ * automatically set to the user who puts the domain.
+ * @param owner the domain owner
+ */
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ /**
+ * Get the reader (and/or reader group) list string.
+ * @return the reader (and/or reader group) list string
+ */
+ @XmlElement(name = "readers")
+ public String getReaders() {
+ return readers;
+ }
+
+ /**
+ * Set the reader (and/or reader group) list string.
+ * @param readers the reader (and/or reader group) list string
+ */
+ public void setReaders(String readers) {
+ this.readers = readers;
+ }
+
+ /**
+ * Get the writer (and/or writer group) list string.
+ * @return the writer (and/or writer group) list string
+ */
+ @XmlElement(name = "writers")
+ public String getWriters() {
+ return writers;
+ }
+
+ /**
+ * Set the writer (and/or writer group) list string.
+ * @param writers the writer (and/or writer group) list string
+ */
+ public void setWriters(String writers) {
+ this.writers = writers;
+ }
+
+ /**
+ * Get the created time of the domain.
+ * @return the created time of the domain
+ */
+ @XmlElement(name = "createdtime")
+ public Long getCreatedTime() {
+ return createdTime;
+ }
+
+ /**
+ * Set the created time of the domain.
+ * @param createdTime the created time of the domain
+ */
+ public void setCreatedTime(Long createdTime) {
+ this.createdTime = createdTime;
+ }
+
+ /**
+ * Get the modified time of the domain.
+ * @return the modified time of the domain
+ */
+ @XmlElement(name = "modifiedtime")
+ public Long getModifiedTime() {
+ return modifiedTime;
+ }
+
+ /**
+ * Set the modified time of the domain.
+ * @param modifiedTime the modified time of the domain
+ */
+ public void setModifiedTime(Long modifiedTime) {
+ this.modifiedTime = modifiedTime;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java
new file mode 100644
index 00000000000..2932e0c5fb1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageDomain.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for timeline domain.
+ */
+public class TestHBaseTimelineStorageDomain {
+
+ private static HBaseTestingUtility util;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ DataGeneratorForTest.createSchema(util.getConfiguration());
+ }
+
+ @Test
+ public void testDomainIdTable() throws Exception {
+ long l = System.currentTimeMillis();
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ String clusterId = "yarn-cluster";
+ TimelineCollectorContext context =
+ new TimelineCollectorContext(clusterId, null, null, null, null, null);
+ TimelineDomain domain2;
+ try {
+ hbi = new HBaseTimelineWriterImpl();
+ hbi.init(c1);
+
+ // write empty domain
+ domain2 = new TimelineDomain();
+ domain2.setCreatedTime(l);
+ domain2.setDescription("domain-2");
+ domain2.setId("domain-2");
+ domain2.setModifiedTime(l);
+ domain2.setOwner("owner1");
+ domain2.setReaders("user1,user2 group1,group2");
+ domain2.setWriters("writer1,writer2");
+ hbi.write(context, domain2);
+
+ // flush everything to hbase
+ hbi.flush();
+ } finally {
+ if (hbi != null) {
+ hbi.close();
+ }
+ }
+
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(BaseTableRW
+ .getTableName(c1, DomainTableRW.TABLE_NAME_CONF_NAME,
+ DomainTableRW.DEFAULT_TABLE_NAME));
+
+ byte[] startRow = new DomainRowKey(clusterId, domain2.getId()).getRowKey();
+ Get g = new Get(startRow);
+ Result result = table1.get(g);
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+
+ byte[] row = result.getRow();
+ DomainRowKey domainRowKey = DomainRowKey.parseRowKey(row);
+ assertEquals(domain2.getId(), domainRowKey.getDomainId());
+ assertEquals(clusterId, domainRowKey.getClusterId());
+
+ Long cTime =
+ (Long) ColumnRWHelper.readResult(result, DomainColumn.CREATED_TIME);
+ String description =
+ (String) ColumnRWHelper.readResult(result, DomainColumn.DESCRIPTION);
+ Long mTime = (Long) ColumnRWHelper
+ .readResult(result, DomainColumn.MODIFICATION_TIME);
+ String owners =
+ (String) ColumnRWHelper.readResult(result, DomainColumn.OWNER);
+ String readers =
+ (String) ColumnRWHelper.readResult(result, DomainColumn.READERS);
+ String writers =
+ (String) ColumnRWHelper.readResult(result, DomainColumn.WRITERS);
+
+ assertEquals(l, cTime.longValue());
+ assertEquals(l, mTime.longValue());
+ assertEquals("domain-2", description);
+ assertEquals("owner1", owners);
+ assertEquals("user1,user2 group1,group2", readers);
+ assertEquals("writer1,writer2", writers);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 29390c52475..3414a56ef35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -56,6 +57,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConve
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
@@ -101,6 +106,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private TypedBufferedMutator flowActivityTable;
private TypedBufferedMutator flowRunTable;
private TypedBufferedMutator subApplicationTable;
+ private TypedBufferedMutator domainTable;
/**
* Used to convert strings key components to and from storage format.
@@ -139,6 +145,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
new FlowActivityTableRW().getTableMutator(hbaseConf, conn);
subApplicationTable =
new SubApplicationTableRW().getTableMutator(hbaseConf, conn);
+ domainTable = new DomainTableRW().getTableMutator(hbaseConf, conn);
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
UserGroupInformation.getLoginUser() :
@@ -231,6 +238,41 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return putStatus;
}
+ @Override
+ public TimelineWriteResponse write(TimelineCollectorContext context,
+ TimelineDomain domain)
+ throws IOException {
+ TimelineWriteResponse putStatus = new TimelineWriteResponse();
+
+ String clusterId = context.getClusterId();
+ String domainId = domain.getId();
+
+ // defensive coding to avoid NPE during row key construction
+ if (clusterId == null) {
+ LOG.warn(
+ "Found null for clusterId. Not proceeding with writing to hbase");
+ return putStatus;
+ }
+
+ DomainRowKey domainRowKey = new DomainRowKey(clusterId, domainId);
+ byte[] rowKey = domainRowKey.getRowKey();
+
+ ColumnRWHelper.store(rowKey, domainTable, DomainColumn.CREATED_TIME, null,
+ domain.getCreatedTime());
+ ColumnRWHelper.store(rowKey, domainTable, DomainColumn.DESCRIPTION, null,
+ domain.getDescription());
+ ColumnRWHelper
+ .store(rowKey, domainTable, DomainColumn.MODIFICATION_TIME, null,
+ domain.getModifiedTime());
+ ColumnRWHelper.store(rowKey, domainTable, DomainColumn.OWNER, null,
+ domain.getOwner());
+ ColumnRWHelper.store(rowKey, domainTable, DomainColumn.READERS, null,
+ domain.getReaders());
+ ColumnRWHelper.store(rowKey, domainTable, DomainColumn.WRITERS, null,
+ domain.getWriters());
+ return putStatus;
+ }
+
private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
String clusterId, String appId, String userId, String flowVersion,
TimelineEntity te, long appCreatedTimeStamp)
@@ -568,6 +610,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
flowRunTable.flush();
flowActivityTable.flush();
subApplicationTable.flush();
+ domainTable.flush();
}
/**
@@ -603,6 +646,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (subApplicationTable != null) {
subApplicationTable.close();
}
+ if (domainTable != null) {
+ domainTable.close();
+ }
if (conn != null) {
LOG.info("closing the hbase Connection");
conn.close();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java
index 1d58e40245e..d2ae18ec181 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/domain/DomainTableRW.java
@@ -44,7 +44,7 @@ public class DomainTableRW extends BaseTableRW {
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/** default value for domain table name. */
- private static final String DEFAULT_TABLE_NAME = "timelineservice.domain";
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.domain";
private static final Logger LOG =
LoggerFactory.getLogger(DomainTableRW.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 8202431459d..e9eeb436ed1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -153,6 +154,33 @@ public abstract class TimelineCollector extends CompositeService {
return response;
}
+ /**
+ * Add or update an domain. If the domain already exists, only the owner
+ * and the admin can update it.
+ *
+ * @param domain domain to post
+ * @param callerUgi the caller UGI
+ * @return the response that contains the result of the post.
+ * @throws IOException if there is any exception encountered while putting
+ * domain.
+ */
+ public TimelineWriteResponse putDomain(TimelineDomain domain,
+ UserGroupInformation callerUgi) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "putDomain(domain=" + domain + ", callerUgi=" + callerUgi + ")");
+ }
+
+ TimelineWriteResponse response;
+ synchronized (writer) {
+ final TimelineCollectorContext context = getTimelineEntityContext();
+ response = writer.write(context, domain);
+ flushBufferedTimelineEntities();
+ }
+
+ return response;
+ }
+
private TimelineWriteResponse writeTimelineEntities(
TimelineEntities entities, UserGroupInformation callerUgi)
throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 272b478c7be..61dcf9972ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -185,6 +186,55 @@ public class TimelineCollectorWebService {
}
}
+ /**
+ * @param req Servlet request.
+ * @param res Servlet response.
+ * @param domain timeline domain to be put.
+ * @param appId Application Id to which the domain to be put belong to. If
+ * appId is not there or it cannot be parsed, HTTP 400 will be sent back.
+ * @return a Response with appropriate HTTP status.
+ */
+ @PUT
+ @Path("/domain")
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */ })
+ public Response putDomain(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("appid") String appId,
+ TimelineDomain domain) {
+ init(res);
+ UserGroupInformation callerUgi = getUser(req);
+ if (callerUgi == null) {
+ String msg = "The owner of the posted timeline entities is not set";
+ LOG.error(msg);
+ throw new ForbiddenException(msg);
+ }
+
+ try {
+ ApplicationId appID = parseApplicationId(appId);
+ if (appID == null) {
+ return Response.status(Response.Status.BAD_REQUEST).build();
+ }
+ NodeTimelineCollectorManager collectorManager =
+ (NodeTimelineCollectorManager) context.getAttribute(
+ NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+ TimelineCollector collector = collectorManager.get(appID);
+ if (collector == null) {
+ LOG.error("Application: " + appId + " is not found");
+ throw new NotFoundException(); // different exception?
+ }
+
+ domain.setOwner(callerUgi.getShortUserName());
+ collector.putDomain(domain, callerUgi);
+
+ return Response.ok().build();
+ } catch (Exception e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
private static ApplicationId parseApplicationId(String appId) {
try {
if (appId != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index ee4197000bf..ac0902fc2b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
@@ -88,6 +89,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService
return response;
}
+ @Override
+ public TimelineWriteResponse write(TimelineCollectorContext context,
+ TimelineDomain domain)
+ throws IOException {
+ // TODO implementation for storing domain into FileSystem
+ return null;
+ }
+
private synchronized void write(String clusterId, String userId,
String flowName, String flowVersion, long flowRun, String appId,
TimelineEntity entity, TimelineWriteResponse response)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 12bc1cb3f0e..08cfc8becb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
@@ -50,6 +51,20 @@ public interface TimelineWriter extends Service {
TimelineWriteResponse write(TimelineCollectorContext context,
TimelineEntities data, UserGroupInformation callerUgi) throws IOException;
+ /**
+ * Stores {@link TimelineDomain} object to the timeline
+ * store. Any errors occurring for individual write request objects will be
+ * reported in the response.
+ *
+ * @param context a {@link TimelineCollectorContext}
+ * @param domain a {@link TimelineDomain} object.
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException if there is any exception encountered while storing or
+ * writing entities to the back end storage.
+ */
+ TimelineWriteResponse write(TimelineCollectorContext context,
+ TimelineDomain domain) throws IOException;
+
/**
* Aggregates the entity information to the timeline store based on which
* track this entity is to be rolled up to The tracks along which aggregations
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index ec454284dab..88e4f25f720 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -177,6 +178,37 @@ public class TestTimelineCollector {
verify(writer, never()).flush();
}
+ /**
+ * Test TimelineCollector's interaction with TimelineWriter upon
+ * putDomain() calls.
+ */
+ @Test public void testPutDomain() throws IOException {
+ TimelineWriter writer = mock(TimelineWriter.class);
+ TimelineCollector collector = new TimelineCollectorForTest(writer);
+
+ TimelineDomain domain =
+ generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L,
+ 1L);
+ collector.putDomain(domain, UserGroupInformation.createRemoteUser("owner"));
+
+ verify(writer, times(1))
+ .write(any(TimelineCollectorContext.class), any(TimelineDomain.class));
+ verify(writer, times(1)).flush();
+ }
+
+ private static TimelineDomain generateDomain(String id, String desc,
+ String owner, String reader, String writer, Long cTime, Long mTime) {
+ TimelineDomain domain = new TimelineDomain();
+ domain.setId(id);
+ domain.setDescription(desc);
+ domain.setOwner(owner);
+ domain.setReaders(reader);
+ domain.setWriters(writer);
+ domain.setCreatedTime(cTime);
+ domain.setModifiedTime(mTime);
+ return domain;
+ }
+
private static class TimelineCollectorForTest extends TimelineCollector {
private final TimelineCollectorContext context =
new TimelineCollectorContext();