NIFI-4707: Fixed ProcessGroup tree

- Removed duplicated creation of a ParentProcessGroupSearchNode for the
root ProcessGroup.
- Removed duplicated creation of a ParentProcessGroupSearchNode for each
component inside a ProcessGroup.
- Fixed ProcessGroup id hierarchy.
- Fixed filtering logic.
- Added unit tests for filtering by ProcessGroupId and Remote
Input/Output ports.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2351
This commit is contained in:
Koji Kawamura 2017-12-25 16:17:27 +09:00 committed by Matthew Burgess
parent 97dc20e2d9
commit 84cecfbeea
3 changed files with 180 additions and 30 deletions

View File

@ -60,7 +60,11 @@ public class ComponentMapHolder {
return componentToParentGroupMap.get(componentId);
}
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) {
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
return createComponentMap(status, new ParentProcessGroupSearchNode(status.getId(), null));
}
private static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisProcessGroupNode) {
final ComponentMapHolder holder = new ComponentMapHolder();
final Map<String,String> componentNameMap = holder.componentNameMap;
final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = holder.componentToParentGroupMap;
@ -68,37 +72,31 @@ public class ComponentMapHolder {
final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
if (status != null) {
ParentProcessGroupSearchNode parentNode = thisNode;
componentNameMap.put(status.getId(), status.getName());
// Put a root entry in if one does not yet exist
if (parentNode == null) {
parentNode = new ParentProcessGroupSearchNode(status.getId(), null);
componentToParentGroupMap.put(status.getId(), parentNode);
}
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
componentNameMap.put(procStatus.getId(), procStatus.getName());
componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
componentToParentGroupMap.put(procStatus.getId(), thisProcessGroupNode);
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
componentNameMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
componentNameMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
componentToParentGroupMap.put(rpgStatus.getId(), thisProcessGroupNode);
}
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
componentToParentGroupMap.put(connectionStatus.getId(), thisProcessGroupNode);
// Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
@ -108,9 +106,9 @@ public class ComponentMapHolder {
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
componentNameMap.put(childGroup.getId(), childGroup.getName());
ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode);
componentToParentGroupMap.put(childGroup.getId(), node);
holder.putAll(createComponentMap(childGroup, node));
ParentProcessGroupSearchNode childProcessGroupNode = new ParentProcessGroupSearchNode(childGroup.getId(), thisProcessGroupNode);
componentToParentGroupMap.put(childGroup.getId(), thisProcessGroupNode);
holder.putAll(createComponentMap(childGroup, childProcessGroupNode));
}
}

View File

@ -122,8 +122,7 @@ public class ProvenanceEventConsumer {
}
final EventAccess eventAccess = context.getEventAccess();
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null);
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode);
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
final StateManager stateManager = context.getStateManager();
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
@ -246,13 +245,15 @@ public class ProvenanceEventConsumer {
if (StringUtils.isEmpty(processGroupId)) {
continue;
}
// Check if any parent process group has the specified component ID
ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId);
while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) {
matchedComponent = matchedComponent.getParent();
}
if (matchedComponent == null) {
continue;
// Check if the process group or any parent process group is specified as a target component ID.
if (!componentIds.contains(processGroupId)) {
ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
parentProcessGroup = parentProcessGroup.getParent();
}
if (parentProcessGroup == null) {
continue;
}
}
}
if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {

View File

@ -18,11 +18,15 @@
package org.apache.nifi.reporting;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
@ -49,6 +53,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -65,6 +70,10 @@ public class TestSiteToSiteProvenanceReportingTask {
private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
return setup(event, properties, 2500);
}
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties, long maxEventId) throws IOException {
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
when(context.getStateManager())
@ -85,7 +94,6 @@ public class TestSiteToSiteProvenanceReportingTask {
}
}).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
final long maxEventId = 2500;
final AtomicInteger totalEvents = new AtomicInteger(0);
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@ -106,9 +114,65 @@ public class TestSiteToSiteProvenanceReportingTask {
return eventsToReturn;
}
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
processGroupStatus.setId("root");
when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus);
ProcessGroupStatus pgRoot = new ProcessGroupStatus();
pgRoot.setId("root");
when(eventAccess.getControllerStatus()).thenReturn(pgRoot);
// Add child Process Groups.
// Root -> (A, B -> (B2 -> (B3)))
final ProcessGroupStatus pgA = new ProcessGroupStatus();
pgA.setId("pgA");
final ProcessGroupStatus pgB = new ProcessGroupStatus();
pgB.setId("pgB");
final ProcessGroupStatus pgB2 = new ProcessGroupStatus();
pgB2.setId("pgB2");
final ProcessGroupStatus pgB3 = new ProcessGroupStatus();
pgB3.setId("pgB3");
final Collection<ProcessGroupStatus> childPGs = pgRoot.getProcessGroupStatus();
childPGs.add(pgA);
childPGs.add(pgB);
pgB.getProcessGroupStatus().add(pgB2);
pgB2.getProcessGroupStatus().add(pgB3);
// Add Processors.
final ProcessorStatus prcRoot = new ProcessorStatus();
prcRoot.setId("1234");
pgRoot.getProcessorStatus().add(prcRoot);
final ProcessorStatus prcA = new ProcessorStatus();
prcA.setId("A001");
prcA.setName("Processor in PGA");
pgA.getProcessorStatus().add(prcA);
final ProcessorStatus prcB = new ProcessorStatus();
prcB.setId("B001");
prcB.setName("Processor in PGB");
pgB.getProcessorStatus().add(prcB);
final ProcessorStatus prcB2 = new ProcessorStatus();
prcB2.setId("B201");
prcB2.setName("Processor in PGB2");
pgB2.getProcessorStatus().add(prcB2);
final ProcessorStatus prcB3 = new ProcessorStatus();
prcB3.setId("B301");
prcB3.setName("Processor in PGB3");
pgB3.getProcessorStatus().add(prcB3);
// Add connection status to test Remote Input/Output Ports
final ConnectionStatus b2RemoteInputPort = new ConnectionStatus();
b2RemoteInputPort.setGroupId("pgB2");
b2RemoteInputPort.setSourceId("B201");
b2RemoteInputPort.setDestinationId("riB2");
b2RemoteInputPort.setDestinationName("Remote Input Port name");
pgB2.getConnectionStatus().add(b2RemoteInputPort);
final ConnectionStatus b3RemoteOutputPort = new ConnectionStatus();
b3RemoteOutputPort.setGroupId("pgB3");
b3RemoteOutputPort.setSourceId("roB3");
b3RemoteOutputPort.setSourceName("Remote Output Port name");
b3RemoteOutputPort.setDestinationId("B301");
pgB3.getConnectionStatus().add(b3RemoteOutputPort);
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@ -307,6 +371,90 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterProcessGroupId() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "pgB2");
// B201 belongs to ProcessGroup B2, so it should be picked.
ProvenanceEventRecord event = createProvenanceEventRecord("B201", "dummy");
MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("B201", reportedEvent.get("componentId").asText());
assertEquals("Processor in PGB2", reportedEvent.get("componentName").asText());
// B301 belongs to PG B3, whose parent is PGB2, so it should be picked, too.
event = createProvenanceEventRecord("B301", "dummy");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("B301", reportedEvent.get("componentId").asText());
assertEquals("Processor in PGB3", reportedEvent.get("componentName").asText());
// A001 belongs to PG A, whose parent is the root PG, so it should be filtered out.
event = createProvenanceEventRecord("A001", "dummy");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testRemotePorts() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "riB2,roB3");
// riB2 is a Remote Input Port in Process Group B2.
ProvenanceEventRecord event = createProvenanceEventRecord("riB2", "Remote Input Port");
MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("riB2", reportedEvent.get("componentId").asText());
assertEquals("Remote Input Port name", reportedEvent.get("componentName").asText());
assertEquals("pgB2", reportedEvent.get("processGroupId").asText());
// roB3 is a Remote Output Port in Process Group B3.
event = createProvenanceEventRecord("roB3", "Remote Output Port");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("roB3", reportedEvent.get("componentId").asText());
assertEquals("Remote Output Port name", reportedEvent.get("componentName").asText());
assertEquals("pgB3", reportedEvent.get("processGroupId").asText());
}
@Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
final long maxEventId = 2500;
@ -353,6 +501,9 @@ public class TestSiteToSiteProvenanceReportingTask {
}
private ProvenanceEventRecord createProvenanceEventRecord() {
return createProvenanceEventRecord("1234", "dummy processor");
}
private ProvenanceEventRecord createProvenanceEventRecord(final String componentId, final String componentType) {
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
@ -369,8 +520,8 @@ public class TestSiteToSiteProvenanceReportingTask {
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, attributes));
builder.setAttributes(prevAttrs, attributes);
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
builder.setComponentId(componentId);
builder.setComponentType(componentType);
return builder.build();
}