mirror of https://github.com/apache/nifi.git
NIFI-4707: Changed process group parent stack to tree
This commit is contained in:
parent
d65e6b2563
commit
97dc20e2d9
|
@ -24,13 +24,12 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Stack;
|
|
||||||
|
|
||||||
public class ComponentMapHolder {
|
public class ComponentMapHolder {
|
||||||
private static final String REMOTE_INPUT_PORT = "Remote Input Port";
|
private static final String REMOTE_INPUT_PORT = "Remote Input Port";
|
||||||
private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
|
private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
|
||||||
private final Map<String,String> componentNameMap = new HashMap<>();
|
private final Map<String,String> componentNameMap = new HashMap<>();
|
||||||
private final Map<String,String> componentToParentGroupMap = new HashMap<>();
|
private final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = new HashMap<>();
|
||||||
private final Map<String,String> sourceToConnectionParentGroupMap = new HashMap<>();
|
private final Map<String,String> sourceToConnectionParentGroupMap = new HashMap<>();
|
||||||
private final Map<String,String> destinationToConnectionParentGroupMap = new HashMap<>();
|
private final Map<String,String> destinationToConnectionParentGroupMap = new HashMap<>();
|
||||||
|
|
||||||
|
@ -46,21 +45,6 @@ public class ComponentMapHolder {
|
||||||
return componentNameMap.get(componentId);
|
return componentNameMap.get(componentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Stack<String> getProcessGroupIdStack(final String startingProcessGroupId) {
|
|
||||||
final Stack<String> stack = new Stack<>();
|
|
||||||
String processGroupId = startingProcessGroupId;
|
|
||||||
stack.push(startingProcessGroupId);
|
|
||||||
while (componentToParentGroupMap.containsKey(processGroupId)) {
|
|
||||||
final String parentGroupId = componentToParentGroupMap.get(processGroupId);
|
|
||||||
if (parentGroupId == null || parentGroupId.isEmpty()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
stack.push(parentGroupId);
|
|
||||||
processGroupId = parentGroupId;
|
|
||||||
}
|
|
||||||
return stack;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getProcessGroupId(final String componentId, final String componentType) {
|
public String getProcessGroupId(final String componentId, final String componentType) {
|
||||||
// Where a Remote Input/Output Port resides is only available at ConnectionStatus.
|
// Where a Remote Input/Output Port resides is only available at ConnectionStatus.
|
||||||
if (REMOTE_INPUT_PORT.equals(componentType)) {
|
if (REMOTE_INPUT_PORT.equals(componentType)) {
|
||||||
|
@ -68,42 +52,53 @@ public class ComponentMapHolder {
|
||||||
} else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
|
} else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
|
||||||
return sourceToConnectionParentGroupMap.get(componentId);
|
return sourceToConnectionParentGroupMap.get(componentId);
|
||||||
}
|
}
|
||||||
|
ParentProcessGroupSearchNode parentNode = componentToParentGroupMap.get(componentId);
|
||||||
|
return parentNode == null ? null : parentNode.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ParentProcessGroupSearchNode getProcessGroupParent(final String componentId) {
|
||||||
return componentToParentGroupMap.get(componentId);
|
return componentToParentGroupMap.get(componentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
|
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) {
|
||||||
final ComponentMapHolder holder = new ComponentMapHolder();
|
final ComponentMapHolder holder = new ComponentMapHolder();
|
||||||
final Map<String,String> componentNameMap = holder.componentNameMap;
|
final Map<String,String> componentNameMap = holder.componentNameMap;
|
||||||
final Map<String,String> componentToParentGroupMap = holder.componentToParentGroupMap;
|
final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = holder.componentToParentGroupMap;
|
||||||
final Map<String,String> sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap;
|
final Map<String,String> sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap;
|
||||||
final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
|
final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
|
||||||
|
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
|
ParentProcessGroupSearchNode parentNode = thisNode;
|
||||||
componentNameMap.put(status.getId(), status.getName());
|
componentNameMap.put(status.getId(), status.getName());
|
||||||
|
// Put a root entry in if one does not yet exist
|
||||||
|
if (parentNode == null) {
|
||||||
|
parentNode = new ParentProcessGroupSearchNode(status.getId(), null);
|
||||||
|
componentToParentGroupMap.put(status.getId(), parentNode);
|
||||||
|
}
|
||||||
|
|
||||||
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
|
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
|
||||||
componentNameMap.put(procStatus.getId(), procStatus.getName());
|
componentNameMap.put(procStatus.getId(), procStatus.getName());
|
||||||
componentToParentGroupMap.put(procStatus.getId(), status.getId());
|
componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final PortStatus portStatus : status.getInputPortStatus()) {
|
for (final PortStatus portStatus : status.getInputPortStatus()) {
|
||||||
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
||||||
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final PortStatus portStatus : status.getOutputPortStatus()) {
|
for (final PortStatus portStatus : status.getOutputPortStatus()) {
|
||||||
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
||||||
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
|
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
|
||||||
componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
|
componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
|
||||||
componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
|
componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
||||||
componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
|
componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
|
||||||
componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
|
componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
|
||||||
// Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
|
// Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
|
||||||
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
|
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
|
||||||
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
|
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
|
||||||
|
@ -113,8 +108,9 @@ public class ComponentMapHolder {
|
||||||
|
|
||||||
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
|
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
|
||||||
componentNameMap.put(childGroup.getId(), childGroup.getName());
|
componentNameMap.put(childGroup.getId(), childGroup.getName());
|
||||||
componentToParentGroupMap.put(childGroup.getId(), status.getId());
|
ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode);
|
||||||
holder.putAll(createComponentMap(childGroup));
|
componentToParentGroupMap.put(childGroup.getId(), node);
|
||||||
|
holder.putAll(createComponentMap(childGroup, node));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.reporting.util.provenance;
|
||||||
|
|
||||||
|
|
||||||
|
public class ParentProcessGroupSearchNode {
|
||||||
|
|
||||||
|
private final String id;
|
||||||
|
private final ParentProcessGroupSearchNode parent;
|
||||||
|
|
||||||
|
public ParentProcessGroupSearchNode(String id, ParentProcessGroupSearchNode parent) {
|
||||||
|
this.id = id;
|
||||||
|
this.parent = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ParentProcessGroupSearchNode getParent() {
|
||||||
|
return parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
|
@ -122,7 +122,8 @@ public class ProvenanceEventConsumer {
|
||||||
}
|
}
|
||||||
final EventAccess eventAccess = context.getEventAccess();
|
final EventAccess eventAccess = context.getEventAccess();
|
||||||
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
|
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
|
||||||
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
|
final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null);
|
||||||
|
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode);
|
||||||
final StateManager stateManager = context.getStateManager();
|
final StateManager stateManager = context.getStateManager();
|
||||||
|
|
||||||
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
|
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
|
||||||
|
@ -234,17 +235,23 @@ public class ProvenanceEventConsumer {
|
||||||
List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
|
List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
|
||||||
|
|
||||||
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
||||||
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
|
final String componentId = provenanceEventRecord.getComponentId();
|
||||||
|
if (!componentIds.isEmpty() && !componentIds.contains(componentId)) {
|
||||||
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
|
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
|
||||||
// that is being filtered on
|
// that is being filtered on
|
||||||
if (componentMapHolder == null) {
|
if (componentMapHolder == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType());
|
final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
|
||||||
if (processGroupId == null || processGroupId.isEmpty()) {
|
if (StringUtils.isEmpty(processGroupId)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) {
|
// Check if any parent process group has the specified component ID
|
||||||
|
ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId);
|
||||||
|
while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) {
|
||||||
|
matchedComponent = matchedComponent.getParent();
|
||||||
|
}
|
||||||
|
if (matchedComponent == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
import org.apache.nifi.components.state.Scope;
|
import org.apache.nifi.components.state.Scope;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.provenance.ProvenanceEventBuilder;
|
import org.apache.nifi.provenance.ProvenanceEventBuilder;
|
||||||
|
@ -55,6 +56,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestSiteToSiteProvenanceReportingTask {
|
public class TestSiteToSiteProvenanceReportingTask {
|
||||||
|
|
||||||
|
@ -65,7 +67,7 @@ public class TestSiteToSiteProvenanceReportingTask {
|
||||||
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
|
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
|
||||||
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
|
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
|
||||||
|
|
||||||
Mockito.when(context.getStateManager())
|
when(context.getStateManager())
|
||||||
.thenReturn(new MockStateManager(task));
|
.thenReturn(new MockStateManager(task));
|
||||||
Mockito.doAnswer(new Answer<PropertyValue>() {
|
Mockito.doAnswer(new Answer<PropertyValue>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,6 +106,9 @@ public class TestSiteToSiteProvenanceReportingTask {
|
||||||
return eventsToReturn;
|
return eventsToReturn;
|
||||||
}
|
}
|
||||||
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
|
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
|
||||||
|
ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
|
||||||
|
processGroupStatus.setId("root");
|
||||||
|
when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus);
|
||||||
|
|
||||||
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
|
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
|
||||||
Mockito.doAnswer(new Answer<Long>() {
|
Mockito.doAnswer(new Answer<Long>() {
|
||||||
|
@ -113,12 +118,12 @@ public class TestSiteToSiteProvenanceReportingTask {
|
||||||
}
|
}
|
||||||
}).when(provenanceRepository).getMaxEventId();
|
}).when(provenanceRepository).getMaxEventId();
|
||||||
|
|
||||||
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
|
when(context.getEventAccess()).thenReturn(eventAccess);
|
||||||
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
|
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
|
||||||
|
|
||||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||||
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
|
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
|
||||||
Mockito.when(initContext.getLogger()).thenReturn(logger);
|
when(initContext.getLogger()).thenReturn(logger);
|
||||||
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
@ -331,7 +336,7 @@ public class TestSiteToSiteProvenanceReportingTask {
|
||||||
|
|
||||||
// setup the mock EventAccess to return the mock provenance repository
|
// setup the mock EventAccess to return the mock provenance repository
|
||||||
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
|
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
|
||||||
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
|
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
|
||||||
|
|
||||||
task.initialize(initContext);
|
task.initialize(initContext);
|
||||||
|
|
||||||
|
@ -388,7 +393,7 @@ public class TestSiteToSiteProvenanceReportingTask {
|
||||||
}
|
}
|
||||||
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
|
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
|
||||||
|
|
||||||
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
|
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
Assert.fail(e.toString());
|
Assert.fail(e.toString());
|
||||||
|
|
Loading…
Reference in New Issue