diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java index 4a379c08db..babfb54ee6 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java @@ -123,6 +123,9 @@ public class BigQueryAttributes { public static final String JOB_LINK_ATTR = "bq.job.link"; public static final String JOB_LINK_DESC = "API Link to load job"; + public static final String JOB_ID_ATTR = "bq.job.id"; + public static final String JOB_ID_DESC = "ID of the BigQuery job"; + public static final String JOB_NB_RECORDS_ATTR = "bq.records.count"; public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted"; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index cd3cb09427..cf4b42a9d2 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -73,6 +73,7 @@ import com.google.common.collect.ImmutableList; @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC), @@ -323,6 +324,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); + attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob()); boolean jobError = (job.getStatus().getError() != null); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java index bd56340a49..82e7748d34 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java @@ -80,6 +80,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT { validateNoServiceExceptionAttribute(flowFile); } runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1); + runner.assertAllFlowFilesContainAttribute(BigQueryAttributes.JOB_ID_ATTR); } @Test diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java index c4063b325a..5b51e0ab5e 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java @@ -21,6 +21,7 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatus; @@ -58,6 +59,9 @@ public class PutBigQueryBatchTest extends AbstractBQTest { @Mock Job job; + @Mock + JobId jobId; + @Mock JobStatus jobStatus; @@ -113,6 +117,8 @@ public class PutBigQueryBatchTest extends AbstractBQTest { when(stats.getCreationTime()).thenReturn(0L); when(stats.getStartTime()).thenReturn(1L); when(stats.getEndTime()).thenReturn(2L); + when(job.getJobId()).thenReturn(jobId); + when(jobId.getJob()).thenReturn("job-id"); final TestRunner runner = buildNewRunner(getProcessor()); addRequiredPropertiesToRunner(runner);