NIFI-3674: Implementing SiteToSiteStatusReportingTask

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Joe Gresock 2017-04-07 15:45:40 +00:00 committed by Bryan Bende
parent 7df5c2dc89
commit ea6320d621
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 763 additions and 1 deletions

View File

@ -0,0 +1,415 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@Tags({"status", "metrics", "history", "site", "site to site"})
@CapabilityDescription("Publishes Status events using the Site To Site protocol. "
+ "The component type and name filter regexes form a union: only components matching both regexes will be reported. "
+ "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("Component Type Filter Regex")
.description("A regex specifying which component types to report. Any component type matching this regex will be included. "
+ "Component types are: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("Component Name Filter Regex")
.description("A regex specifying which component names to report. Any component name matching this regex will be included.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue(".*")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
private volatile Pattern componentTypeFilter;
private volatile Pattern componentNameFilter;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
properties.add(COMPONENT_TYPE_FILTER_REGEX);
properties.add(COMPONENT_NAME_FILTER_REGEX);
return properties;
}
@Override
public void onTrigger(final ReportingContext context) {
final boolean isClustered = context.isClustered();
final String nodeId = context.getClusterNodeIdentifier();
if (nodeId == null && isClustered) {
getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+ "Will wait for Node Identifier to be established.");
return;
}
componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).getValue());
componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).getValue());
final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
URL url;
try {
url = new URL(nifiUrl);
} catch (final MalformedURLException e1) {
// already validated
throw new AssertionError();
}
final String hostname = url.getHost();
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName,
platform, null, new Date());
final JsonArray jsonArray = arrayBuilder.build();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
int fromIndex = 0;
int toIndex = Math.min(batchSize, jsonArray.size());
List<JsonValue> jsonBatch = jsonArray.subList(fromIndex, toIndex);
while(!jsonBatch.isEmpty()) {
// Send the JSON document for the current batch
try {
long start = System.nanoTime();
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
return;
}
final Map<String, String> attributes = new HashMap<>();
final String transactionId = UUID.randomUUID().toString();
attributes.put("reporting.task.transaction.id", transactionId);
attributes.put("mime.type", "application/json");
JsonArrayBuilder jsonBatchArrayBuilder = factory.createArrayBuilder();
for(JsonValue jsonValue : jsonBatch) {
jsonBatchArrayBuilder.add(jsonValue);
}
final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build();
final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8);
transaction.send(data, attributes);
transaction.confirm();
transaction.complete();
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Status Records to destination in {} ms; Transaction ID = {}",
new Object[]{jsonArray.size(), transferMillis, transactionId});
fromIndex = toIndex;
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
} catch (final IOException e) {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
}
}
}
/**
* Returns true only if the component type matches the component type filter
* and the component name matches the component name filter.
*
* @param componentType
* The component type
* @param componentName
* The component name
* @return Whether the component matches both filters
*/
boolean componentMatchesFilters(final String componentType, final String componentName) {
return componentTypeFilter.matcher(componentType).matches()
&& componentNameFilter.matcher(componentName).matches();
}
/**
* Serialize the ProcessGroupStatus and add it to the JsonArrayBuilder.
* @param arrayBuilder
* The JSON Array builder
* @param factory
* The JSON Builder Factory
* @param status
* The ProcessGroupStatus
* @param df
* A date format
* @param hostname
* The current hostname
* @param applicationName
* The root process group name
* @param platform
* The configured platform
* @param parentId
* The parent's component id
*/
void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final ProcessGroupStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
addField(builder, "componentId", status.getId());
addField(builder, "bytesRead", status.getBytesRead());
addField(builder, "bytesWritten", status.getBytesWritten());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "bytesTransferred", status.getBytesTransferred());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred());
addField(builder, "inputContentSize", status.getInputContentSize());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputContentSize", status.getOutputContentSize());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "queuedContentSize", status.getQueuedContentSize());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "queuedCount", status.getQueuedCount());
arrayBuilder.add(builder.build());
}
for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
}
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate);
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate);
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
}
}
void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
final String platform, final String parentId, final Date currentDate) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "RemoteProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
addField(builder, "componentId", status.getId());
addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount());
addField(builder, "receivedContentSize", status.getReceivedContentSize());
addField(builder, "receivedCount", status.getReceivedCount());
addField(builder, "sentContentSize", status.getSentContentSize());
addField(builder, "sentCount", status.getSentCount());
addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
arrayBuilder.add(builder.build());
}
}
void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
addField(builder, "componentId", status.getId());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
arrayBuilder.add(builder.build());
}
}
void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Connection";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
addField(builder, "componentId", status.getId());
addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes());
addField(builder, "maxQueuedCount", status.getMaxQueuedCount());
addField(builder, "queuedBytes", status.getQueuedBytes());
addField(builder, "queuedCount", status.getQueuedCount());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
arrayBuilder.add(builder.build());
}
}
void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Processor";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
addField(builder, "componentId", status.getId());
addField(builder, "processorType", status.getType());
addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration());
addField(builder, "bytesRead", status.getBytesRead());
addField(builder, "bytesWritten", status.getBytesWritten());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "invocations", status.getInvocations());
addField(builder, "processingNanos", status.getProcessingNanos());
arrayBuilder.add(builder.build());
}
}
private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
final String applicationName, final String platform, final String parentId, final Date currentDate,
final String componentType, final String componentName) {
addField(builder, "statusId", UUID.randomUUID().toString());
addField(builder, "timestampMillis", currentDate.getTime());
addField(builder, "timestamp", df.format(currentDate));
addField(builder, "actorHostname", hostname);
addField(builder, "componentType", componentType);
addField(builder, "componentName", componentName);
addField(builder, "parentId", parentId);
addField(builder, "platform", platform);
addField(builder, "application", applicationName);
}
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
if (value != null) {
builder.add(key, value.longValue());
}
}
private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
if (value != null) {
builder.add(key, value.intValue());
}
}
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
if (value == null) {
return;
}
builder.add(key, value);
}
}

View File

@ -15,3 +15,4 @@
org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
org.apache.nifi.reporting.SiteToSiteStatusReportingTask

View File

@ -0,0 +1,346 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonReader;
import javax.json.JsonString;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestSiteToSiteStatusReportingTask {
private ReportingContext context;
public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties,
ProcessGroupStatus pgStatus) throws InitializationException {
final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask();
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getStateManager())
.thenReturn(new MockStateManager(task));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(pgStatus);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
return task;
}
@Test
public void testSerializedForm() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(16, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals(pgStatus.getId(), componentId.getString());
}
@Test
public void testComponentTypeFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(ProcessGroup|RootProcessGroup)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(1, task.dataSent.size()); // Only root pg and 3 child pgs
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals(pgStatus.getId(), componentId.getString());
}
@Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(3, task.dataSent.size()); // 3 processors for each of 4 groups
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals("root.1.processor.1", componentId.getString());
}
@Test
public void testComponentNameFilter_nested() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 2, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(10, task.dataSent.size()); // 3 + (3 * 3) + (3 * 3 * 3) = 39, or 10 batches of 4
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals("root.1.1.processor.1", componentId.getString());
}
public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
int maxRecursion, int currentDepth) {
Collection<ConnectionStatus> cStatus = new ArrayList<>();
Collection<PortStatus> ipStatus = new ArrayList<>();
Collection<PortStatus> opStatus = new ArrayList<>();
Collection<ProcessorStatus> pStatus = new ArrayList<>();
Collection<RemoteProcessGroupStatus> rpgStatus = new ArrayList<>();
Collection<ProcessGroupStatus> childPgStatus = new ArrayList<>();
if (currentDepth < maxRecursion) {
for(int i = 1; i < 4; i++) {
childPgStatus.add(generateProcessGroupStatus(id + "." + i, namePrefix + "." + i,
maxRecursion, currentDepth + 1));
}
}
for(int i = 1; i < 4; i++) {
pStatus.add(generateProcessorStatus(id + ".processor." + i, namePrefix + ".processor." + i));
}
for(int i = 1; i < 4; i++) {
cStatus.add(generateConnectionStatus(id + ".connection." + i, namePrefix + ".connection." + i));
}
for(int i = 1; i < 4; i++) {
rpgStatus.add(generateRemoteProcessGroupStatus(id + ".rpg." + i, namePrefix + ".rpg." + i));
}
for(int i = 1; i < 4; i++) {
ipStatus.add(generatePortStatus(id + ".ip." + i, namePrefix + ".ip." + i));
}
for(int i = 1; i < 4; i++) {
opStatus.add(generatePortStatus(id + ".op." + i, namePrefix + ".op." + i));
}
ProcessGroupStatus pgStatus = new ProcessGroupStatus();
pgStatus.setId(id);
pgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pgStatus.setInputPortStatus(ipStatus);
pgStatus.setOutputPortStatus(opStatus);
pgStatus.setProcessGroupStatus(childPgStatus);
pgStatus.setRemoteProcessGroupStatus(rpgStatus);
pgStatus.setProcessorStatus(pStatus);
pgStatus.setActiveThreadCount(1);
pgStatus.setBytesRead(2L);
pgStatus.setBytesReceived(3l);
pgStatus.setBytesSent(4l);
pgStatus.setBytesTransferred(5l);
pgStatus.setBytesWritten(6l);
pgStatus.setConnectionStatus(cStatus);
pgStatus.setFlowFilesReceived(7);
pgStatus.setFlowFilesSent(8);
pgStatus.setFlowFilesTransferred(9);
pgStatus.setInputContentSize(10l);
pgStatus.setInputCount(11);
pgStatus.setOutputContentSize(12l);
pgStatus.setOutputCount(13);
pgStatus.setQueuedContentSize(14l);
pgStatus.setQueuedCount(15);
return pgStatus;
}
public static PortStatus generatePortStatus(String id, String namePrefix) {
PortStatus pStatus = new PortStatus();
pStatus.setId(id);
pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pStatus.setActiveThreadCount(0);
pStatus.setBytesReceived(1l);
pStatus.setBytesSent(2l);
pStatus.setFlowFilesReceived(3);
pStatus.setFlowFilesSent(4);
pStatus.setInputBytes(5l);
pStatus.setInputCount(6);
pStatus.setOutputBytes(7l);
pStatus.setOutputCount(8);
return pStatus;
}
public static ProcessorStatus generateProcessorStatus(String id, String namePrefix) {
ProcessorStatus pStatus = new ProcessorStatus();
pStatus.setId(id);
pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pStatus.setActiveThreadCount(0);
pStatus.setAverageLineageDuration(1l);
pStatus.setBytesRead(2l);
pStatus.setBytesReceived(3l);
pStatus.setBytesSent(4l);
pStatus.setBytesWritten(5l);
pStatus.setFlowFilesReceived(6);
pStatus.setFlowFilesRemoved(7);
pStatus.setFlowFilesSent(8);
pStatus.setInputBytes(9l);
pStatus.setInputCount(10);
pStatus.setInvocations(11);
pStatus.setOutputBytes(12l);
pStatus.setOutputCount(13);
pStatus.setProcessingNanos(14l);
pStatus.setType("type");
return pStatus;
}
public static RemoteProcessGroupStatus generateRemoteProcessGroupStatus(String id, String namePrefix) {
RemoteProcessGroupStatus rpgStatus = new RemoteProcessGroupStatus();
rpgStatus.setId(id);
rpgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
rpgStatus.setActiveRemotePortCount(0);
rpgStatus.setActiveThreadCount(1);
rpgStatus.setAverageLineageDuration(2l);
rpgStatus.setInactiveRemotePortCount(3);
rpgStatus.setReceivedContentSize(4l);
rpgStatus.setReceivedCount(5);
rpgStatus.setSentContentSize(6l);
rpgStatus.setSentCount(7);
rpgStatus.setTargetUri("uri");
return rpgStatus;
}
public static ConnectionStatus generateConnectionStatus(String id, String namePrefix) {
ConnectionStatus cStatus = new ConnectionStatus();
cStatus.setId(id);
cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
cStatus.setBackPressureBytesThreshold(0l);
cStatus.setBackPressureObjectThreshold(1l);
cStatus.setInputBytes(2l);
cStatus.setInputCount(3);
cStatus.setMaxQueuedBytes(4l);
cStatus.setMaxQueuedCount(5);
cStatus.setOutputBytes(6);
cStatus.setOutputCount(7);
cStatus.setQueuedBytes(8l);
cStatus.setQueuedCount(9);
return cStatus;
}
public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
MockFlowFile mockFlowFile = new MockFlowFile(id);
mockFlowFile.putAttributes(attributes);
return mockFlowFile;
}
private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final byte[] data = invocation.getArgumentAt(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
return client;
}
public List<byte[]> getDataSent() {
return dataSent;
}
}
}