YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage. Contributed by Zhijie Shen

(cherry picked from commit 68c6232f8423e55b4d152ef3d1d66aeb2d6a555e)
This commit is contained in:
Junping Du 2015-04-09 18:04:27 -07:00 committed by Sangjin Lee
parent 5712b8f9fd
commit 47f35a30bb
26 changed files with 259 additions and 130 deletions

View File

@ -189,8 +189,9 @@ public class Client {
// Timeline domain writer access control
private String modifyACLs = null;
private String flowId = null;
private String flowRunId = null;
private String flowName = null;
private String flowVersion = null;
private long flowRunId = 0L;
// Command line options
private Options opts;
@ -293,9 +294,11 @@ public Client(Configuration conf) throws Exception {
+ "modify the timeline entities in the given domain");
opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain.");
opts.addOption("flow", true, "ID of the flow which the distributed shell "
opts.addOption("flow_name", true, "Flow name which the distributed shell "
+ "app belongs to");
opts.addOption("flow_run", true, "ID of the flowrun which the distributed "
opts.addOption("flow_version", true, "Flow version which the distributed "
+ "shell app belongs to");
opts.addOption("flow_run_id", true, "Flow run ID which the distributed "
+ "shell app belongs to");
opts.addOption("help", false, "Print usage");
opts.addOption("node_label_expression", true,
@ -486,11 +489,19 @@ public boolean init(String[] args) throws ParseException {
+ cliParser.getOptionValue("container_retry_interval"));
}
if (cliParser.hasOption("flow")) {
flowId = cliParser.getOptionValue("flow");
if (cliParser.hasOption("flow_name")) {
flowName = cliParser.getOptionValue("flow_name");
}
if (cliParser.hasOption("flow_run")) {
flowRunId = cliParser.getOptionValue("flow_run");
if (cliParser.hasOption("flow_version")) {
flowVersion = cliParser.getOptionValue("flow_version");
}
if (cliParser.hasOption("flow_run_id")) {
try {
flowRunId = Long.valueOf(cliParser.getOptionValue("flow_run_id"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Flow run is not a valid long value", e);
}
}
return true;
}
@ -584,10 +595,13 @@ public boolean run() throws IOException, YarnException {
}
Set<String> tags = new HashSet<String>();
if (flowId != null) {
tags.add(TimelineUtils.generateFlowIdTag(flowId));
if (flowName != null) {
tags.add(TimelineUtils.generateFlowNameTag(flowName));
}
if (flowRunId != null) {
if (flowVersion != null) {
tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
}
if (flowRunId != 0) {
tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
}
appContext.setApplicationTags(tags);

View File

@ -332,9 +332,11 @@ public void testDSShell(boolean haveDomain, boolean defaultFlow)
args = mergeArgs(args, timelineArgs);
if (!defaultFlow) {
String[] flowArgs = {
"--flow",
"test_flow_id",
"--flow_run",
"--flow_name",
"test_flow_name",
"--flow_version",
"test_flow_version",
"--flow_run_id",
"12345678"
};
args = mergeArgs(args, flowArgs);
@ -489,7 +491,8 @@ private void checkTimelineV2(
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ? "/" +
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/0/" : "/test_flow_id/12345678/") + appId.toString();
"/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
appId.toString();
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
@ -514,8 +517,6 @@ private void checkTimelineV2(
String containerFileName = outputDirContainer + containerTimestampFileName;
File containerFile = new File(containerFileName);
Assert.assertTrue(containerFile.exists());
String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ "_";
// Verify NM posting container metrics info.
String outputDirContainerMetrics = basePath + "/" +

View File

@ -44,7 +44,8 @@
@Evolving
public class TimelineUtils {
public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG";
public static final String FLOW_NAME_TAG_PREFIX = "TIMELINE_FLOW_NAME_TAG";
public static final String FLOW_VERSION_TAG_PREFIX = "TIMELINE_FLOW_VERSION_TAG";
public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
private static ObjectMapper mapper;
@ -163,11 +164,36 @@ public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) {
return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
}
public static String generateFlowIdTag(String flowId) {
return FLOW_ID_TAG_PREFIX + ":" + flowId;
/**
* Generate flow name tag
*
* @param flowName flow name that identifies a distinct flow application which
* can be run repeatedly over time
* @return
*/
public static String generateFlowNameTag(String flowName) {
return FLOW_NAME_TAG_PREFIX + ":" + flowName;
}
public static String generateFlowRunIdTag(String flowRunId) {
/**
* Generate flow version tag
*
* @param flowVersion flow version that keeps track of the changes made to the
* flow
* @return
*/
public static String generateFlowVersionTag(String flowVersion) {
return FLOW_VERSION_TAG_PREFIX + ":" + flowVersion;
}
/**
* Generate flow run ID tag
*
* @param flowRunId flow run ID that identifies one instance (or specific
* execution) of that flow
* @return
*/
public static String generateFlowRunIdTag(long flowRunId) {
return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
}
}

View File

@ -23,11 +23,12 @@
public abstract class GetTimelineCollectorContextResponse {
public static GetTimelineCollectorContextResponse newInstance(
String userId, String flowId, String flowRunId) {
String userId, String flowName, String flowVersion, long flowRunId) {
GetTimelineCollectorContextResponse response =
Records.newRecord(GetTimelineCollectorContextResponse.class);
response.setUserId(userId);
response.setFlowId(flowId);
response.setFlowName(flowName);
response.setFlowVersion(flowVersion);
response.setFlowRunId(flowRunId);
return response;
}
@ -36,11 +37,15 @@ public static GetTimelineCollectorContextResponse newInstance(
public abstract void setUserId(String userId);
public abstract String getFlowId();
public abstract String getFlowName();
public abstract void setFlowId(String flowId);
public abstract void setFlowName(String flowName);
public abstract String getFlowRunId();
public abstract String getFlowVersion();
public abstract void setFlowRunId(String flowRunId);
public abstract void setFlowVersion(String flowVersion);
public abstract long getFlowRunId();
public abstract void setFlowRunId(long flowRunId);
}

View File

@ -102,40 +102,52 @@ public void setUserId(String userId) {
}
@Override
public String getFlowId() {
public String getFlowName() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFlowId()) {
if (!p.hasFlowName()) {
return null;
}
return p.getFlowId();
return p.getFlowName();
}
@Override
public void setFlowId(String flowId) {
public void setFlowName(String flowName) {
maybeInitBuilder();
if (flowId == null) {
builder.clearFlowId();
if (flowName == null) {
builder.clearFlowName();
return;
}
builder.setFlowId(flowId);
builder.setFlowName(flowName);
}
@Override
public String getFlowRunId() {
public String getFlowVersion() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFlowRunId()) {
if (!p.hasFlowVersion()) {
return null;
}
return p.getFlowVersion();
}
@Override
public void setFlowVersion(String flowVersion) {
maybeInitBuilder();
if (flowVersion == null) {
builder.clearFlowVersion();
return;
}
builder.setFlowVersion(flowVersion);
}
@Override
public long getFlowRunId() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getFlowRunId();
}
@Override
public void setFlowRunId(String flowRunId) {
public void setFlowRunId(long flowRunId) {
maybeInitBuilder();
if (flowRunId == null) {
builder.clearFlowRunId();
return;
}
builder.setFlowRunId(flowRunId);
}
}

View File

@ -146,8 +146,9 @@ message GetTimelineCollectorContextRequestProto {
message GetTimelineCollectorContextResponseProto {
optional string user_id = 1;
optional string flow_id = 2;
optional string flow_run_id = 3;
optional string flow_name = 2;
optional string flow_version = 3;
optional int64 flow_run_id = 4;
}
message NMContainerStatusProto {

View File

@ -180,8 +180,9 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
GetTimelineCollectorContextResponse response =
proxy.getTimelineCollectorContext(request);
Assert.assertEquals("test_user_id", response.getUserId());
Assert.assertEquals("test_flow_id", response.getFlowId());
Assert.assertEquals("test_flow_run_id", response.getFlowRunId());
Assert.assertEquals("test_flow_name", response.getFlowName());
Assert.assertEquals("test_flow_version", response.getFlowVersion());
Assert.assertEquals(12345678L, response.getFlowRunId());
} catch (YarnException | IOException e) {
Assert.fail("RPC call failured is not expected here.");
}
@ -392,7 +393,7 @@ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
throws YarnException, IOException {
if (request.getApplicationId().getId() == 1) {
return GetTimelineCollectorContextResponse.newInstance(
"test_user_id", "test_flow_id", "test_flow_run_id");
"test_user_id", "test_flow_name", "test_flow_version", 12345678L);
} else {
throw new YarnException("The application is not found.");
}

View File

@ -130,6 +130,6 @@ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
" doesn't exist on NM.");
}
return GetTimelineCollectorContextResponse.newInstance(
app.getUser(), app.getFlowId(), app.getFlowRunId());
app.getUser(), app.getFlowName(), app.getFlowVersion(), app.getFlowRunId());
}
}

View File

@ -334,8 +334,8 @@ private void recoverApplication(ContainerManagerApplicationProto p)
LOG.info("Recovering application " + appId);
//TODO: Recover flow and flow run ID
ApplicationImpl app = new ApplicationImpl(
dispatcher, p.getUser(), null, null, appId, creds, context, p.getAppLogAggregationInitedTime());
dispatcher, p.getUser(), null, null, 0L, appId, creds, context,
p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@ -954,12 +954,18 @@ protected void startContainerInternal(
try {
if (!isServiceStopped()) {
// Create the application
String flowId = launchContext.getEnvironment().get(
TimelineUtils.FLOW_ID_TAG_PREFIX);
String flowRunId = launchContext.getEnvironment().get(
String flowName = launchContext.getEnvironment().get(
TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment().get(
TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment().get(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
Application application = new ApplicationImpl(
dispatcher, user, flowId, flowRunId, applicationID, credentials, context);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.valueOf(flowRunIdStr);
}
Application application = new ApplicationImpl(dispatcher, user,
flowName, flowVersion, flowRunId, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app "

View File

@ -36,9 +36,11 @@ public interface Application extends EventHandler<ApplicationEvent> {
ApplicationState getApplicationState();
String getFlowId();
String getFlowName();
String getFlowRunId();
String getFlowVersion();
long getFlowRunId();
TimelineClient getTimelineClient();

View File

@ -74,8 +74,9 @@ public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
final String flowId;
final String flowRunId;
final String flowName;
final String flowVersion;
final long flowRunId;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
@ -101,13 +102,14 @@ public class ApplicationImpl implements Application {
private long applicationLogInitedTimestamp = -1;
private final NMStateStoreService appStateStore;
public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
String flowRunId, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
String flowVersion, long flowRunId, ApplicationId appId,
Credentials credentials, Context context,
long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.user = user;
this.flowId = flowId;
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
this.appId = appId;
this.credentials = credentials;
@ -122,9 +124,9 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
}
public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
String flowRunId, ApplicationId appId, Credentials credentials,
Context context) {
this(dispatcher, user, flowId, flowRunId, appId, credentials,
String flowVersion, long flowRunId, ApplicationId appId,
Credentials credentials, Context context) {
this(dispatcher, user, flowId, flowVersion, flowRunId, appId, credentials,
context, -1);
Configuration conf = context.getConf();
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
@ -591,11 +593,18 @@ public LogAggregationContext getLogAggregationContext() {
}
}
public String getFlowId() {
return flowId;
@Override
public String getFlowName() {
return flowName;
}
public String getFlowRunId() {
@Override
public String getFlowVersion() {
return flowVersion;
}
@Override
public long getFlowRunId() {
return flowRunId;
}
}

View File

@ -551,7 +551,7 @@ private class WrappedApplication {
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(
dispatcher, this.user, null, null, appId, null, context);
dispatcher, this.user, null, null, 0, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);

View File

@ -40,8 +40,9 @@ public class MockApp implements Application {
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
ApplicationState appState;
Application app;
String flowId;
String flowRunId;
String flowName;
String flowVersion;
long flowRunId;
TimelineClient timelineClient = null;
public MockApp(int uniqId) {
@ -59,6 +60,14 @@ public MockApp(String user, long clusterTimeStamp, int uniqId) {
appState = ApplicationState.NEW;
}
public MockApp(String user, long clusterTimeStamp, int uniqId,
String flowName, String flowVersion, long flowRunId) {
this(user, clusterTimeStamp, uniqId);
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
}
public void setState(ApplicationState state) {
this.appState = state;
}
@ -81,11 +90,15 @@ public ApplicationState getApplicationState() {
public void handle(ApplicationEvent event) {}
public String getFlowId() {
return flowId;
public String getFlowName() {
return flowName;
}
public String getFlowRunId() {
public String getFlowVersion() {
return flowVersion;
}
public long getFlowRunId() {
return flowRunId;
}

View File

@ -339,7 +339,7 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
null, null, appId, null, nmContext));
null, null, 0, appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);

View File

@ -173,6 +173,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
@ -583,6 +584,26 @@ public SubmitApplicationResponse submitApplication(
throw RPCUtil.getRemoteException(ie);
}
// Sanity check for flow run
String value = null;
try {
for (String tag : submissionContext.getApplicationTags()) {
if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
tag.startsWith(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
Long.valueOf(value);
}
}
} catch (NumberFormatException e) {
LOG.warn("Invalid to flow run: " + value +
". Flow run should be a long integer", e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(e);
}
// Check whether app has already been put into rmContext,
// If it is, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {

View File

@ -232,22 +232,9 @@ protected void setupTokens(
// Set flow context info
for (String tag :
rmContext.getRMApps().get(applicationId).getApplicationTags()) {
if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") ||
tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) {
String value = tag.substring(
TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
if (!value.isEmpty()) {
environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value);
}
}
if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
String value = tag.substring(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
if (!value.isEmpty()) {
environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value);
}
}
setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
}
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
@ -269,6 +256,17 @@ protected void setupTokens(
container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}
private static void setFlowTags(
Map<String, String> environment, String tagPrefix, String tag) {
if (tag.startsWith(tagPrefix + ":") ||
tag.startsWith(tagPrefix.toLowerCase() + ":")) {
String value = tag.substring(tagPrefix.length() + 1);
if (!value.isEmpty()) {
environment.put(tagPrefix, value);
}
}
}
@VisibleForTesting
protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
Token<AMRMTokenIdentifier> amrmToken =

View File

@ -97,7 +97,7 @@ protected CollectorNodemanagerProtocol getNMCollectorService() {
mock(CollectorNodemanagerProtocol.class);
try {
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
when(protocol.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);
} catch (YarnException | IOException e) {

View File

@ -54,10 +54,12 @@ protected void serviceInit(Configuration conf) throws Exception {
// context info from NM.
// Current user usually is not the app user, but keep this field non-null
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
// Use app ID to generate a default flow ID for orphan app
context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
// Set the flow run ID to 0 if it's an orphan app
context.setFlowRunId("0");
// Use app ID to generate a default flow name for orphan app
context.setFlowName(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
// Set the flow version to string 1 if it's an orphan app
context.setFlowVersion("1");
// Set the flow run ID to 1 if it's an orphan app
context.setFlowRunId(1L);
context.setAppId(appId.toString());
super.serviceInit(conf);
}

View File

@ -100,8 +100,8 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
TimelineCollectorContext context = getTimelineEntityContext();
return writer.write(context.getClusterId(), context.getUserId(),
context.getFlowId(), context.getFlowRunId(), context.getAppId(),
entities);
context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
context.getAppId(), entities);
}
/**

View File

@ -22,19 +22,21 @@ public class TimelineCollectorContext {
private String clusterId;
private String userId;
private String flowId;
private String flowRunId;
private String flowName;
private String flowVersion;
private long flowRunId;
private String appId;
public TimelineCollectorContext() {
this(null, null, null, null, null);
this(null, null, null, null, 0L, null);
}
public TimelineCollectorContext(String clusterId, String userId,
String flowId, String flowRunId, String appId) {
String flowName, String flowVersion, long flowRunId, String appId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowId = flowId;
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
this.appId = appId;
}
@ -55,19 +57,27 @@ public void setUserId(String userId) {
this.userId = userId;
}
public String getFlowId() {
return flowId;
public String getFlowName() {
return flowName;
}
public void setFlowId(String flowId) {
this.flowId = flowId;
public void setFlowName(String flowName) {
this.flowName = flowName;
}
public String getFlowRunId() {
public String getFlowVersion() {
return flowVersion;
}
public void setFlowVersion(String flowVersion) {
this.flowVersion = flowVersion;
}
public long getFlowRunId() {
return flowRunId;
}
public void setFlowRunId(String flowRunId) {
public void setFlowRunId(long flowRunId) {
this.flowRunId = flowRunId;
}

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -273,12 +272,16 @@ private void updateTimelineCollectorContext(
if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId);
}
String flowId = response.getFlowId();
if (flowId != null && !flowId.isEmpty()) {
collector.getTimelineEntityContext().setFlowId(flowId);
String flowName = response.getFlowName();
if (flowName != null && !flowName.isEmpty()) {
collector.getTimelineEntityContext().setFlowName(flowName);
}
String flowRunId = response.getFlowRunId();
if (flowRunId != null && !flowRunId.isEmpty()) {
String flowVersion = response.getFlowVersion();
if (flowVersion != null && !flowVersion.isEmpty()) {
collector.getTimelineEntityContext().setFlowVersion(flowVersion);
}
long flowRunId = response.getFlowRunId();
if (flowRunId != 0L) {
collector.getTimelineEntityContext().setFlowRunId(flowRunId);
}
}

View File

@ -65,22 +65,23 @@ public class FileSystemTimelineWriterImpl extends AbstractService
@Override
public TimelineWriteResponse write(String clusterId, String userId,
String flowId, String flowRunId, String appId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntities entities) throws IOException {
TimelineWriteResponse response = new TimelineWriteResponse();
for (TimelineEntity entity : entities.getEntities()) {
write(clusterId, userId, flowId, flowRunId, appId, entity, response);
write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
response);
}
return response;
}
private void write(String clusterId, String userId,
String flowId, String flowRunId, String appId, TimelineEntity entity,
private void write(String clusterId, String userId, String flowName,
String flowVersion, long flowRun, String appId, TimelineEntity entity,
TimelineWriteResponse response) throws IOException {
PrintWriter out = null;
try {
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowId,
flowRunId, appId, entity.getType());
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName,
flowVersion, String.valueOf(flowRun), appId, entity.getType());
String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));

View File

@ -41,8 +41,9 @@ public interface TimelineWriter extends Service {
*
* @param clusterId context cluster ID
* @param userId context user ID
* @param flowId context flow ID
* @param flowRunId context flow run ID
* @param flowName context flow name
* @param flowVersion context flow version
* @param flowRunId
* @param appId context app ID
* @param data
* a {@link TimelineEntities} object.
@ -50,7 +51,7 @@ public interface TimelineWriter extends Service {
* @throws IOException
*/
TimelineWriteResponse write(String clusterId, String userId,
String flowId, String flowRunId, String appId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntities data) throws IOException;
/**

View File

@ -162,7 +162,7 @@ private TimelineCollectorManager createCollectorManager() {
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
try {
when(nmCollectorService.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);

View File

@ -1,3 +1,4 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -146,7 +147,7 @@ private TimelineCollectorManager createCollectorManager() {
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
try {
when(nmCollectorService.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);

View File

@ -57,11 +57,13 @@ public void testWriteEntityToFile() throws Exception {
fsi = new FileSystemTimelineWriterImpl();
fsi.init(new YarnConfiguration());
fsi.start();
fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te);
fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
"app_id", te);
String fileName = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type +
"/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
type + "/" + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName);
File f = new File(fileName);
assertTrue(f.exists() && !f.isDirectory());