mirror of https://github.com/apache/nifi.git
NIFI-5417: Add missing component status and metrics to S2SStatusReportingTask and PrometheusReportingTask (#3554)
* NIFI-5417: Add missing component status and metrics to S2SReportingTask and PrometheusReportingTask * NIFI-5417: Added executionNode to schema and doc This closes #3554 Signed-off-by: Yolanda M. Davis <yolanda.m.davis@gmail.com>
This commit is contained in:
parent
15b0f1cfc2
commit
0ccc346aec
|
@ -99,6 +99,12 @@ public class PrometheusMetricsUtil {
|
||||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
||||||
.register(NIFI_REGISTRY);
|
.register(NIFI_REGISTRY);
|
||||||
|
|
||||||
|
private static final Gauge AMOUNT_THREADS_TOTAL_TERMINATED = Gauge.build()
|
||||||
|
.name("nifi_amount_threads_terminated")
|
||||||
|
.help("Total number of threads terminated for the component")
|
||||||
|
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
||||||
|
.register(NIFI_REGISTRY);
|
||||||
|
|
||||||
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
|
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
|
||||||
.name("nifi_size_content_output_total")
|
.name("nifi_size_content_output_total")
|
||||||
.help("Total size of content output by the component")
|
.help("Total size of content output by the component")
|
||||||
|
@ -272,7 +278,10 @@ public class PrometheusMetricsUtil {
|
||||||
AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
|
AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
|
||||||
.set(status.getQueuedCount());
|
.set(status.getQueuedCount());
|
||||||
|
|
||||||
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount());
|
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId)
|
||||||
|
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
|
||||||
|
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, componentType, componentName, componentId, parentPGId)
|
||||||
|
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
|
||||||
|
|
||||||
// Report metrics for child process groups if specified
|
// Report metrics for child process groups if specified
|
||||||
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
|
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
|
||||||
|
@ -288,6 +297,16 @@ public class PrometheusMetricsUtil {
|
||||||
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
|
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
|
||||||
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
|
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String procComponentType = "Processor";
|
||||||
|
final String procComponentId = processorStatus.getId();
|
||||||
|
final String procComponentName = processorStatus.getName();
|
||||||
|
final String parentId = processorStatus.getGroupId();
|
||||||
|
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
|
||||||
|
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
|
||||||
|
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
|
||||||
|
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
|
||||||
|
|
||||||
}
|
}
|
||||||
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
||||||
final String connComponentId = connectionStatus.getId();
|
final String connComponentId = connectionStatus.getId();
|
||||||
|
|
|
@ -254,7 +254,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "outputCount", status.getOutputCount());
|
addField(builder, "outputCount", status.getOutputCount());
|
||||||
addField(builder, "queuedContentSize", status.getQueuedContentSize());
|
addField(builder, "queuedContentSize", status.getQueuedContentSize());
|
||||||
addField(builder, "activeThreadCount", status.getActiveThreadCount());
|
addField(builder, "activeThreadCount", status.getActiveThreadCount());
|
||||||
|
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
|
||||||
addField(builder, "queuedCount", status.getQueuedCount());
|
addField(builder, "queuedCount", status.getQueuedCount());
|
||||||
|
addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name());
|
||||||
|
|
||||||
arrayBuilder.add(builder.build());
|
arrayBuilder.add(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -305,6 +307,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "sentContentSize", status.getSentContentSize());
|
addField(builder, "sentContentSize", status.getSentContentSize());
|
||||||
addField(builder, "sentCount", status.getSentCount());
|
addField(builder, "sentCount", status.getSentCount());
|
||||||
addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
|
addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
|
||||||
|
addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name());
|
||||||
|
addField(builder, "targetURI", status.getTargetUri());
|
||||||
|
|
||||||
arrayBuilder.add(builder.build());
|
arrayBuilder.add(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -329,6 +333,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "inputCount", status.getInputCount());
|
addField(builder, "inputCount", status.getInputCount());
|
||||||
addField(builder, "outputBytes", status.getOutputBytes());
|
addField(builder, "outputBytes", status.getOutputBytes());
|
||||||
addField(builder, "outputCount", status.getOutputCount());
|
addField(builder, "outputCount", status.getOutputCount());
|
||||||
|
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
|
||||||
|
addField(builder, "transmitting", status.isTransmitting());
|
||||||
|
|
||||||
arrayBuilder.add(builder.build());
|
arrayBuilder.add(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -359,6 +365,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "outputCount", status.getOutputCount());
|
addField(builder, "outputCount", status.getOutputCount());
|
||||||
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
|
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
|
||||||
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
|
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
|
||||||
|
addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold());
|
||||||
addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
|
addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
|
||||||
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
|
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
|
||||||
|
|
||||||
|
@ -390,8 +397,12 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "outputCount", status.getOutputCount());
|
addField(builder, "outputCount", status.getOutputCount());
|
||||||
addField(builder, "outputBytes", status.getOutputBytes());
|
addField(builder, "outputBytes", status.getOutputBytes());
|
||||||
addField(builder, "activeThreadCount", status.getActiveThreadCount());
|
addField(builder, "activeThreadCount", status.getActiveThreadCount());
|
||||||
|
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
|
||||||
addField(builder, "invocations", status.getInvocations());
|
addField(builder, "invocations", status.getInvocations());
|
||||||
addField(builder, "processingNanos", status.getProcessingNanos());
|
addField(builder, "processingNanos", status.getProcessingNanos());
|
||||||
|
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
|
||||||
|
addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name());
|
||||||
|
addField(builder, factory, "counters", status.getCounters());
|
||||||
|
|
||||||
arrayBuilder.add(builder.build());
|
arrayBuilder.add(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -411,4 +422,28 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "application", applicationName);
|
addField(builder, "application", applicationName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addField(final JsonObjectBuilder builder, final String key, final Boolean value) {
|
||||||
|
if (value == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.add(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, Long> values) {
|
||||||
|
if (values == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
|
||||||
|
for (final Map.Entry<String, Long> entry : values.entrySet()) {
|
||||||
|
if (entry.getKey() == null || entry.getValue() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
mapBuilder.add(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.add(key, mapBuilder);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,10 @@
|
||||||
// PG + Processors
|
// PG + Processors
|
||||||
{ "name" : "bytesRead", "type" : ["long", "null"]},
|
{ "name" : "bytesRead", "type" : ["long", "null"]},
|
||||||
{ "name" : "bytesWritten", "type" : ["long", "null"]},
|
{ "name" : "bytesWritten", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},
|
||||||
|
|
||||||
|
// Processors + Ports
|
||||||
|
{ "name" : "runStatus", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for process group status
|
// fields for process group status
|
||||||
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
|
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
|
||||||
|
@ -79,6 +83,7 @@
|
||||||
{ "name" : "inputContentSize", "type" : ["long", "null"]},
|
{ "name" : "inputContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputContentSize", "type" : ["long", "null"]},
|
{ "name" : "outputContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
|
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "versionedFlowStatus", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for remote process groups
|
// fields for remote process groups
|
||||||
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
|
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
|
||||||
|
@ -88,12 +93,15 @@
|
||||||
{ "name" : "sentContentSize", "type" : ["long", "null"]},
|
{ "name" : "sentContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "sentCount", "type" : ["long", "null"]},
|
{ "name" : "sentCount", "type" : ["long", "null"]},
|
||||||
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
|
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "transmissionStatus", "type" : ["string", "null"]},
|
||||||
|
{ "name" : "targetURI", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for input/output ports + connections + PG
|
// fields for input/output ports + connections + PG
|
||||||
{ "name" : "inputBytes", "type" : ["long", "null"]},
|
{ "name" : "inputBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "inputCount", "type" : ["long", "null"]},
|
{ "name" : "inputCount", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputBytes", "type" : ["long", "null"]},
|
{ "name" : "outputBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputCount", "type" : ["long", "null"]},
|
{ "name" : "outputCount", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "transmitting", "type" : ["boolean", "null"]},
|
||||||
|
|
||||||
// fields for connections
|
// fields for connections
|
||||||
{ "name" : "sourceId", "type" : ["string", "null"]},
|
{ "name" : "sourceId", "type" : ["string", "null"]},
|
||||||
|
@ -105,6 +113,7 @@
|
||||||
{ "name" : "queuedBytes", "type" : ["long", "null"]},
|
{ "name" : "queuedBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
|
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
|
||||||
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
|
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
|
||||||
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
|
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for processors
|
// fields for processors
|
||||||
|
@ -112,7 +121,9 @@
|
||||||
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
|
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
|
||||||
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
|
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
|
||||||
{ "name" : "invocations", "type" : ["long", "null"]},
|
{ "name" : "invocations", "type" : ["long", "null"]},
|
||||||
{ "name" : "processingNanos", "type" : ["long", "null"]}
|
{ "name" : "processingNanos", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "executionNode", "type" : ["string", "null"]},
|
||||||
|
{ "name" : "counters", "type": { "type": "map", "values": "string" }}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
</code>
|
</code>
|
||||||
|
|
|
@ -33,6 +33,10 @@
|
||||||
// PG + Processors
|
// PG + Processors
|
||||||
{ "name" : "bytesRead", "type" : ["long", "null"]},
|
{ "name" : "bytesRead", "type" : ["long", "null"]},
|
||||||
{ "name" : "bytesWritten", "type" : ["long", "null"]},
|
{ "name" : "bytesWritten", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},
|
||||||
|
|
||||||
|
// Processors + Ports
|
||||||
|
{ "name" : "runStatus", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for process group status
|
// fields for process group status
|
||||||
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
|
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
|
||||||
|
@ -40,6 +44,7 @@
|
||||||
{ "name" : "inputContentSize", "type" : ["long", "null"]},
|
{ "name" : "inputContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputContentSize", "type" : ["long", "null"]},
|
{ "name" : "outputContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
|
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "versionedFlowStatus", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for remote process groups
|
// fields for remote process groups
|
||||||
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
|
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
|
||||||
|
@ -49,12 +54,15 @@
|
||||||
{ "name" : "sentContentSize", "type" : ["long", "null"]},
|
{ "name" : "sentContentSize", "type" : ["long", "null"]},
|
||||||
{ "name" : "sentCount", "type" : ["long", "null"]},
|
{ "name" : "sentCount", "type" : ["long", "null"]},
|
||||||
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
|
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "transmissionStatus", "type" : ["string", "null"]},
|
||||||
|
{ "name" : "targetURI", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for input/output ports + connections + PG
|
// fields for input/output ports + connections + PG
|
||||||
{ "name" : "inputBytes", "type" : ["long", "null"]},
|
{ "name" : "inputBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "inputCount", "type" : ["long", "null"]},
|
{ "name" : "inputCount", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputBytes", "type" : ["long", "null"]},
|
{ "name" : "outputBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "outputCount", "type" : ["long", "null"]},
|
{ "name" : "outputCount", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "transmitting", "type" : ["boolean", "null"]},
|
||||||
|
|
||||||
// fields for connections
|
// fields for connections
|
||||||
{ "name" : "sourceId", "type" : ["string", "null"]},
|
{ "name" : "sourceId", "type" : ["string", "null"]},
|
||||||
|
@ -66,6 +74,7 @@
|
||||||
{ "name" : "queuedBytes", "type" : ["long", "null"]},
|
{ "name" : "queuedBytes", "type" : ["long", "null"]},
|
||||||
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
|
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
|
||||||
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
|
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
|
||||||
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
|
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
|
||||||
|
|
||||||
// fields for processors
|
// fields for processors
|
||||||
|
@ -73,6 +82,8 @@
|
||||||
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
|
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
|
||||||
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
|
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
|
||||||
{ "name" : "invocations", "type" : ["long", "null"]},
|
{ "name" : "invocations", "type" : ["long", "null"]},
|
||||||
{ "name" : "processingNanos", "type" : ["long", "null"]}
|
{ "name" : "processingNanos", "type" : ["long", "null"]},
|
||||||
|
{ "name" : "executionNode", "type" : ["string", "null"]},
|
||||||
|
{ "name" : "counters", "type": { "type": "map", "values": "string" }}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.nifi.reporting;
|
||||||
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -31,6 +33,7 @@ import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import javax.json.Json;
|
import javax.json.Json;
|
||||||
|
import javax.json.JsonNumber;
|
||||||
import javax.json.JsonObject;
|
import javax.json.JsonObject;
|
||||||
import javax.json.JsonReader;
|
import javax.json.JsonReader;
|
||||||
import javax.json.JsonString;
|
import javax.json.JsonString;
|
||||||
|
@ -42,8 +45,11 @@ import org.apache.nifi.controller.status.PortStatus;
|
||||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||||
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
|
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
|
||||||
|
import org.apache.nifi.controller.status.RunStatus;
|
||||||
|
import org.apache.nifi.controller.status.TransmissionStatus;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.registry.flow.VersionedFlowState;
|
||||||
import org.apache.nifi.remote.Transaction;
|
import org.apache.nifi.remote.Transaction;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
|
@ -108,8 +114,13 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
assertEquals(16, task.dataSent.size());
|
assertEquals(16, task.dataSent.size());
|
||||||
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
||||||
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
||||||
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
|
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
|
||||||
|
JsonString componentId = firstElement.getJsonString("componentId");
|
||||||
assertEquals(pgStatus.getId(), componentId.getString());
|
assertEquals(pgStatus.getId(), componentId.getString());
|
||||||
|
JsonNumber terminatedThreads = firstElement.getJsonNumber("terminatedThreadCount");
|
||||||
|
assertEquals(1, terminatedThreads.longValue());
|
||||||
|
JsonString versionedFlowState = firstElement.getJsonString("versionedFlowState");
|
||||||
|
assertEquals("UP_TO_DATE", versionedFlowState.getString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -150,6 +161,10 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
JsonString source = object.getJsonString("sourceName");
|
JsonString source = object.getJsonString("sourceName");
|
||||||
assertEquals("true", backpressure.getString());
|
assertEquals("true", backpressure.getString());
|
||||||
assertEquals("source", source.getString());
|
assertEquals("source", source.getString());
|
||||||
|
JsonString dataSizeThreshold = object.getJsonString("backPressureDataSizeThreshold");
|
||||||
|
JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold");
|
||||||
|
assertEquals("1 KB", dataSizeThreshold.getString());
|
||||||
|
assertEquals(1024, bytesThreshold.intValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -190,6 +205,80 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
assertEquals("root.1.1.processor.1", componentId.getString());
|
assertEquals("root.1.1.processor.1", componentId.getString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPortStatus() throws IOException, InitializationException {
|
||||||
|
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
|
||||||
|
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
|
||||||
|
|
||||||
|
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
|
||||||
|
task.onTrigger(context);
|
||||||
|
|
||||||
|
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
||||||
|
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
||||||
|
JsonObject object = jsonReader.readArray().getJsonObject(0);
|
||||||
|
JsonString runStatus = object.getJsonString("runStatus");
|
||||||
|
assertEquals(RunStatus.Stopped.name(), runStatus.getString());
|
||||||
|
boolean isTransmitting = object.getBoolean("transmitting");
|
||||||
|
assertFalse(isTransmitting);
|
||||||
|
JsonNumber inputBytes = object.getJsonNumber("inputBytes");
|
||||||
|
assertEquals(5, inputBytes.intValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoteProcessGroupStatus() throws IOException, InitializationException {
|
||||||
|
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
|
||||||
|
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)");
|
||||||
|
|
||||||
|
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
|
||||||
|
task.onTrigger(context);
|
||||||
|
|
||||||
|
assertEquals(3, task.dataSent.size());
|
||||||
|
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
||||||
|
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
||||||
|
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
|
||||||
|
JsonNumber activeThreadCount = firstElement.getJsonNumber("activeThreadCount");
|
||||||
|
assertEquals(1L, activeThreadCount.longValue());
|
||||||
|
JsonString transmissionStatus = firstElement.getJsonString("transmissionStatus");
|
||||||
|
assertEquals("Transmitting", transmissionStatus.getString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessorStatus() throws IOException, InitializationException {
|
||||||
|
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
|
||||||
|
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
|
||||||
|
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)");
|
||||||
|
|
||||||
|
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
|
||||||
|
task.onTrigger(context);
|
||||||
|
|
||||||
|
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
||||||
|
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
||||||
|
JsonObject object = jsonReader.readArray().getJsonObject(0);
|
||||||
|
JsonString runStatus = object.getJsonString("runStatus");
|
||||||
|
assertEquals(RunStatus.Running.name(), runStatus.getString());
|
||||||
|
JsonNumber inputBytes = object.getJsonNumber("inputBytes");
|
||||||
|
assertEquals(9, inputBytes.intValue());
|
||||||
|
JsonObject counterMap = object.getJsonObject("counters");
|
||||||
|
assertNotNull(counterMap);
|
||||||
|
assertEquals(10, counterMap.getInt("counter1"));
|
||||||
|
assertEquals(5, counterMap.getInt("counter2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/***********************************
|
||||||
|
* Test component generator methods
|
||||||
|
***********************************/
|
||||||
|
|
||||||
public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
|
public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
|
||||||
int maxRecursion, int currentDepth) {
|
int maxRecursion, int currentDepth) {
|
||||||
Collection<ConnectionStatus> cStatus = new ArrayList<>();
|
Collection<ConnectionStatus> cStatus = new ArrayList<>();
|
||||||
|
@ -229,6 +318,7 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
pgStatus.setProcessGroupStatus(childPgStatus);
|
pgStatus.setProcessGroupStatus(childPgStatus);
|
||||||
pgStatus.setRemoteProcessGroupStatus(rpgStatus);
|
pgStatus.setRemoteProcessGroupStatus(rpgStatus);
|
||||||
pgStatus.setProcessorStatus(pStatus);
|
pgStatus.setProcessorStatus(pStatus);
|
||||||
|
pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE);
|
||||||
|
|
||||||
pgStatus.setActiveThreadCount(1);
|
pgStatus.setActiveThreadCount(1);
|
||||||
pgStatus.setBytesRead(2L);
|
pgStatus.setBytesRead(2L);
|
||||||
|
@ -246,6 +336,7 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
pgStatus.setOutputCount(13);
|
pgStatus.setOutputCount(13);
|
||||||
pgStatus.setQueuedContentSize(14l);
|
pgStatus.setQueuedContentSize(14l);
|
||||||
pgStatus.setQueuedCount(15);
|
pgStatus.setQueuedCount(15);
|
||||||
|
pgStatus.setTerminatedThreadCount(1);
|
||||||
|
|
||||||
return pgStatus;
|
return pgStatus;
|
||||||
}
|
}
|
||||||
|
@ -263,6 +354,8 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
pStatus.setInputCount(6);
|
pStatus.setInputCount(6);
|
||||||
pStatus.setOutputBytes(7l);
|
pStatus.setOutputBytes(7l);
|
||||||
pStatus.setOutputCount(8);
|
pStatus.setOutputCount(8);
|
||||||
|
pStatus.setRunStatus(RunStatus.Stopped);
|
||||||
|
pStatus.setTransmitting(false);
|
||||||
|
|
||||||
return pStatus;
|
return pStatus;
|
||||||
}
|
}
|
||||||
|
@ -287,6 +380,12 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
pStatus.setOutputCount(13);
|
pStatus.setOutputCount(13);
|
||||||
pStatus.setProcessingNanos(14l);
|
pStatus.setProcessingNanos(14l);
|
||||||
pStatus.setType("type");
|
pStatus.setType("type");
|
||||||
|
pStatus.setTerminatedThreadCount(1);
|
||||||
|
pStatus.setRunStatus(RunStatus.Running);
|
||||||
|
pStatus.setCounters(new HashMap<String, Long>() {{
|
||||||
|
put("counter1", 10L);
|
||||||
|
put("counter2", 5L);
|
||||||
|
}});
|
||||||
|
|
||||||
return pStatus;
|
return pStatus;
|
||||||
}
|
}
|
||||||
|
@ -304,6 +403,7 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
rpgStatus.setSentContentSize(6l);
|
rpgStatus.setSentContentSize(6l);
|
||||||
rpgStatus.setSentCount(7);
|
rpgStatus.setSentCount(7);
|
||||||
rpgStatus.setTargetUri("uri");
|
rpgStatus.setTargetUri("uri");
|
||||||
|
rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
|
||||||
|
|
||||||
return rpgStatus;
|
return rpgStatus;
|
||||||
}
|
}
|
||||||
|
@ -312,7 +412,7 @@ public class TestSiteToSiteStatusReportingTask {
|
||||||
ConnectionStatus cStatus = new ConnectionStatus();
|
ConnectionStatus cStatus = new ConnectionStatus();
|
||||||
cStatus.setId(id);
|
cStatus.setId(id);
|
||||||
cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
|
cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
|
||||||
cStatus.setBackPressureBytesThreshold(0l);
|
cStatus.setBackPressureDataSizeThreshold("1 KB"); // sets backPressureBytesThreshold too
|
||||||
cStatus.setBackPressureObjectThreshold(1l);
|
cStatus.setBackPressureObjectThreshold(1l);
|
||||||
cStatus.setInputBytes(2l);
|
cStatus.setInputBytes(2l);
|
||||||
cStatus.setInputCount(3);
|
cStatus.setInputCount(3);
|
||||||
|
|
Loading…
Reference in New Issue