YARN-6734. Ensure sub-application user is extracted & sent to timeline service (Rohith Sharma K S via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-07-28 22:02:19 +05:30
parent 3fb71b1393
commit 9f6540535d
14 changed files with 465 additions and 217 deletions

View File

@ -36,6 +36,7 @@ import java.util.Set;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -334,16 +336,21 @@ public class TestTimelineReaderWebServicesHBaseStorage
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
Configuration c1 = getHBaseTestingUtility().getConfiguration(); Configuration c1 = getHBaseTestingUtility().getConfiguration();
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); runid, entity.getId()), te, remoteUser);
hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
hbi.write(cluster, user, flow2, runid, entity1.getId()), te1, remoteUser);
flowVersion2, runid2, entity3.getId(), te3); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
hbi.write(cluster, user, flow, flowVersion, runid, runid1, entity4.getId()), te4, remoteUser);
"application_1111111111_1111", userEntities); hbi.write(new TimelineCollectorContext(cluster, user, flow2, flowVersion2,
runid2, entity3.getId()), te3, remoteUser);
hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, "application_1111111111_1111"), userEntities, remoteUser);
writeApplicationEntities(hbi, ts); writeApplicationEntities(hbi, ts);
hbi.flush(); hbi.flush();
} finally { } finally {
@ -375,8 +382,9 @@ public class TestTimelineReaderWebServicesHBaseStorage
appEntity.addEvent(finished); appEntity.addEvent(finished);
te.addEntity(appEntity); te.addEntity(appEntity);
hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i, hbi.write(new TimelineCollectorContext("cluster1", "user1", "flow1",
appEntity.getId(), te); "CF7022C10F1354", i, appEntity.getId()), te,
UserGroupInformation.createRemoteUser("user1"));
} }
} }
} }

View File

@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
/** /**
* Utility class that creates the schema and generates test data. * Utility class that creates the schema and generates test data.
@ -155,17 +157,20 @@ public final class DataGeneratorForTest {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(util.getConfiguration()); hbi.init(util.getConfiguration());
hbi.start(); hbi.start();
String cluster = "cluster1"; UserGroupInformation remoteUser =
String user = "user1"; UserGroupInformation.createRemoteUser("user1");
String flow = "some_flow_name"; hbi.write(
String flowVersion = "AB7822C10F1111"; new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
long runid = 1002345678919L; "AB7822C10F1111", 1002345678919L, "application_1111111111_2222"),
String appName = "application_1111111111_2222"; te, remoteUser);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(
appName = "application_1111111111_3333"; new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); "AB7822C10F1111", 1002345678919L, "application_1111111111_3333"),
appName = "application_1111111111_4444"; te1, remoteUser);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te2); hbi.write(
new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
"AB7822C10F1111", 1002345678919L, "application_1111111111_4444"),
te2, remoteUser);
hbi.stop(); hbi.stop();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -433,15 +438,19 @@ public final class DataGeneratorForTest {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(util.getConfiguration()); hbi.init(util.getConfiguration());
hbi.start(); hbi.start();
String cluster = "cluster1";
String user = "user1"; UserGroupInformation user =
String flow = "some_flow_name"; UserGroupInformation.createRemoteUser("user1");
String flowVersion = "AB7822C10F1111"; TimelineCollectorContext context =
long runid = 1002345678919L; new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
hbi.write(cluster, user, flow, flowVersion, runid, appName1, te); "AB7822C10F1111", 1002345678919L, appName1);
hbi.write(cluster, user, flow, flowVersion, runid, appName2, te); hbi.write(context, te, user);
hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1); hbi.write(context, appTe1, user);
hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2);
context = new TimelineCollectorContext("cluster1", "user1",
"some_flow_name", "AB7822C10F1111", 1002345678919L, appName2);
hbi.write(context, te, user);
hbi.write(context, appTe2, user);
hbi.stop(); hbi.stop();
} finally { } finally {
if (hbi != null) { if (hbi != null) {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@ -162,7 +164,8 @@ public class TestHBaseTimelineStorageApps {
String flow = null; String flow = null;
String flowVersion = "AB7822C10F1111"; String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L; long runid = 1002345678919L;
hbi.write(cluster, user, flow, flowVersion, runid, appId, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appId), te, UserGroupInformation.createRemoteUser(user));
hbi.stop(); hbi.stop();
// retrieve the row // retrieve the row
@ -280,7 +283,8 @@ public class TestHBaseTimelineStorageApps {
String flow = "s!ome_f\tlow _n am!e"; String flow = "s!ome_f\tlow _n am!e";
String flowVersion = "AB7822C10F1111"; String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L; long runid = 1002345678919L;
hbi.write(cluster, user, flow, flowVersion, runid, appId, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appId), te, UserGroupInformation.createRemoteUser(user));
// Write entity again, this time without created time. // Write entity again, this time without created time.
entity = new ApplicationEntity(); entity = new ApplicationEntity();
@ -292,7 +296,8 @@ public class TestHBaseTimelineStorageApps {
entity.addInfo(infoMap1); entity.addInfo(infoMap1);
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entity); te.addEntity(entity);
hbi.write(cluster, user, flow, flowVersion, runid, appId, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appId), te, UserGroupInformation.createRemoteUser(user));
hbi.stop(); hbi.stop();
infoMap.putAll(infoMap1); infoMap.putAll(infoMap1);
@ -514,7 +519,9 @@ public class TestHBaseTimelineStorageApps {
String flowVersion = "1111F01C2287BA"; String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L; long runid = 1009876543218L;
String appName = "application_123465899910_1001"; String appName = "application_123465899910_1001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), entities,
UserGroupInformation.createRemoteUser(user));
hbi.stop(); hbi.stop();
// retrieve the row // retrieve the row
@ -629,14 +636,17 @@ public class TestHBaseTimelineStorageApps {
hbi.init(c1); hbi.init(c1);
hbi.start(); hbi.start();
// Writing application entity. // Writing application entity.
TimelineCollectorContext context = new TimelineCollectorContext("c1",
"u1", "f1", "v1", 1002345678919L, appId);
UserGroupInformation user = UserGroupInformation.createRemoteUser("u1");
try { try {
hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp); hbi.write(context, teApp, user);
Assert.fail("Expected an exception as metric values are non integral"); Assert.fail("Expected an exception as metric values are non integral");
} catch (IOException e) {} } catch (IOException e) {}
// Writing generic entity. // Writing generic entity.
try { try {
hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity); hbi.write(context, teEntity, user);
Assert.fail("Expected an exception as metric values are non integral"); Assert.fail("Expected an exception as metric values are non integral");
} catch (IOException e) {} } catch (IOException e) {}
hbi.stop(); hbi.stop();

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@ -72,6 +74,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -197,13 +204,16 @@ public class TestHBaseTimelineStorageEntities {
hbi.start(); hbi.start();
String cluster = "cluster_test_write_entity"; String cluster = "cluster_test_write_entity";
String user = "user1"; String user = "user1";
String subAppUser = "subAppUser1";
String flow = "some_flow_name"; String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111"; String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L; long runid = 1002345678919L;
String appName = HBaseTimelineStorageUtils.convertApplicationIdToString( String appName = HBaseTimelineStorageUtils.convertApplicationIdToString(
ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1) ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1)
); );
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te,
UserGroupInformation.createRemoteUser(subAppUser));
hbi.stop(); hbi.stop();
// scan the table and see that entity exists // scan the table and see that entity exists
@ -354,6 +364,11 @@ public class TestHBaseTimelineStorageEntities {
assertEquals(metricValues.get(ts - 20000), assertEquals(metricValues.get(ts - 20000),
metric.getValues().get(ts - 20000)); metric.getValues().get(ts - 20000));
} }
// verify for sub application table entities.
verifySubApplicationTableEntities(cluster, user, flow, flowVersion, runid,
appName, subAppUser, c1, entity, id, type, infoMap, isRelatedTo,
relatesTo, conf, metricValues, metrics, cTime, m1);
} finally { } finally {
if (hbi != null) { if (hbi != null) {
hbi.stop(); hbi.stop();
@ -362,6 +377,98 @@ public class TestHBaseTimelineStorageEntities {
} }
} }
private void verifySubApplicationTableEntities(String cluster, String user,
String flow, String flowVersion, Long runid, String appName,
String subAppUser, Configuration c1, TimelineEntity entity, String id,
String type, Map<String, Object> infoMap,
Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
Map<String, String> conf, Map<Long, Number> metricValues,
Set<TimelineMetric> metrics, Long cTime, TimelineMetric m1)
throws IOException {
Scan s = new Scan();
// read from SubApplicationTable
byte[] startRow = new SubApplicationRowKeyPrefix(cluster, subAppUser, null,
null, null, null).getRowKeyPrefix();
s.setStartRow(startRow);
s.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
ResultScanner scanner =
new SubApplicationTable().getResultScanner(c1, conn, s);
int rowCount = 0;
int colCount = 0;
KeyConverter<String> stringKeyConverter = new StringKeyConverter();
for (Result result : scanner) {
if (result != null && !result.isEmpty()) {
rowCount++;
colCount += result.size();
byte[] row1 = result.getRow();
assertTrue(verifyRowKeyForSubApplication(row1, subAppUser, cluster,
user, entity));
// check info column family
String id1 = SubApplicationColumn.ID.readResult(result).toString();
assertEquals(id, id1);
String type1 = SubApplicationColumn.TYPE.readResult(result).toString();
assertEquals(type, type1);
Long cTime1 =
(Long) SubApplicationColumn.CREATED_TIME.readResult(result);
assertEquals(cTime1, cTime);
Map<String, Object> infoColumns = SubApplicationColumnPrefix.INFO
.readResults(result, new StringKeyConverter());
assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
for (Map.Entry<String, Set<String>> isRelatedToEntry : isRelatedTo
.entrySet()) {
Object isRelatedToValue = SubApplicationColumnPrefix.IS_RELATED_TO
.readResult(result, isRelatedToEntry.getKey());
String compoundValue = isRelatedToValue.toString();
// id7?id9?id6
Set<String> isRelatedToValues =
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(),
isRelatedToValues.size());
for (String v : isRelatedToEntry.getValue()) {
assertTrue(isRelatedToValues.contains(v));
}
}
// RelatesTo
for (Map.Entry<String, Set<String>> relatesToEntry : relatesTo
.entrySet()) {
String compoundValue = SubApplicationColumnPrefix.RELATES_TO
.readResult(result, relatesToEntry.getKey()).toString();
// id3?id4?id5
Set<String> relatesToValues =
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
assertEquals(relatesTo.get(relatesToEntry.getKey()).size(),
relatesToValues.size());
for (String v : relatesToEntry.getValue()) {
assertTrue(relatesToValues.contains(v));
}
}
// Configuration
Map<String, Object> configColumns = SubApplicationColumnPrefix.CONFIG
.readResults(result, stringKeyConverter);
assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result,
stringKeyConverter);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
matchMetrics(metricValues, metricMap);
}
}
assertEquals(1, rowCount);
assertEquals(16, colCount);
}
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, Long runid, String appName, TimelineEntity te) { String flow, Long runid, String appName, TimelineEntity te) {
@ -409,7 +516,9 @@ public class TestHBaseTimelineStorageEntities {
byte[] startRow = byte[] startRow =
new EntityRowKeyPrefix(cluster, user, flow, runid, appName) new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
.getRowKeyPrefix(); .getRowKeyPrefix();
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), entities,
UserGroupInformation.createRemoteUser(user));
hbi.stop(); hbi.stop();
// scan the table and see that entity exists // scan the table and see that entity exists
Scan s = new Scan(); Scan s = new Scan();
@ -514,7 +623,9 @@ public class TestHBaseTimelineStorageEntities {
String flowVersion = "1111F01C2287BA"; String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L; long runid = 1009876543218L;
String appName = "application_123465899910_2001"; String appName = "application_123465899910_2001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), entities,
UserGroupInformation.createRemoteUser(user));
hbi.stop(); hbi.stop();
// read the timeline entity using the reader this time // read the timeline entity using the reader this time
@ -1762,4 +1873,15 @@ public class TestHBaseTimelineStorageEntities {
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster(); util.shutdownMiniCluster();
} }
private boolean verifyRowKeyForSubApplication(byte[] rowKey, String suAppUser,
String cluster, String user, TimelineEntity te) {
SubApplicationRowKey key = SubApplicationRowKey.parseRowKey(rowKey);
assertEquals(suAppUser, key.getSubAppUserId());
assertEquals(cluster, key.getClusterId());
assertEquals(te.getType(), key.getEntityType());
assertEquals(te.getId(), key.getEntityId());
assertEquals(user, key.getUserId());
return true;
}
} }

View File

@ -38,12 +38,14 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@ -117,13 +119,18 @@ public class TestHBaseStorageFlowActivity {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// write another entity with the right min start time // write another entity with the right min start time
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityMinStartTime); te.addEntity(entityMinStartTime);
appName = "application_100000000000_3333"; appName = "application_100000000000_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// writer another entity for max end time // writer another entity for max end time
TimelineEntity entityMaxEndTime = TestFlowDataGenerator TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@ -131,7 +138,8 @@ public class TestHBaseStorageFlowActivity {
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityMaxEndTime); te.addEntity(entityMaxEndTime);
appName = "application_100000000000_4444"; appName = "application_100000000000_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// writer another entity with greater start time // writer another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@ -139,7 +147,8 @@ public class TestHBaseStorageFlowActivity {
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityGreaterStartTime); te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222"; appName = "application_1000000000000000_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// flush everything to hbase // flush everything to hbase
hbi.flush(); hbi.flush();
@ -227,7 +236,8 @@ public class TestHBaseStorageFlowActivity {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
String appName = "application_1111999999_1234"; String appName = "application_1111999999_1234";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, UserGroupInformation.createRemoteUser(user));
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -340,20 +350,27 @@ public class TestHBaseStorageFlowActivity {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
String appName = "application_11888888888_1111"; String appName = "application_11888888888_1111";
hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion1,
runid1, appName), te, remoteUser);
// write an application with to this flow but a different runid/ version // write an application with to this flow but a different runid/ version
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityApp1); te.addEntity(entityApp1);
appName = "application_11888888888_2222"; appName = "application_11888888888_2222";
hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion2,
runid2, appName), te, remoteUser);
// write an application with to this flow but a different runid/ version // write an application with to this flow but a different runid/ version
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityApp1); te.addEntity(entityApp1);
appName = "application_11888888888_3333"; appName = "application_11888888888_3333";
hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion3,
runid3, appName), te, remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {

View File

@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@ -181,13 +183,18 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// write another entity with the right min start time // write another entity with the right min start time
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityMinStartTime); te.addEntity(entityMinStartTime);
appName = "application_100000000000_3333"; appName = "application_100000000000_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// writer another entity for max end time // writer another entity for max end time
TimelineEntity entityMaxEndTime = TestFlowDataGenerator TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@ -195,7 +202,8 @@ public class TestHBaseStorageFlowRun {
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityMaxEndTime); te.addEntity(entityMaxEndTime);
appName = "application_100000000000_4444"; appName = "application_100000000000_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// writer another entity with greater start time // writer another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@ -203,7 +211,8 @@ public class TestHBaseStorageFlowRun {
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityGreaterStartTime); te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222"; appName = "application_1000000000000000_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// flush everything to hbase // flush everything to hbase
hbi.flush(); hbi.flush();
@ -287,15 +296,19 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
String appName = "application_11111111111111_1111"; String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis()); .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -556,15 +569,22 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
String appName = "application_11111111111111_1111"; String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
1002345678919L, appName), te,
remoteUser);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis()); .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
1002345678918L, appName), te,
remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -643,15 +663,20 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
String appName = "application_11111111111111_1111"; String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis()); .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te, remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -737,6 +762,8 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
for (int i = start; i < count; i++) { for (int i = start; i < count; i++) {
String appName = "application_1060350000000_" + appIdSuffix; String appName = "application_1060350000000_" + appIdSuffix;
@ -746,7 +773,8 @@ public class TestHBaseStorageFlowRun {
te1.addEntity(entityApp1); te1.addEntity(entityApp1);
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2); te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te1, remoteUser);
Thread.sleep(1); Thread.sleep(1);
appName = "application_1001199480000_7" + appIdSuffix; appName = "application_1001199480000_7" + appIdSuffix;
@ -758,7 +786,9 @@ public class TestHBaseStorageFlowRun {
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2); te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te1,
remoteUser);
if (i % 1000 == 0) { if (i % 1000 == 0) {
hbi.flush(); hbi.flush();
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
@ -826,16 +856,23 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, UserGroupInformation remoteUser =
"application_11111111111111_1111", te); UserGroupInformation.createRemoteUser(user);
hbi.write(
new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
1002345678919L, "application_11111111111111_1111"),
te, remoteUser);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
System.currentTimeMillis()); System.currentTimeMillis());
entityApp2.setCreatedTime(1425016502000L); entityApp2.setCreatedTime(1425016502000L);
te.addEntity(entityApp2); te.addEntity(entityApp2);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, hbi.write(
"application_11111111111111_2222", te); new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
1002345678918L, "application_11111111111111_2222"),
te, remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -911,15 +948,22 @@ public class TestHBaseStorageFlowRun {
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, UserGroupInformation remoteUser =
"application_11111111111111_1111", te); UserGroupInformation.createRemoteUser(user);
hbi.write(
new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
1002345678919L, "application_11111111111111_1111"),
te, remoteUser);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
System.currentTimeMillis()); System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, hbi.write(
"application_11111111111111_2222", te); new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
1002345678918L, "application_11111111111111_2222"),
te, remoteUser);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {

View File

@ -48,8 +48,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
@ -280,9 +282,12 @@ public class TestHBaseStorageFlowRunCompaction {
Configuration c1 = util.getConfiguration(); Configuration c1 = util.getConfiguration();
TimelineEntities te1 = null; TimelineEntities te1 = null;
TimelineEntity entityApp1 = null; TimelineEntity entityApp1 = null;
UserGroupInformation remoteUser =
UserGroupInformation.createRemoteUser(user);
try { try {
hbi = new HBaseTimelineWriterImpl(); hbi = new HBaseTimelineWriterImpl();
hbi.init(c1); hbi.init(c1);
// now insert count * ( 100 + 100) metrics // now insert count * ( 100 + 100) metrics
// each call to getEntityMetricsApp1 brings back 100 values // each call to getEntityMetricsApp1 brings back 100 values
// of metric1 and 100 of metric2 // of metric1 and 100 of metric2
@ -292,14 +297,16 @@ public class TestHBaseStorageFlowRunCompaction {
te1 = new TimelineEntities(); te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
te1.addEntity(entityApp1); te1.addEntity(entityApp1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te1, remoteUser);
appName = "application_2048000000000_7" + appIdSuffix; appName = "application_2048000000000_7" + appIdSuffix;
insertTs++; insertTs++;
te1 = new TimelineEntities(); te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
te1.addEntity(entityApp1); te1.addEntity(entityApp1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te1, remoteUser);
} }
} finally { } finally {
String appName = "application_10240000000000_" + appIdSuffix; String appName = "application_10240000000000_" + appIdSuffix;
@ -308,7 +315,8 @@ public class TestHBaseStorageFlowRunCompaction {
insertTs + 1, c1); insertTs + 1, c1);
te1.addEntity(entityApp1); te1.addEntity(entityApp1);
if (hbi != null) { if (hbi != null) {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
runid, appName), te1, remoteUser);
hbi.flush(); hbi.flush();
hbi.close(); hbi.close();
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@ -63,6 +65,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -85,6 +91,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private TypedBufferedMutator<ApplicationTable> applicationTable; private TypedBufferedMutator<ApplicationTable> applicationTable;
private TypedBufferedMutator<FlowActivityTable> flowActivityTable; private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
private TypedBufferedMutator<FlowRunTable> flowRunTable; private TypedBufferedMutator<FlowRunTable> flowRunTable;
private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
/** /**
* Used to convert strings key components to and from storage format. * Used to convert strings key components to and from storage format.
@ -97,6 +104,10 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
*/ */
private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
private enum Tables {
APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
};
public HBaseTimelineWriterImpl() { public HBaseTimelineWriterImpl() {
super(HBaseTimelineWriterImpl.class.getName()); super(HBaseTimelineWriterImpl.class.getName());
} }
@ -116,17 +127,28 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
flowActivityTable = flowActivityTable =
new FlowActivityTable().getTableMutator(hbaseConf, conn); new FlowActivityTable().getTableMutator(hbaseConf, conn);
subApplicationTable =
new SubApplicationTable().getTableMutator(hbaseConf, conn);
} }
/** /**
* Stores the entire information in TimelineEntities to the timeline store. * Stores the entire information in TimelineEntities to the timeline store.
*/ */
@Override @Override
public TimelineWriteResponse write(String clusterId, String userId, public TimelineWriteResponse write(TimelineCollectorContext context,
String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities data, UserGroupInformation callerUgi)
TimelineEntities data) throws IOException { throws IOException {
TimelineWriteResponse putStatus = new TimelineWriteResponse(); TimelineWriteResponse putStatus = new TimelineWriteResponse();
String clusterId = context.getClusterId();
String userId = context.getUserId();
String flowName = context.getFlowName();
String flowVersion = context.getFlowVersion();
long flowRunId = context.getFlowRunId();
String appId = context.getAppId();
String subApplicationUser = callerUgi.getShortUserName();
// defensive coding to avoid NPE during row key construction // defensive coding to avoid NPE during row key construction
if ((flowName == null) || (appId == null) || (clusterId == null) if ((flowName == null) || (appId == null) || (clusterId == null)
|| (userId == null)) { || (userId == null)) {
@ -152,18 +174,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
new ApplicationRowKey(clusterId, userId, flowName, flowRunId, new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
appId); appId);
rowKey = applicationRowKey.getRowKey(); rowKey = applicationRowKey.getRowKey();
store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
} else { } else {
EntityRowKey entityRowKey = EntityRowKey entityRowKey =
new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
te.getType(), te.getIdPrefix(), te.getId()); te.getType(), te.getIdPrefix(), te.getId());
rowKey = entityRowKey.getRowKey(); rowKey = entityRowKey.getRowKey();
store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
} }
storeInfo(rowKey, te, flowVersion, isApplication); if (!isApplication && !userId.equals(subApplicationUser)) {
storeEvents(rowKey, te.getEvents(), isApplication); SubApplicationRowKey subApplicationRowKey =
storeConfig(rowKey, te.getConfigs(), isApplication); new SubApplicationRowKey(subApplicationUser, clusterId,
storeMetrics(rowKey, te.getMetrics(), isApplication); te.getType(), te.getIdPrefix(), te.getId(), userId);
storeRelations(rowKey, te, isApplication); rowKey = subApplicationRowKey.getRowKey();
store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
}
if (isApplication) { if (isApplication) {
TimelineEvent event = TimelineEvent event =
@ -304,72 +330,108 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
} }
} }
private void storeRelations(byte[] rowKey, TimelineEntity te,
boolean isApplication) throws IOException {
if (isApplication) {
storeRelations(rowKey, te.getIsRelatedToEntities(),
ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
storeRelations(rowKey, te.getRelatesToEntities(),
ApplicationColumnPrefix.RELATES_TO, applicationTable);
} else {
storeRelations(rowKey, te.getIsRelatedToEntities(),
EntityColumnPrefix.IS_RELATED_TO, entityTable);
storeRelations(rowKey, te.getRelatesToEntities(),
EntityColumnPrefix.RELATES_TO, entityTable);
}
}
/** /**
* Stores the Relations from the {@linkplain TimelineEntity} object. * Stores the Relations from the {@linkplain TimelineEntity} object.
*/ */
private <T> void storeRelations(byte[] rowKey, private <T> void storeRelations(byte[] rowKey,
Map<String, Set<String>> connectedEntities, Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) TypedBufferedMutator<T> table) throws IOException {
throws IOException { if (connectedEntities != null) {
for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
.entrySet()) { .entrySet()) {
// id3?id4?id5 // id3?id4?id5
String compoundValue = String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue()); Separator.VALUES.joinEncoded(connectedEntity.getValue());
columnPrefix.store(rowKey, table, columnPrefix.store(rowKey, table,
stringKeyConverter.encode(connectedEntity.getKey()), null, stringKeyConverter.encode(connectedEntity.getKey()), null,
compoundValue); compoundValue);
}
} }
} }
/** /**
* Stores information from the {@linkplain TimelineEntity} object. * Stores information from the {@linkplain TimelineEntity} object.
*/ */
private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, private void store(byte[] rowKey, TimelineEntity te,
boolean isApplication) throws IOException { String flowVersion,
Tables table) throws IOException {
if (isApplication) { switch (table) {
case APPLICATION_TABLE:
ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
te.getCreatedTime()); te.getCreatedTime());
ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
flowVersion); flowVersion);
Map<String, Object> info = te.getInfo(); storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
if (info != null) { applicationTable);
for (Map.Entry<String, Object> entry : info.entrySet()) { storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, applicationTable);
stringKeyConverter.encode(entry.getKey()), null, storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
entry.getValue()); applicationTable);
} storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
} applicationTable);
} else { storeRelations(rowKey, te.getIsRelatedToEntities(),
ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
storeRelations(rowKey, te.getRelatesToEntities(),
ApplicationColumnPrefix.RELATES_TO, applicationTable);
break;
case ENTITY_TABLE:
EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
te.getCreatedTime()); te.getCreatedTime());
EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
Map<String, Object> info = te.getInfo(); storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
if (info != null) { entityTable);
for (Map.Entry<String, Object> entry : info.entrySet()) { storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
EntityColumnPrefix.INFO.store(rowKey, entityTable, entityTable);
stringKeyConverter.encode(entry.getKey()), null, storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
entry.getValue()); entityTable);
} storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
entityTable);
storeRelations(rowKey, te.getIsRelatedToEntities(),
EntityColumnPrefix.IS_RELATED_TO, entityTable);
storeRelations(rowKey, te.getRelatesToEntities(),
EntityColumnPrefix.RELATES_TO, entityTable);
break;
case SUBAPPLICATION_TABLE:
SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
te.getId());
SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
te.getType());
SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
te.getCreatedTime());
SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
flowVersion);
storeInfo(rowKey, te.getInfo(), flowVersion,
SubApplicationColumnPrefix.INFO, subApplicationTable);
storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
subApplicationTable);
storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
subApplicationTable);
storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
subApplicationTable);
storeRelations(rowKey, te.getIsRelatedToEntities(),
SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
storeRelations(rowKey, te.getRelatesToEntities(),
SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
break;
default:
LOG.info("Invalid table name provided.");
break;
}
}
/**
* stores the info information from {@linkplain TimelineEntity}.
*/
private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
String flowVersion, ColumnPrefix<T> columnPrefix,
TypedBufferedMutator<T> table) throws IOException {
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
columnPrefix.store(rowKey, table,
stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
} }
} }
} }
@ -377,19 +439,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
/** /**
* stores the config information from {@linkplain TimelineEntity}. * stores the config information from {@linkplain TimelineEntity}.
*/ */
private void storeConfig(byte[] rowKey, Map<String, String> config, private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
boolean isApplication) throws IOException { ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
if (config == null) { throws IOException {
return; if (config != null) {
} for (Map.Entry<String, String> entry : config.entrySet()) {
for (Map.Entry<String, String> entry : config.entrySet()) { byte[] configKey = stringKeyConverter.encode(entry.getKey());
byte[] configKey = stringKeyConverter.encode(entry.getKey()); columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
if (isApplication) {
ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
configKey, null, entry.getValue());
} else {
EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
null, entry.getValue());
} }
} }
} }
@ -398,8 +454,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
* stores the {@linkplain TimelineMetric} information from the * stores the {@linkplain TimelineMetric} information from the
* {@linkplain TimelineEvent} object. * {@linkplain TimelineEvent} object.
*/ */
private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics, private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
boolean isApplication) throws IOException { ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
throws IOException {
if (metrics != null) { if (metrics != null) {
for (TimelineMetric metric : metrics) { for (TimelineMetric metric : metrics) {
byte[] metricColumnQualifier = byte[] metricColumnQualifier =
@ -407,13 +464,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
Map<Long, Number> timeseries = metric.getValues(); Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey(); Long timestamp = timeseriesEntry.getKey();
if (isApplication) { columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, timeseriesEntry.getValue());
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
} else {
EntityColumnPrefix.METRIC.store(rowKey, entityTable,
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
}
} }
} }
} }
@ -422,8 +474,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
/** /**
* Stores the events from the {@linkplain TimelineEvent} object. * Stores the events from the {@linkplain TimelineEvent} object.
*/ */
private void storeEvents(byte[] rowKey, Set<TimelineEvent> events, private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
boolean isApplication) throws IOException { ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
throws IOException {
if (events != null) { if (events != null) {
for (TimelineEvent event : events) { for (TimelineEvent event : events) {
if (event != null) { if (event != null) {
@ -441,26 +494,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] columnQualifierBytes = byte[] columnQualifierBytes =
new EventColumnName(eventId, eventTimestamp, null) new EventColumnName(eventId, eventTimestamp, null)
.getColumnQualifier(); .getColumnQualifier();
if (isApplication) { columnPrefix.store(rowKey, table, columnQualifierBytes, null,
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, Separator.EMPTY_BYTES);
columnQualifierBytes, null, Separator.EMPTY_BYTES);
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
columnQualifierBytes, null, Separator.EMPTY_BYTES);
}
} else { } else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) { for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
// eventId=infoKey // eventId=infoKey
byte[] columnQualifierBytes = byte[] columnQualifierBytes =
new EventColumnName(eventId, eventTimestamp, info.getKey()) new EventColumnName(eventId, eventTimestamp, info.getKey())
.getColumnQualifier(); .getColumnQualifier();
if (isApplication) { columnPrefix.store(rowKey, table, columnQualifierBytes, null,
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, info.getValue());
columnQualifierBytes, null, info.getValue());
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
columnQualifierBytes, null, info.getValue());
}
} // for info: eventInfo } // for info: eventInfo
} }
} }
@ -500,6 +543,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
applicationTable.flush(); applicationTable.flush();
flowRunTable.flush(); flowRunTable.flush();
flowActivityTable.flush(); flowActivityTable.flush();
subApplicationTable.flush();
} }
/** /**
@ -532,11 +576,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// The close API performs flushing and releases any resources held // The close API performs flushing and releases any resources held
flowActivityTable.close(); flowActivityTable.close();
} }
if (subApplicationTable != null) {
subApplicationTable.close();
}
if (conn != null) { if (conn != null) {
LOG.info("closing the hbase Connection"); LOG.info("closing the hbase Connection");
conn.close(); conn.close();
} }
super.serviceStop(); super.serviceStop();
} }
} }

View File

@ -56,26 +56,6 @@ public class SubApplicationRowKeyPrefix extends SubApplicationRowKey
userId); userId);
} }
/**
* Creates a prefix which generates the following rowKeyPrefixes for the sub
* application table:
* {@code subAppUserId!clusterId!entityType!entityPrefix!entityId!userId}.
*
* subAppUserId is usually the doAsUser.
* userId is the yarn user that the AM runs as.
*
* @param clusterId
* identifying the cluster
* @param subAppUserId
* identifying the sub app user
* @param userId
* identifying the user who runs the AM
*/
public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId,
String userId) {
this(subAppUserId, clusterId, null, null, null, userId);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *

View File

@ -139,7 +139,7 @@ public abstract class TimelineCollector extends CompositeService {
// flush the writer buffer concurrently and swallow any exception // flush the writer buffer concurrently and swallow any exception
// caused by the timeline enitites that are being put here. // caused by the timeline enitites that are being put here.
synchronized (writer) { synchronized (writer) {
response = writeTimelineEntities(entities); response = writeTimelineEntities(entities, callerUgi);
flushBufferedTimelineEntities(); flushBufferedTimelineEntities();
} }
@ -147,15 +147,14 @@ public abstract class TimelineCollector extends CompositeService {
} }
private TimelineWriteResponse writeTimelineEntities( private TimelineWriteResponse writeTimelineEntities(
TimelineEntities entities) throws IOException { TimelineEntities entities, UserGroupInformation callerUgi)
throws IOException {
// Update application metrics for aggregation // Update application metrics for aggregation
updateAggregateStatus(entities, aggregationGroups, updateAggregateStatus(entities, aggregationGroups,
getEntityTypesSkipAggregation()); getEntityTypesSkipAggregation());
final TimelineCollectorContext context = getTimelineEntityContext(); final TimelineCollectorContext context = getTimelineEntityContext();
return writer.write(context.getClusterId(), context.getUserId(), return writer.write(context, entities, callerUgi);
context.getFlowName(), context.getFlowVersion(),
context.getFlowRunId(), context.getAppId(), entities);
} }
/** /**
@ -187,7 +186,7 @@ public abstract class TimelineCollector extends CompositeService {
callerUgi + ")"); callerUgi + ")");
} }
writeTimelineEntities(entities); writeTimelineEntities(entities, callerUgi);
} }
/** /**

View File

@ -28,12 +28,14 @@ import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -68,10 +70,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService
} }
@Override @Override
public TimelineWriteResponse write(String clusterId, String userId, public TimelineWriteResponse write(TimelineCollectorContext context,
String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities entities, UserGroupInformation callerUgi)
TimelineEntities entities) throws IOException { throws IOException {
TimelineWriteResponse response = new TimelineWriteResponse(); TimelineWriteResponse response = new TimelineWriteResponse();
String clusterId = context.getClusterId();
String userId = context.getUserId();
String flowName = context.getFlowName();
String flowVersion = context.getFlowVersion();
long flowRunId = context.getFlowRunId();
String appId = context.getAppId();
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
response); response);

View File

@ -21,10 +21,12 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
/** /**
* This interface is for storing application timeline information. * This interface is for storing application timeline information.
@ -34,25 +36,19 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
public interface TimelineWriter extends Service { public interface TimelineWriter extends Service {
/** /**
* Stores the entire information in {@link TimelineEntities} to the * Stores the entire information in {@link TimelineEntities} to the timeline
* timeline store. Any errors occurring for individual write request objects * store. Any errors occurring for individual write request objects will be
* will be reported in the response. * reported in the response.
* *
* @param clusterId context cluster ID * @param context a {@link TimelineCollectorContext}
* @param userId context user ID * @param data a {@link TimelineEntities} object.
* @param flowName context flow name * @param callerUgi {@link UserGroupInformation}.
* @param flowVersion context flow version
* @param flowRunId run id for the flow.
* @param appId context app ID.
* @param data
* a {@link TimelineEntities} object.
* @return a {@link TimelineWriteResponse} object. * @return a {@link TimelineWriteResponse} object.
* @throws IOException if there is any exception encountered while storing * @throws IOException if there is any exception encountered while storing or
* or writing entities to the backend storage. * writing entities to the back end storage.
*/ */
TimelineWriteResponse write(String clusterId, String userId, TimelineWriteResponse write(TimelineCollectorContext context,
String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities data, UserGroupInformation callerUgi) throws IOException;
TimelineEntities data) throws IOException;
/** /**
* Aggregates the entity information to the timeline store based on which * Aggregates the entity information to the timeline store based on which

View File

@ -41,8 +41,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -156,9 +154,8 @@ public class TestTimelineCollector {
collector.putEntities( collector.putEntities(
entities, UserGroupInformation.createRemoteUser("test-user")); entities, UserGroupInformation.createRemoteUser("test-user"));
verify(writer, times(1)).write( verify(writer, times(1)).write(any(TimelineCollectorContext.class),
anyString(), anyString(), anyString(), anyString(), anyLong(), any(TimelineEntities.class), any(UserGroupInformation.class));
anyString(), any(TimelineEntities.class));
verify(writer, times(1)).flush(); verify(writer, times(1)).flush();
} }
@ -175,9 +172,8 @@ public class TestTimelineCollector {
collector.putEntitiesAsync( collector.putEntitiesAsync(
entities, UserGroupInformation.createRemoteUser("test-user")); entities, UserGroupInformation.createRemoteUser("test-user"));
verify(writer, times(1)).write( verify(writer, times(1)).write(any(TimelineCollectorContext.class),
anyString(), anyString(), anyString(), anyString(), anyLong(), any(TimelineEntities.class), any(UserGroupInformation.class));
anyString(), any(TimelineEntities.class));
verify(writer, never()).flush(); verify(writer, never()).flush();
} }

View File

@ -30,11 +30,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -89,8 +91,10 @@ public class TestFileSystemTimelineWriterImpl {
outputRoot); outputRoot);
fsi.init(conf); fsi.init(conf);
fsi.start(); fsi.start();
fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, fsi.write(
"app_id", te); new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
"flow_version", 12345678L, "app_id"),
te, UserGroupInformation.createRemoteUser("user_id"));
String fileName = fsi.getOutputRoot() + File.separator + "entities" + String fileName = fsi.getOutputRoot() + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" + File.separator + "cluster_id" + File.separator + "user_id" +