diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index d5dace2856..c3d3da8308 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") - .description("The value to use for the platform field in each provenance event.") + .description("The value to use for the platform field in each status record.") .required(true) .expressionLanguageSupported(true) .defaultValue("nifi") @@ -179,7 +179,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa toIndex = Math.min(fromIndex + batchSize, jsonArray.size()); jsonBatch = jsonArray.subList(fromIndex, toIndex); } catch (final IOException e) { - throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e); } } } @@ -343,6 +343,10 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "inputCount", status.getInputCount()); addField(builder, "outputBytes", status.getOutputBytes()); addField(builder, "outputCount", status.getOutputCount()); + addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold()); + addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold()); + addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount()) + || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes()))); arrayBuilder.add(builder.build()); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 3c737d1c16..443981c330 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -130,6 +130,24 @@ public class TestSiteToSiteStatusReportingTask { assertEquals(pgStatus.getId(), componentId.getString()); } + @Test + public void testConnectionStatus() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled"); + assertEquals("true", backpressure.getString()); + } + @Test public void testComponentNameFilter() throws IOException, InitializationException { final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);