NIFI-3791 - added back pressure data into S2SStatusReportingTask

This closes #1745.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-05-03 22:56:22 +02:00 committed by Koji Kawamura
parent c07850aec3
commit dc5e032368
2 changed files with 24 additions and 2 deletions

View File

@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.description("The value to use for the platform field in each status record.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
@ -179,7 +179,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
} catch (final IOException e) {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
}
}
}
@ -343,6 +343,10 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
arrayBuilder.add(builder.build());
}

View File

@ -130,6 +130,24 @@ public class TestSiteToSiteStatusReportingTask {
assertEquals(pgStatus.getId(), componentId.getString());
}
@Test
public void testConnectionStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled");
assertEquals("true", backpressure.getString());
}
@Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);