diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 997b175f2a3..1afe878d5e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
@@ -53,11 +54,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
@@ -140,19 +141,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        if (TimelineStorageUtils.isApplicationCreated(te)) {
+        TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
+            ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        if (event != null) {
           onApplicationCreated(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te);
+              flowRunId, appId, te, event.getTimestamp());
         }
         // if it's an application entity, store metrics
         storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
             appId, te);
         // if application has finished, store it's finish time and write final
-        // values
-        // of all metrics
-        if (TimelineStorageUtils.isApplicationFinished(te)) {
+        // values of all metrics
+        event = TimelineStorageUtils.getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
           onApplicationFinished(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te);
+              flowRunId, appId, te, event.getTimestamp());
         }
       }
     }
@@ -161,7 +165,7 @@
 
   private void onApplicationCreated(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
+      TimelineEntity te, long appCreatedTimeStamp) throws IOException {
     // store in App to flow table
     storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
     // store in flow run table
@@ -169,7 +173,7 @@
         flowRunId, appId, te);
     // store in flow activity table
     storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
+        flowRunId, appId, appCreatedTimeStamp);
   }
 
   /*
@@ -178,8 +182,9 @@
    */
   private void storeInFlowActivityTable(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
-    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+      long activityTimeStamp) throws IOException {
+    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
+        userId, flowName);
     byte[] qualifier = GenericObjectMapper.write(flowRunId);
     FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
         null, flowVersion,
@@ -214,28 +219,28 @@
    */
   private void onApplicationFinished(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
+      TimelineEntity te, long appFinishedTimeStamp) throws IOException {
    // store in flow run table
    storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
-        appId, te);
+        appId, te, appFinishedTimeStamp);
    // indicate in the flow activity table that the app has finished
    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
+        flowRunId, appId, appFinishedTimeStamp);
   }
 
   /*
   * Update the {@link FlowRunTable} with Application Finished information
   */
   private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te)
-      throws IOException {
-    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-        flowRunId);
-    Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
-        .getAttribute(appId);
+      String flowName, long flowRunId, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    byte[] rowKey =
+        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
-        TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
+        appFinishedTimeStamp, attributeAppId);
 
     // store the final value of metrics since application has finished
     Set<TimelineMetric> metrics = te.getMetrics();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 2328bba09dd..605dbe7a5a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -307,24 +307,6 @@ public final class TimelineStorageUtils {
     return false;
   }
 
-  /**
-   * get the time at which an app finished.
-   *
-   * @param te TimelineEntity object.
-   * @return true if application has finished else false
-   */
-  public static long getApplicationFinishedTime(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(
-          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return event.getTimestamp();
-      }
-    }
-    return 0L;
-  }
-
   /**
    * Checks if the input TimelineEntity object is an ApplicationEntity.
    *
@@ -336,21 +318,20 @@ public final class TimelineStorageUtils {
   }
 
   /**
-   * Checks for the APPLICATION_CREATED event.
-   *
    * @param te TimelineEntity object.
-   * @return true is application event exists, false otherwise
+   * @param eventId event with this id needs to be fetched
+   * @return TimelineEvent if TimelineEntity contains the desired event.
    */
-  public static boolean isApplicationCreated(TimelineEntity te) {
+  public static TimelineEvent getApplicationEvent(TimelineEntity te,
+      String eventId) {
     if (isApplicationEntity(te)) {
       for (TimelineEvent event : te.getEvents()) {
-        if (event.getId()
-            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
-          return true;
+        if (event.getId().equals(eventId)) {
+          return event;
         }
       }
     }
-    return false;
+    return null;
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 80b3287e317..2726ae2f351 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -83,36 +83,21 @@ public class FlowActivityRowKey {
   /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowName}.
-   * Will insert into current day's record in the table. Uses current time to
-   * store top of the day timestamp.
    *
    * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName) {
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-        .currentTimeMillis());
-    return getRowKey(clusterId, dayTs, userId, flowName);
-  }
-
-  /**
-   * Constructs a row key for the flow activity table as follows:
-   * {@code clusterId!dayTimestamp!user!flowName}.
-   *
-   * @param clusterId Cluster Id.
-   * @param dayTs Top of the day timestamp.
+   * @param eventTs event's TimeStamp.
    * @param userId User Id.
    * @param flowName Flow Name.
    * @return byte array for the row key
    */
-  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+  public static byte[] getRowKey(String clusterId, long eventTs, String userId,
       String flowName) {
+    // convert it to Day's time stamp
+    eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
+
     return Separator.QUALIFIERS.join(
         Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
+        Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 9eaa3dee017..009b488ed81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -62,8 +62,8 @@ import org.junit.Test;
 
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
 import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@@ -128,15 +128,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
       TimelineEvent event = new TimelineEvent();
       event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-      Long expTs = 1436512802000L;
-      event.setTimestamp(expTs);
+      event.setTimestamp(cTime);
       String expKey = "foo_event";
       Object expVal = "test";
       event.addInfo(expKey, expVal);
       entity.addEvent(event);
       TimelineEvent event11 = new TimelineEvent();
       event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-      expTs = 1436512802010L;
+      Long expTs = 1425019501000L;
       event11.setTimestamp(expTs);
       entity.addEvent(event11);
 
@@ -165,7 +164,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       entity1.addMetrics(metrics);
       TimelineEvent event1 = new TimelineEvent();
       event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-      event1.setTimestamp(expTs);
+      event1.setTimestamp(cTime);
       event1.addInfo(expKey, expVal);
       entity1.addEvent(event1);
       te1.addEntity(entity1);
@@ -182,7 +181,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       entity3.setCreatedTime(cTime);
       TimelineEvent event2 = new TimelineEvent();
       event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-      event2.setTimestamp(1436512802037L);
+      event2.setTimestamp(cTime);
       event2.addInfo("foo_event", "test");
       entity3.addEvent(event2);
       te3.addEntity(entity3);
@@ -196,7 +195,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       entity4.setCreatedTime(cTime);
       TimelineEvent event4 = new TimelineEvent();
       event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-      event4.setTimestamp(1436512802037L);
+      event4.setTimestamp(cTime);
       event4.addInfo("foo_event", "test");
       entity4.addEvent(event4);
       te4.addEntity(entity4);
@@ -785,10 +784,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertNotNull(entities);
       assertEquals(1, entities.size());
 
+      long firstFlowActivity =
+          TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+
       DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs) +
-          "-" + fmt.format(dayTs + (2*86400000L)));
+          "timeline/clusters/cluster1/flows?daterange=" +
+          fmt.format(firstFlowActivity) + "-" +
+          fmt.format(dayTs));
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);
@@ -810,7 +813,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=-" +
-          fmt.format(dayTs + (2*86400000L)));
+          fmt.format(dayTs));
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);
@@ -818,7 +821,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=" +
-          fmt.format(dayTs - (2*86400000L)) + "-");
+          fmt.format(firstFlowActivity) + "-");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index a4c06f2efee..d45df57460f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -103,8 +103,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    long cTime = 20000000000000L;
-    long mTime = 1425026901000L;
+    long cTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     // add metrics
     Set<TimelineMetric> metrics = new HashSet<>();
@@ -125,8 +124,7 @@
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
+    event.setTimestamp(cTime);
     String expKey = "foo_event";
Object expVal = "test"; event.addInfo(expKey, expVal); @@ -134,7 +132,8 @@ class TestFlowDataGenerator { event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - event.setTimestamp(1436512801000L); + long expTs = cTime + 21600000;// start time + 6hrs + event.setTimestamp(expTs); event.addInfo(expKey, expVal); entity.addEvent(event); @@ -149,8 +148,7 @@ class TestFlowDataGenerator { entity.setType(type); TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - long endTs = 1439379885000L; - event.setTimestamp(endTs); + event.setTimestamp(startTs); String expKey = "foo_event_greater"; String expVal = "test_app_greater"; event.addInfo(expKey, expVal); @@ -181,25 +179,23 @@ class TestFlowDataGenerator { entity.setCreatedTime(startTs); TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(System.currentTimeMillis()); + event.setTimestamp(startTs); entity.addEvent(event); return entity; } - static TimelineEntity getFlowApp1() { + static TimelineEntity getFlowApp1(long appCreatedTime) { TimelineEntity entity = new TimelineEntity(); String id = "flowActivity_test"; String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - long cTime = 1425016501000L; - entity.setCreatedTime(cTime); + entity.setCreatedTime(appCreatedTime); TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - long expTs = 1436512802000L; - event.setTimestamp(expTs); + event.setTimestamp(appCreatedTime); String expKey = "foo_event"; Object expVal = "test"; event.addInfo(expKey, expVal); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 9161902c7a9..6b23b6c8426 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -112,9 +112,9 @@ public class TestHBaseStorageFlowActivity { String flowVersion = "CF7022C10F1354"; long runid = 1002345678919L; String appName = "application_100000000000_1111"; - long minStartTs = 10000000000000L; - long greaterStartTs = 30000000000000L; - long endTs = 1439750690000L; + long minStartTs = 1424995200300L; + long greaterStartTs = 1424995200300L + 864000L; + long endTs = 1424995200300L + 86000000L;; TimelineEntity entityMinStartTime = TestFlowDataGenerator .getEntityMinStartTime(minStartTs); @@ -155,7 +155,8 @@ public class TestHBaseStorageFlowActivity { // check in flow activity table Table table1 = conn.getTable(TableName .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); - byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + byte[] startRow = + FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow); Get g = new Get(startRow); Result r1 = table1.get(g); assertNotNull(r1); @@ -169,8 +170,7 @@ public class TestHBaseStorageFlowActivity { 
     assertEquals(cluster, flowActivityRowKey.getClusterId());
     assertEquals(user, flowActivityRowKey.getUserId());
     assertEquals(flow, flowActivityRowKey.getFlowName());
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-        .currentTimeMillis());
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
@@ -216,7 +216,9 @@ public class TestHBaseStorageFlowActivity {
     long runid = 1001111178919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -231,7 +233,8 @@
       hbi.close();
     }
     // check flow activity
-    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
+        appCreatedTime);
 
     // use the reader to verify the data
     HBaseTimelineReaderImpl hbr = null;
@@ -262,13 +265,16 @@
   }
 
   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1, long appCreatedTime)
+      throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -288,8 +294,7 @@
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-          .currentTimeMillis());
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
       assertEquals(1, values.size());
       checkFlowActivityRunId(runid, flowVersion, values);
@@ -319,7 +324,9 @@
     long runid3 = 3333333333333L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -348,7 +355,7 @@
     }
     // check flow activity
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
-        runid1, flowVersion2, runid2, flowVersion3, runid3);
+        runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
 
     // use the timeline reader to verify data
     HBaseTimelineReaderImpl hbr = null;
@@ -369,8 +376,8 @@
       assertEquals(cluster, flowActivity.getCluster());
       assertEquals(user, flowActivity.getUser());
      assertEquals(flow, flowActivity.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-          .currentTimeMillis());
+      long dayTs =
+          TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivity.getDate().getTime());
       Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
       assertEquals(3, flowRuns.size());
@@ -395,14 +402,17 @@
 
   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
       String flow, Configuration c1, String flowVersion1, long runid1,
-      String flowVersion2, long runid2, String flowVersion3, long runid3)
+      String flowVersion2, long runid2, String flowVersion3, long runid3,
+      long appCreatedTime)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -419,8 +429,7 @@
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-          .currentTimeMillis());
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
 
       Map<byte[], byte[]> values = result
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 95047992108..b234bfdd7be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -114,7 +114,7 @@ public class TestHBaseStorageFlowRun {
     String flowVersion = "CF7022C10F1354";
     long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
-    long minStartTs = 10000000000000L;
+    long minStartTs = 1425026900000L;
     long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
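
Reviewer note (illustrative sketch, not part of the patch): the core behavioral change above is that FlowActivityRowKey.getRowKey() now derives the row key's day bucket from the application's CREATED/FINISHED event timestamp, passed down from HBaseTimelineWriterImpl, instead of from System.currentTimeMillis(). The self-contained Java sketch below mirrors that derivation; DayBucketSketch, topOfTheDay() and invert() are illustrative stand-ins for TimelineStorageUtils.getTopOfTheDayTimestamp() and invertLong(), assuming day bucketing in milliseconds and descending ordering via long inversion.

// Standalone sketch only; the real logic lives in TimelineStorageUtils and
// FlowActivityRowKey. Names here are hypothetical stand-ins.
public final class DayBucketSketch {
  private static final long MILLIS_ONE_DAY = 86400000L;

  // Truncate an event timestamp to the top (00:00:00.000) of its day.
  static long topOfTheDay(long eventTs) {
    return eventTs - (eventTs % MILLIS_ONE_DAY);
  }

  // Invert the timestamp so that more recent days sort first in the row key.
  static long invert(long ts) {
    return Long.MAX_VALUE - ts;
  }

  public static void main(String[] args) {
    long appCreatedTime = 1425016501000L; // app created timestamp used by the new tests
    long dayTs = topOfTheDay(appCreatedTime);
    System.out.println("day bucket = " + dayTs + ", inverted = " + invert(dayTs));
  }
}

The updated tests follow the same path: they compute the expected day timestamp from the app-created time rather than from the wall clock, which keeps the row-key assertions stable even when a test run crosses a day boundary.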