NIFI-6717 - Add Job ID in attributes for PutBigQueryBatch

This closes #3768

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2019-09-26 07:28:32 -05:00 committed by exceptionfactory
parent d97bec63ed
commit d70359d3e8
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 12 additions and 0 deletions

View File

@ -123,6 +123,9 @@ public class BigQueryAttributes {
public static final String JOB_LINK_ATTR = "bq.job.link";
public static final String JOB_LINK_DESC = "API Link to load job";
public static final String JOB_ID_ATTR = "bq.job.id";
public static final String JOB_ID_DESC = "ID of the BigQuery job";
public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";

View File

@ -73,6 +73,7 @@ import com.google.common.collect.ImmutableList;
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@ -323,6 +324,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob());
boolean jobError = (job.getStatus().getError() != null);

View File

@ -80,6 +80,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT {
validateNoServiceExceptionAttribute(flowFile);
}
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(BigQueryAttributes.JOB_ID_ATTR);
}
@Test

View File

@ -21,6 +21,7 @@ import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
@ -58,6 +59,9 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
@Mock
Job job;
@Mock
JobId jobId;
@Mock
JobStatus jobStatus;
@ -113,6 +117,8 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
when(stats.getCreationTime()).thenReturn(0L);
when(stats.getStartTime()).thenReturn(1L);
when(stats.getEndTime()).thenReturn(2L);
when(job.getJobId()).thenReturn(jobId);
when(jobId.getJob()).thenReturn("job-id");
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);