From e94f0757db617c649ae054d7c914feade9e1185e Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 9 Jul 2018 11:28:17 +0900 Subject: [PATCH] NIFI-4654: Support reporting RAS S2S lineage to Atlas - Added 's2s.port.id' FlowFile attribute to track target remote Port id - Use 's2s.port.id' to analyze RAW S2S provenance events - This closes #2863 --- .../attributes/SiteToSiteAttributes.java | 4 +- .../provenance/analyzer/NiFiRemotePort.java | 17 +- .../analyzer/NiFiRootGroupPort.java | 9 +- .../atlas/provenance/analyzer/NiFiS2S.java | 35 +- .../additionalDetails.html | 5 +- .../analyzer/TestNiFiRemotePort.java | 85 +- .../analyzer/TestNiFiRootGroupPort.java | 203 +++++ .../reporting/ITReportLineageToAtlas.java | 280 ++++++ .../resources/flow-templates/S2SGetRAW.xml | 443 ++++++++++ .../resources/flow-templates/S2SSendRAW.xml | 822 ++++++++++++++++++ .../nifi/remote/StandardRemoteGroupPort.java | 2 + .../remote/TestStandardRemoteGroupPort.java | 4 + 12 files changed, 1882 insertions(+), 27 deletions(-) create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SSendRAW.xml diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java index ed6437e08c..7af0f0b40c 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java @@ -23,7 +23,9 @@ public enum SiteToSiteAttributes implements FlowFileAttributeKey { 
S2S_HOST("s2s.host"), - S2S_ADDRESS("s2s.address"); + S2S_ADDRESS("s2s.address"), + + S2S_PORT_ID("s2s.port.id"); private final String key; diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java index e2118f7df9..69b2d47731 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java @@ -20,6 +20,7 @@ import org.apache.atlas.typesystem.Referenceable; import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.slf4j.Logger; @@ -53,14 +54,13 @@ public class NiFiRemotePort extends NiFiS2S { final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port"); final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT; - final String remotePortId = event.getComponentId(); - - final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver()); + final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver()); // Find connections that connects to/from the remote port. + final String componentId = event.getComponentId(); final List connections = isRemoteInputPort - ? context.findConnectionTo(remotePortId) - : context.findConnectionFrom(remotePortId); + ? 
context.findConnectionTo(componentId) + : context.findConnectionFrom(componentId); if (connections == null || connections.isEmpty()) { logger.warn("Connection was not found: {}", new Object[]{event}); return null; @@ -70,7 +70,7 @@ public class NiFiRemotePort extends NiFiS2S { final ConnectionStatus connection = connections.get(0); final Referenceable ref = new Referenceable(type); ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName()); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId)); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, s2SPort.targetPortId)); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); } @@ -79,4 +79,9 @@ public class NiFiRemotePort extends NiFiS2S { public String targetComponentTypePattern() { return "^Remote (In|Out)put Port$"; } + + @Override + protected String getRawProtocolPortId(ProvenanceEventRecord event) { + return event.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()); + } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java index 4f66025d5f..e791c62c51 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java @@ -54,7 +54,7 @@ public class NiFiRootGroupPort extends NiFiS2S { final String type = isInputPort ? 
TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT; final String rootPortId = event.getComponentId(); - final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver()); + final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver()); // Find connections connecting to/from the remote port. final List connections = isInputPort @@ -69,7 +69,7 @@ public class NiFiRootGroupPort extends NiFiS2S { final ConnectionStatus connection = connections.get(0); final Referenceable ref = new Referenceable(type); ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName()); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId)); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, rootPortId)); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); } @@ -78,4 +78,9 @@ public class NiFiRootGroupPort extends NiFiS2S { public String targetComponentTypePattern() { return "^(In|Out)put Port$"; } + + @Override + protected String getRawProtocolPortId(ProvenanceEventRecord event) { + return event.getComponentId(); + } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java index 762a1aa817..d205b3e905 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java @@ -18,10 +18,10 @@ package org.apache.nifi.atlas.provenance.analyzer; import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.provenance.ProvenanceEventRecord; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URL; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -29,27 +29,30 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer { private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class); - private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)"); + private static final Pattern RAW_URL_REGEX = Pattern.compile("nifi://([^:/]+):\\d+/([0-9a-zA-Z\\-]+)"); private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*"); - protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) { - final URL url = parseUrl(transitUri); + protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, ClusterResolver clusterResolver) { + final String transitUri = event.getTransitUri(); + final int protocolIndex = transitUri.indexOf(':'); + final String protocol = transitUri.substring(0, protocolIndex).toLowerCase(); - final String clusterName = clusterResolver.fromHostNames(url.getHost()); + final String targetHostname; final String targetPortId; - final String protocol = url.getProtocol().toLowerCase(); switch (protocol) { case "http": case "https": { - final Matcher uriMatcher = matchUrl(url, HTTP_URL_REGEX); + final Matcher uriMatcher = matchUrl(transitUri, HTTP_URL_REGEX); + targetHostname = parseUri(transitUri).getHost(); targetPortId = uriMatcher.group(2); } break; case "nifi": { - final Matcher uriMatcher = matchUrl(url, RAW_URL_REGEX); - targetPortId = uriMatcher.group(1); + final Matcher uriMatcher = matchUrl(transitUri, RAW_URL_REGEX); + targetHostname = uriMatcher.group(1); + targetPortId = getRawProtocolPortId(event); } break; @@ -58,23 +61,25 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer { } - return new S2STransitUrl(clusterName, targetPortId); - + final String clusterName = 
clusterResolver.fromHostNames(targetHostname); + return new S2SPort(clusterName, targetPortId); } - private Matcher matchUrl(URL url, Pattern pattern) { - final Matcher uriMatcher = pattern.matcher(url.getPath()); + abstract protected String getRawProtocolPortId(ProvenanceEventRecord event); + + private Matcher matchUrl(String url, Pattern pattern) { + final Matcher uriMatcher = pattern.matcher(url); if (!uriMatcher.matches()) { throw new IllegalArgumentException("Unexpected transit URI: " + url); } return uriMatcher; } - protected static class S2STransitUrl { + protected static class S2SPort { final String clusterName; final String targetPortId; - public S2STransitUrl(String clusterName, String targetPortId) { + public S2SPort(String clusterName, String targetPortId) { this.clusterName = clusterName; this.targetPortId = targetPortId; } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html index cd82079e41..b94a915302 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html @@ -329,7 +329,7 @@ Processor 3 rootGroupPortGUID@clusterName (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1) - NOTE:Only HTTP S2S protocol is supported. RAW support may be added in the future as it needs NiFi code modification. See NIFI-4654 for detail. + @@ -345,6 +345,7 @@ upstream (nifi_flow_path) remoteInputPortGUID@clusterName
(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1)

NOTE: The remoteInputPortGUID is the client-side component ID, which is different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.

+ @@ -364,6 +365,7 @@ upstream (nifi_flow_path) rootGroupPortGUID@clusterName (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1) + @@ -385,6 +387,7 @@ remote target port
  • For 'nifi_queue': downstreamPathGUID@clusterName
    (e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)
  • + NiFiRootGroupPort diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java index 3040d5068e..b0a6654a00 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java @@ -24,6 +24,7 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; import org.apache.nifi.atlas.resolver.ClusterResolvers; import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -49,7 +50,7 @@ import static org.mockito.Mockito.when; public class TestNiFiRemotePort { @Test - public void testRemoteInputPort() { + public void testRemoteInputPortHTTP() { final String componentType = "Remote Input Port"; final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files"; final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class); @@ -89,7 +90,7 @@ public class TestNiFiRemotePort { } @Test - public void testRemoteOutputPort() { + public void testRemoteOutputPortHTTP() { final String componentType = "Remote Output Port"; final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files"; final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); @@ -123,5 +124,85 @@ 
public class TestNiFiRemotePort { assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } + @Test + public void testRemoteInputPortRAW() { + final String componentType = "Remote Input Port"; + // The UUID in a Transit Uri is a FlowFile UUID + final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82"; + final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class); + when(sendEvent.getEventId()).thenReturn(123L); + // Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client. + when(sendEvent.getComponentId()).thenReturn("s2s-client-component-guid"); + when(sendEvent.getComponentType()).thenReturn(componentType); + when(sendEvent.getTransitUri()).thenReturn(transitUri); + when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); + when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid"); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setDestinationId("s2s-client-component-guid"); + connection.setDestinationName("inputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = analyzer.analyze(context, sendEvent); + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + assertEquals(1, refs.getComponentIds().size()); + // Should 
report connected componentId. + assertTrue(refs.getComponentIds().contains("s2s-client-component-guid")); + + Referenceable ref = refs.getOutputs().iterator().next(); + assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); + assertEquals("inputPortA", ref.get(ATTR_NAME)); + assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } + + @Test + public void testRemoteOutputPortRAW() { + final String componentType = "Remote Output Port"; + // The UUID in a Transit Uri is a FlowFile UUID + final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c"; + final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); + // Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client. + when(record.getComponentId()).thenReturn("s2s-client-component-guid"); + when(record.getComponentType()).thenReturn(componentType); + when(record.getTransitUri()).thenReturn(transitUri); + when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); + when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid"); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setSourceId("s2s-client-component-guid"); + connection.setSourceName("outputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = 
analyzer.analyze(context, record); + assertEquals(1, refs.getInputs().size()); + assertEquals(0, refs.getOutputs().size()); + Referenceable ref = refs.getInputs().iterator().next(); + assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); + assertEquals("outputPortA", ref.get(ATTR_NAME)); + assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java new file mode 100644 index 0000000000..61d59da65d --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; + +/** + * Tests for RootGroupPorts. + * More complex and detailed tests are available at {@link ITReportLineageToAtlas}. 
+ */ +public class TestNiFiRootGroupPort { + + @Test + public void testInputPortHTTP() { + final String componentType = "Input Port"; + final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files"; + final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class); + when(receiveEvent.getEventId()).thenReturn(123L); + when(receiveEvent.getComponentId()).thenReturn("port-guid"); + when(receiveEvent.getComponentType()).thenReturn(componentType); + when(receiveEvent.getTransitUri()).thenReturn(transitUri); + when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setSourceId("port-guid"); + connection.setSourceName("inputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = analyzer.analyze(context, receiveEvent); + assertEquals(1, refs.getInputs().size()); + assertEquals(0, refs.getOutputs().size()); + assertEquals(1, refs.getComponentIds().size()); + // Should report connected componentId. 
+ assertTrue(refs.getComponentIds().contains("port-guid")); + + Referenceable ref = refs.getInputs().iterator().next(); + assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); + assertEquals("inputPortA", ref.get(ATTR_NAME)); + assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } + + @Test + public void testRemoteOutputPortHTTP() { + final String componentType = "Output Port"; + final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files"; + final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class); + when(sendEvent.getComponentId()).thenReturn("port-guid"); + when(sendEvent.getComponentType()).thenReturn(componentType); + when(sendEvent.getTransitUri()).thenReturn(transitUri); + when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setDestinationId("port-guid"); + connection.setDestinationName("outputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = analyzer.analyze(context, sendEvent); + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + Referenceable ref = refs.getOutputs().iterator().next(); + assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); + assertEquals("outputPortA", 
ref.get(ATTR_NAME)); + assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } + + @Test + public void testRemoteInputPortRAW() { + final String componentType = "Input Port"; + // The UUID in a Transit Uri is a FlowFile UUID + final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82"; + final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class); + when(receiveEvent.getEventId()).thenReturn(123L); + when(receiveEvent.getComponentId()).thenReturn("port-guid"); + when(receiveEvent.getComponentType()).thenReturn(componentType); + when(receiveEvent.getTransitUri()).thenReturn(transitUri); + when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setSourceId("port-guid"); + connection.setSourceName("inputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = analyzer.analyze(context, receiveEvent); + assertEquals(1, refs.getInputs().size()); + assertEquals(0, refs.getOutputs().size()); + assertEquals(1, refs.getComponentIds().size()); + // Should report connected componentId. 
+ assertTrue(refs.getComponentIds().contains("port-guid")); + + Referenceable ref = refs.getInputs().iterator().next(); + assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); + assertEquals("inputPortA", ref.get(ATTR_NAME)); + assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } + + @Test + public void testRemoteOutputPortRAW() { + final String componentType = "Output Port"; + // The UUID in a Transit Uri is a FlowFile UUID + final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c"; + final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class); + when(sendEvent.getComponentId()).thenReturn("port-guid"); + when(sendEvent.getComponentType()).thenReturn(componentType); + when(sendEvent.getTransitUri()).thenReturn(transitUri); + when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); + + final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); + when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + + final List connections = new ArrayList<>(); + final ConnectionStatus connection = new ConnectionStatus(); + connection.setDestinationId("port-guid"); + connection.setDestinationName("outputPortA"); + connections.add(connection); + + final AnalysisContext context = Mockito.mock(AnalysisContext.class); + when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections); + + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); + assertNotNull(analyzer); + + final DataSetRefs refs = analyzer.analyze(context, sendEvent); + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + Referenceable ref = refs.getOutputs().iterator().next(); + assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); + assertEquals("outputPortA", 
ref.get(ATTR_NAME)); + assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + } + +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java index b16e088436..370025bd54 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java @@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.lineage.ComputeLineageResult; @@ -652,6 +653,64 @@ public class ITReportLineageToAtlas { } + /** + * A client NiFi sends FlowFiles to a remote NiFi using RAW protocol. + */ + private void testS2SSendRAW(TestConfiguration tc) throws Exception { + final ProvenanceRecords prs = tc.provenanceRecords; + prs.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE)); + prs.add(pr("c439cdca-e989-3491", "Generate C", CREATE)); + prs.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE)); + + // The remote port GUID is different than the Remote Input Ports. 
+ final SimpleProvenanceRecord sendEvent1 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND, + "nifi://nifi.example.com:8081/d668805a-0ad0-44d1-aa65-ac362bf06e10"); + sendEvent1.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000"); + prs.add(sendEvent1); + + final SimpleProvenanceRecord sendEvent2 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND, + "nifi://nifi.example.com:8081/d4ec2459-d903-4a73-a09e-853f9997d3fb"); + sendEvent2.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000"); + prs.add(sendEvent2); + + prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // C + prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // Twitter + + // Generate C created a FlowFile, then it's sent via S2S + tc.addLineage(createLineage(prs, 1, 3, 5)); + // GetTwitter created a FlowFile, then it's sent via S2S + tc.addLineage(createLineage(prs, 2, 4, 6)); + + test(tc); + + waitNotificationsGetDelivered(); + + final Lineage lineage = getLineage(); + + final Node flow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example"); + final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970"); + final Node pathB = lineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056"); + final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491"); + final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708"); + final Node pathI = lineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab"); + // UpdateAttribute has multiple incoming paths, so it generates a queue to receive those. + final Node queueU = lineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad"); + final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad"); + + // These are starting paths. 
+ lineage.assertLink(flow, pathA); + lineage.assertLink(flow, pathB); + lineage.assertLink(flow, pathC); + lineage.assertLink(flow, pathT); + lineage.assertLink(flow, pathI); + + // Multiple paths connected to the same path. + lineage.assertLink(pathB, queueU); + lineage.assertLink(pathC, queueU); + lineage.assertLink(queueU, pathU); + + } + @Test public void testS2SSendSimple() throws Exception { final TestConfiguration tc = new TestConfiguration("S2SSend"); @@ -711,6 +770,65 @@ public class ITReportLineageToAtlas { lineage.assertLink(genT, pathT); } + @Test + public void testS2SSendSimpleRAW() throws Exception { + final TestConfiguration tc = new TestConfiguration("S2SSendRAW"); + + testS2SSendRAW(tc); + + final Lineage lineage = getLineage(); + + // The FlowFile created by Generate A has not been finished (by DROP event), but SIMPLE_PATH strategy can report it. + final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970"); + final Node genA = lineage.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970"); + lineage.assertLink(genA, pathA); + + final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491"); + final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708"); + + // Generate C and GetTwitter have reported proper SEND lineage to the input port. + final Node remoteInputPortD = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3"); + final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59"); + final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59"); + lineage.assertLink(pathC, remoteInputPortQ); + lineage.assertLink(pathT, remoteInputPortQ); + lineage.assertLink(remoteInputPortQ, remoteInputPortP); + lineage.assertLink(remoteInputPortP, remoteInputPortD); + + // nifi_data is created for each obscure input processor. 
+ final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491"); + final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708"); + lineage.assertLink(genC, pathC); + lineage.assertLink(genT, pathT); + } + + @Test + public void testS2SSendCompleteRAW() throws Exception { + final TestConfiguration tc = new TestConfiguration("S2SSendRAW"); + tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + + testS2SSendRAW(tc); + + final Lineage lineage = getLineage(); + + // Complete path has hash. + final Node pathC = lineage.findNode("nifi_flow_path", "Generate C, Remote Input Port", + "c439cdca-e989-3491-0000-000000000000::1605753423@example"); + final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter, Remote Input Port", + "b775b657-5a5b-3708-0000-000000000000::3843156947@example"); + + // Generate C and GetTwitter have reported proper SEND lineage to the input port. + final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3"); + lineage.assertLink(pathC, remoteInputPort); + lineage.assertLink(pathT, remoteInputPort); + + // nifi_data is created for each obscure input processor. + final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491"); + final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708"); + lineage.assertLink(genC, pathC); + lineage.assertLink(genT, pathT); + } + /** * A client NiFi gets FlowFiles from a remote NiFi. */ @@ -754,6 +872,50 @@ public class ITReportLineageToAtlas { } + /** + * A client NiFi gets FlowFiles from a remote NiFi using RAW protocol. + */ + @Test + public void testS2SGetRAW() throws Exception { + final TestConfiguration tc = new TestConfiguration("S2SGetRAW"); + final ProvenanceRecords prs = tc.provenanceRecords; + // The remote port GUID is different than the Remote Output Ports. 
+ final SimpleProvenanceRecord receiveEvent = pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE, + "nifi://nifi.example.com:8081/7f1a5d65-65bb-4473-b1a4-4a742d9af4a7"); + receiveEvent.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "392e7343-3950-329b-0000-000000000000"); + prs.add(receiveEvent); + + test(tc); + + waitNotificationsGetDelivered(); + + final Lineage lineage = getLineage(); + + final Node flow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example"); + final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b"); + final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac"); + final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac"); + + // These entities should be created by notification. + final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b"); + final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d"); + final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b"); + final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac"); + final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac"); + + lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess); + + lineage.assertLink(flow, remoteOutputPortProcess); + lineage.assertLink(remoteOutputPortProcess, queueL); + lineage.assertLink(remoteOutputPortProcess, queueP); + lineage.assertLink(remoteOutputPortProcess, queueU); + + lineage.assertLink(queueL, pathL); + lineage.assertLink(queueP, pathP); + lineage.assertLink(queueU, pathU); + + } + /** * A remote NiFi transfers FlowFiles to remote client NiFis. * This NiFi instance owns RootProcessGroup output port. 
@@ -810,6 +972,60 @@ public class ITReportLineageToAtlas { lineage.assertLink(inputPort, path); } + /** + * A remote NiFi transfers FlowFiles to remote client NiFis using RAW protocol. + * This NiFi instance owns RootProcessGroup output port. + */ + @Test + public void testS2STransferRAW() throws Exception { + final TestConfiguration tc = new TestConfiguration("S2STransfer"); + + final ProvenanceRecords prs = tc.provenanceRecords; + prs.add(pr("392e7343-3950-329b", "Output Port", SEND, + "nifi://nifi.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82")); + + test(tc); + + waitNotificationsGetDelivered(); + + final Lineage lineage = getLineage(); + + final Node flow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example"); + final Node path = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a"); + final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b"); + + lineage.assertLink(flow, path); + lineage.assertLink(path, outputPort); + } + + /** + * A remote NiFi receives FlowFiles from remote client NiFis using RAW protocol. + * This NiFi instance owns RootProcessGroup input port. 
+ */ + @Test + public void testS2SReceiveRAW() throws Exception { + final TestConfiguration tc = new TestConfiguration("S2SReceive"); + + final ProvenanceRecords prs = tc.provenanceRecords; + prs.add(pr("77919f59-533e-35a3", "Input Port", RECEIVE, + "nifi://nifi.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c")); + + test(tc); + + waitNotificationsGetDelivered(); + + final Lineage lineage = getLineage(); + + final Node flow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example"); + final Node path = lineage.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3"); + final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3"); + + lineage.assertLink(flow, path); + lineage.assertLink(flow, inputPort); + + lineage.assertLink(inputPort, path); + } + @Test public void testS2SReceiveAndSendCombination() throws Exception { testS2SReceive(); @@ -874,6 +1090,70 @@ public class ITReportLineageToAtlas { } + @Test + public void testS2SReceiveAndSendCombinationRAW() throws Exception { + testS2SReceiveRAW(); + testS2SSendSimpleRAW(); + + final Lineage lineage = getLineage(); + + final Node remoteFlow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example"); + final Node localFlow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example"); + final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59"); + final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59"); + final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3"); + final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491"); + final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708"); + + // Remote flow owns the inputPort. + lineage.assertLink(remoteFlow, inputPort); + + // These paths within the local flow send data to the remote flow through the remote input port. 
+ lineage.assertLink(localFlow, pathC); + lineage.assertLink(localFlow, pathT); + lineage.assertLink(pathC, remoteInputPortQ); + lineage.assertLink(pathT, remoteInputPortQ); + lineage.assertLink(remoteInputPortQ, remoteInputPortP); + lineage.assertLink(remoteInputPortP, inputPort); + + } + + @Test + public void testS2STransferAndGetCombinationRAW() throws Exception { + testS2STransferRAW(); + testS2SGetRAW(); + + final Lineage lineage = getLineage(); + + final Node remoteFlow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example"); + final Node localFlow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example"); + final Node remoteGen = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a"); + final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b"); + + final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d"); + final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b"); + final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac"); + final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac"); + final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b"); + final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac"); + final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac"); + + // Remote flow owns the outputPort and transfers data generated by GenerateFlowFile. + lineage.assertLink(remoteFlow, remoteGen); + lineage.assertLink(remoteGen, outputPort); + + // The Remote Output Port path in local flow gets data from the remote. 
+ lineage.assertLink(localFlow, remoteOutputPortP); + lineage.assertLink(outputPort, remoteOutputPortP); + lineage.assertLink(remoteOutputPortP, queueL); + lineage.assertLink(remoteOutputPortP, queueP); + lineage.assertLink(remoteOutputPortP, queueU); + lineage.assertLink(queueL, pathL); + lineage.assertLink(queueP, pathP); + lineage.assertLink(queueU, pathU); + + } + /** * A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything. */ diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml new file mode 100644 index 0000000000..20a759adcf --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml @@ -0,0 +1,443 @@ + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SSendRAW.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SSendRAW.xml new file mode 100644 index 0000000000..189050a784 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SSendRAW.xml @@ -0,0 +1,822 @@ + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 1f9f60012d..53e7c612f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -347,6 +347,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl()); final String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key())); + flowFile = session.putAttribute(flowFile, SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier()); session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false); session.remove(flowFile); @@ -413,6 +414,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final Map attributes = new HashMap<>(2); attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host); attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port); + attributes.put(SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier()); flowFile = session.putAllAttributes(flowFile, attributes); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 1b4194373b..9ced95603b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -173,6 +173,7 @@ public class TestStandardRemoteGroupPort { assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(peerUrl + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()), provenanceEvent.getTransitUri()); assertEquals("Remote 
DN=nifi.node1.example.com", provenanceEvent.getDetails()); + assertEquals("remote-group-port-id", provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())); } } @@ -220,6 +221,7 @@ public class TestStandardRemoteGroupPort { final MockFlowFile flowFile = flowFiles.get(0); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort()); + flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), "remote-group-port-id"); } } @@ -253,6 +255,7 @@ public class TestStandardRemoteGroupPort { assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails()); + assertEquals("remote-group-port-id", provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())); } @Test @@ -436,6 +439,7 @@ public class TestStandardRemoteGroupPort { final MockFlowFile flowFile = flowFiles.get(0); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort()); + flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), "remote-group-port-id"); } }