YARN-4700. ATS storage has one extra record each time the RM got restarted. (Naganarasimha G R via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-03-04 19:42:22 +05:30 committed by Sangjin Lee
parent 0d02ab8729
commit d56dde490b
7 changed files with 96 additions and 117 deletions

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
@ -53,11 +54,11 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
@ -140,19 +141,22 @@ public TimelineWriteResponse write(String clusterId, String userId,
storeRelations(rowKey, te, isApplication); storeRelations(rowKey, te, isApplication);
if (isApplication) { if (isApplication) {
if (TimelineStorageUtils.isApplicationCreated(te)) { TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
ApplicationMetricsConstants.CREATED_EVENT_TYPE);
if (event != null) {
onApplicationCreated(clusterId, userId, flowName, flowVersion, onApplicationCreated(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te); flowRunId, appId, te, event.getTimestamp());
} }
// if it's an application entity, store metrics // if it's an application entity, store metrics
storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
appId, te); appId, te);
// if application has finished, store it's finish time and write final // if application has finished, store it's finish time and write final
// values // values of all metrics
// of all metrics event = TimelineStorageUtils.getApplicationEvent(te,
if (TimelineStorageUtils.isApplicationFinished(te)) { ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
if (event != null) {
onApplicationFinished(clusterId, userId, flowName, flowVersion, onApplicationFinished(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te); flowRunId, appId, te, event.getTimestamp());
} }
} }
} }
@ -161,7 +165,7 @@ public TimelineWriteResponse write(String clusterId, String userId,
private void onApplicationCreated(String clusterId, String userId, private void onApplicationCreated(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId, String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException { TimelineEntity te, long appCreatedTimeStamp) throws IOException {
// store in App to flow table // store in App to flow table
storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te); storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
// store in flow run table // store in flow run table
@ -169,7 +173,7 @@ private void onApplicationCreated(String clusterId, String userId,
flowRunId, appId, te); flowRunId, appId, te);
// store in flow activity table // store in flow activity table
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te); flowRunId, appId, appCreatedTimeStamp);
} }
/* /*
@ -178,8 +182,9 @@ private void onApplicationCreated(String clusterId, String userId,
*/ */
private void storeInFlowActivityTable(String clusterId, String userId, private void storeInFlowActivityTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId, String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException { long activityTimeStamp) throws IOException {
byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName); byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
userId, flowName);
byte[] qualifier = GenericObjectMapper.write(flowRunId); byte[] qualifier = GenericObjectMapper.write(flowRunId);
FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
null, flowVersion, null, flowVersion,
@ -214,28 +219,28 @@ private void storeInAppToFlowTable(String clusterId, String userId,
*/ */
private void onApplicationFinished(String clusterId, String userId, private void onApplicationFinished(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId, String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException { TimelineEntity te, long appFinishedTimeStamp) throws IOException {
// store in flow run table // store in flow run table
storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId, storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
appId, te); appId, te, appFinishedTimeStamp);
// indicate in the flow activity table that the app has finished // indicate in the flow activity table that the app has finished
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te); flowRunId, appId, appFinishedTimeStamp);
} }
/* /*
* Update the {@link FlowRunTable} with Application Finished information * Update the {@link FlowRunTable} with Application Finished information
*/ */
private void storeAppFinishedInFlowRunTable(String clusterId, String userId, private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
String flowName, long flowRunId, String appId, TimelineEntity te) String flowName, long flowRunId, String appId, TimelineEntity te,
throws IOException { long appFinishedTimeStamp) throws IOException {
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, byte[] rowKey =
flowRunId); FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID Attribute attributeAppId =
.getAttribute(appId); AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId); appFinishedTimeStamp, attributeAppId);
// store the final value of metrics since application has finished // store the final value of metrics since application has finished
Set<TimelineMetric> metrics = te.getMetrics(); Set<TimelineMetric> metrics = te.getMetrics();

View File

@ -21,9 +21,9 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -307,24 +307,6 @@ public static boolean isApplicationFinished(TimelineEntity te) {
return false; return false;
} }
/**
* get the time at which an app finished.
*
* @param te TimelineEntity object.
* @return true if application has finished else false
*/
public static long getApplicationFinishedTime(TimelineEntity te) {
SortedSet<TimelineEvent> allEvents = te.getEvents();
if ((allEvents != null) && (allEvents.size() > 0)) {
TimelineEvent event = allEvents.last();
if (event.getId().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
return event.getTimestamp();
}
}
return 0L;
}
/** /**
* Checks if the input TimelineEntity object is an ApplicationEntity. * Checks if the input TimelineEntity object is an ApplicationEntity.
* *
@ -336,21 +318,20 @@ public static boolean isApplicationEntity(TimelineEntity te) {
} }
/** /**
* Checks for the APPLICATION_CREATED event.
*
* @param te TimelineEntity object. * @param te TimelineEntity object.
* @return true is application event exists, false otherwise * @param eventId event with this id needs to be fetched
* @return TimelineEvent if TimelineEntity contains the desired event.
*/ */
public static boolean isApplicationCreated(TimelineEntity te) { public static TimelineEvent getApplicationEvent(TimelineEntity te,
String eventId) {
if (isApplicationEntity(te)) { if (isApplicationEntity(te)) {
for (TimelineEvent event : te.getEvents()) { for (TimelineEvent event : te.getEvents()) {
if (event.getId() if (event.getId().equals(eventId)) {
.equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { return event;
return true;
} }
} }
} }
return false; return null;
} }
/** /**

View File

@ -83,36 +83,21 @@ public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
/** /**
* Constructs a row key for the flow activity table as follows: * Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}. * {@code clusterId!dayTimestamp!user!flowName}.
* Will insert into current day's record in the table. Uses current time to
* store top of the day timestamp.
* *
* @param clusterId Cluster Id. * @param clusterId Cluster Id.
* @param userId User Id. * @param eventTs event's TimeStamp.
* @param flowName Flow Name.
* @return byte array with the row key prefix
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowName) {
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
return getRowKey(clusterId, dayTs, userId, flowName);
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}.
*
* @param clusterId Cluster Id.
* @param dayTs Top of the day timestamp.
* @param userId User Id. * @param userId User Id.
* @param flowName Flow Name. * @param flowName Flow Name.
* @return byte array for the row key * @return byte array for the row key
*/ */
public static byte[] getRowKey(String clusterId, long dayTs, String userId, public static byte[] getRowKey(String clusterId, long eventTs, String userId,
String flowName) { String flowName) {
// convert it to Day's time stamp
eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
return Separator.QUALIFIERS.join( return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)), Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)), Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
Bytes.toBytes(Separator.QUALIFIERS.encode(flowName))); Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
} }

View File

@ -62,8 +62,8 @@
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@ -128,15 +128,14 @@ private static void loadData() throws Exception {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L; event.setTimestamp(cTime);
event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";
event.addInfo(expKey, expVal); event.addInfo(expKey, expVal);
entity.addEvent(event); entity.addEvent(event);
TimelineEvent event11 = new TimelineEvent(); TimelineEvent event11 = new TimelineEvent();
event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
expTs = 1436512802010L; Long expTs = 1425019501000L;
event11.setTimestamp(expTs); event11.setTimestamp(expTs);
entity.addEvent(event11); entity.addEvent(event11);
@ -165,7 +164,7 @@ private static void loadData() throws Exception {
entity1.addMetrics(metrics); entity1.addMetrics(metrics);
TimelineEvent event1 = new TimelineEvent(); TimelineEvent event1 = new TimelineEvent();
event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event1.setTimestamp(expTs); event1.setTimestamp(cTime);
event1.addInfo(expKey, expVal); event1.addInfo(expKey, expVal);
entity1.addEvent(event1); entity1.addEvent(event1);
te1.addEntity(entity1); te1.addEntity(entity1);
@ -182,7 +181,7 @@ private static void loadData() throws Exception {
entity3.setCreatedTime(cTime); entity3.setCreatedTime(cTime);
TimelineEvent event2 = new TimelineEvent(); TimelineEvent event2 = new TimelineEvent();
event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event2.setTimestamp(1436512802037L); event2.setTimestamp(cTime);
event2.addInfo("foo_event", "test"); event2.addInfo("foo_event", "test");
entity3.addEvent(event2); entity3.addEvent(event2);
te3.addEntity(entity3); te3.addEntity(entity3);
@ -196,7 +195,7 @@ private static void loadData() throws Exception {
entity4.setCreatedTime(cTime); entity4.setCreatedTime(cTime);
TimelineEvent event4 = new TimelineEvent(); TimelineEvent event4 = new TimelineEvent();
event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event4.setTimestamp(1436512802037L); event4.setTimestamp(cTime);
event4.addInfo("foo_event", "test"); event4.addInfo("foo_event", "test");
entity4.addEvent(event4); entity4.addEvent(event4);
te4.addEntity(entity4); te4.addEntity(entity4);
@ -785,10 +784,14 @@ public void testGetFlows() throws Exception {
assertNotNull(entities); assertNotNull(entities);
assertEquals(1, entities.size()); assertEquals(1, entities.size());
long firstFlowActivity =
TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs) + "timeline/clusters/cluster1/flows?daterange="
"-" + fmt.format(dayTs + (2*86400000L))); + fmt.format(firstFlowActivity) + "-"
+ fmt.format(dayTs));
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
@ -810,7 +813,7 @@ public void testGetFlows() throws Exception {
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=-" + "timeline/clusters/cluster1/flows?daterange=-" +
fmt.format(dayTs + (2*86400000L))); fmt.format(dayTs));
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
@ -818,7 +821,7 @@ public void testGetFlows() throws Exception {
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=" + "timeline/clusters/cluster1/flows?daterange=" +
fmt.format(dayTs - (2*86400000L)) + "-"); fmt.format(firstFlowActivity) + "-");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities); assertNotNull(entities);

View File

@ -103,8 +103,7 @@ static TimelineEntity getEntity1() {
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
long cTime = 20000000000000L; long cTime = 1425026901000L;
long mTime = 1425026901000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
// add metrics // add metrics
Set<TimelineMetric> metrics = new HashSet<>(); Set<TimelineMetric> metrics = new HashSet<>();
@ -125,8 +124,7 @@ static TimelineEntity getEntity1() {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
long expTs = 1436512802000L; event.setTimestamp(cTime);
event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";
event.addInfo(expKey, expVal); event.addInfo(expKey, expVal);
@ -134,7 +132,8 @@ static TimelineEntity getEntity1() {
event = new TimelineEvent(); event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
event.setTimestamp(1436512801000L); long expTs = cTime + 21600000;// start time + 6hrs
event.setTimestamp(expTs);
event.addInfo(expKey, expVal); event.addInfo(expKey, expVal);
entity.addEvent(event); entity.addEvent(event);
@ -149,8 +148,7 @@ static TimelineEntity getEntityGreaterStartTime(long startTs) {
entity.setType(type); entity.setType(type);
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
long endTs = 1439379885000L; event.setTimestamp(startTs);
event.setTimestamp(endTs);
String expKey = "foo_event_greater"; String expKey = "foo_event_greater";
String expVal = "test_app_greater"; String expVal = "test_app_greater";
event.addInfo(expKey, expVal); event.addInfo(expKey, expVal);
@ -181,25 +179,23 @@ static TimelineEntity getEntityMinStartTime(long startTs) {
entity.setCreatedTime(startTs); entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(startTs);
entity.addEvent(event); entity.addEvent(event);
return entity; return entity;
} }
static TimelineEntity getFlowApp1() { static TimelineEntity getFlowApp1(long appCreatedTime) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
String id = "flowActivity_test"; String id = "flowActivity_test";
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
long cTime = 1425016501000L; entity.setCreatedTime(appCreatedTime);
entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
long expTs = 1436512802000L; event.setTimestamp(appCreatedTime);
event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";
event.addInfo(expKey, expVal); event.addInfo(expKey, expVal);

View File

@ -112,9 +112,9 @@ public void testWriteFlowRunMinMax() throws Exception {
String flowVersion = "CF7022C10F1354"; String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L; long runid = 1002345678919L;
String appName = "application_100000000000_1111"; String appName = "application_100000000000_1111";
long minStartTs = 10000000000000L; long minStartTs = 1424995200300L;
long greaterStartTs = 30000000000000L; long greaterStartTs = 1424995200300L + 864000L;
long endTs = 1439750690000L; long endTs = 1424995200300L + 86000000L;;
TimelineEntity entityMinStartTime = TestFlowDataGenerator TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(minStartTs); .getEntityMinStartTime(minStartTs);
@ -155,7 +155,8 @@ public void testWriteFlowRunMinMax() throws Exception {
// check in flow activity table // check in flow activity table
Table table1 = conn.getTable(TableName Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); byte[] startRow =
FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
Get g = new Get(startRow); Get g = new Get(startRow);
Result r1 = table1.get(g); Result r1 = table1.get(g);
assertNotNull(r1); assertNotNull(r1);
@ -169,8 +170,7 @@ public void testWriteFlowRunMinMax() throws Exception {
assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName()); assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size()); assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values); checkFlowActivityRunId(runid, flowVersion, values);
@ -216,7 +216,9 @@ public void testWriteFlowActivityOneFlow() throws Exception {
long runid = 1001111178919L; long runid = 1001111178919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1(); long appCreatedTime = 1425016501000L;
TimelineEntity entityApp1 =
TestFlowDataGenerator.getFlowApp1(appCreatedTime);
te.addEntity(entityApp1); te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
@ -231,7 +233,8 @@ public void testWriteFlowActivityOneFlow() throws Exception {
hbi.close(); hbi.close();
} }
// check flow activity // check flow activity
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1); checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
appCreatedTime);
// use the reader to verify the data // use the reader to verify the data
HBaseTimelineReaderImpl hbr = null; HBaseTimelineReaderImpl hbr = null;
@ -262,13 +265,16 @@ public void testWriteFlowActivityOneFlow() throws Exception {
} }
private void checkFlowActivityTable(String cluster, String user, String flow, private void checkFlowActivityTable(String cluster, String user, String flow,
String flowVersion, long runid, Configuration c1) throws IOException { String flowVersion, long runid, Configuration c1, long appCreatedTime)
throws IOException {
Scan s = new Scan(); Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); byte[] startRow =
FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
s.setStartRow(startRow); s.setStartRow(startRow);
String clusterStop = cluster + "1"; String clusterStop = cluster + "1";
byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); byte[] stopRow =
FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
s.setStopRow(stopRow); s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1); Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName Table table1 = conn.getTable(TableName
@ -288,8 +294,7 @@ private void checkFlowActivityTable(String cluster, String user, String flow,
assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName()); assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size()); assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values); checkFlowActivityRunId(runid, flowVersion, values);
@ -319,7 +324,9 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
long runid3 = 3333333333333L; long runid3 = 3333333333333L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1(); long appCreatedTime = 1425016501000L;
TimelineEntity entityApp1 =
TestFlowDataGenerator.getFlowApp1(appCreatedTime);
te.addEntity(entityApp1); te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
@ -348,7 +355,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
} }
// check flow activity // check flow activity
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
runid1, flowVersion2, runid2, flowVersion3, runid3); runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
// use the timeline reader to verify data // use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null; HBaseTimelineReaderImpl hbr = null;
@ -369,8 +376,8 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
assertEquals(cluster, flowActivity.getCluster()); assertEquals(cluster, flowActivity.getCluster());
assertEquals(user, flowActivity.getUser()); assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName()); assertEquals(flow, flowActivity.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System long dayTs =
.currentTimeMillis()); TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivity.getDate().getTime()); assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size()); assertEquals(3, flowRuns.size());
@ -395,14 +402,17 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
private void checkFlowActivityTableSeveralRuns(String cluster, String user, private void checkFlowActivityTableSeveralRuns(String cluster, String user,
String flow, Configuration c1, String flowVersion1, long runid1, String flow, Configuration c1, String flowVersion1, long runid1,
String flowVersion2, long runid2, String flowVersion3, long runid3) String flowVersion2, long runid2, String flowVersion3, long runid3,
long appCreatedTime)
throws IOException { throws IOException {
Scan s = new Scan(); Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); byte[] startRow =
FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
s.setStartRow(startRow); s.setStartRow(startRow);
String clusterStop = cluster + "1"; String clusterStop = cluster + "1";
byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); byte[] stopRow =
FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
s.setStopRow(stopRow); s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1); Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName Table table1 = conn.getTable(TableName
@ -419,8 +429,7 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName()); assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result Map<byte[], byte[]> values = result

View File

@ -52,8 +52,8 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -114,7 +114,7 @@ public void testWriteFlowRunMinMax() throws Exception {
String flowVersion = "CF7022C10F1354"; String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L; long runid = 1002345678919L;
String appName = "application_100000000000_1111"; String appName = "application_100000000000_1111";
long minStartTs = 10000000000000L; long minStartTs = 1425026900000L;
long greaterStartTs = 30000000000000L; long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L; long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator TimelineEntity entityMinStartTime = TestFlowDataGenerator