NIFI-4654: Support reporting RAS S2S lineage to Atlas

- Added 's2s.port.id' FlowFile attribute to track target remote Port id
- Use 's2s.port.id' to analyze RAW S2S provenance events
- This closes #2863
This commit is contained in:
Koji Kawamura 2018-07-09 11:28:17 +09:00 committed by Matt Gilman
parent b279624398
commit e94f0757db
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
12 changed files with 1882 additions and 27 deletions

View File

@ -23,7 +23,9 @@ public enum SiteToSiteAttributes implements FlowFileAttributeKey {
S2S_HOST("s2s.host"), S2S_HOST("s2s.host"),
S2S_ADDRESS("s2s.address"); S2S_ADDRESS("s2s.address"),
S2S_PORT_ID("s2s.port.id");
private final String key; private final String key;

View File

@ -20,6 +20,7 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -53,14 +54,13 @@ public class NiFiRemotePort extends NiFiS2S {
final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port"); final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port");
final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT; final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
final String remotePortId = event.getComponentId(); final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
// Find connections that connects to/from the remote port. // Find connections that connects to/from the remote port.
final String componentId = event.getComponentId();
final List<ConnectionStatus> connections = isRemoteInputPort final List<ConnectionStatus> connections = isRemoteInputPort
? context.findConnectionTo(remotePortId) ? context.findConnectionTo(componentId)
: context.findConnectionFrom(remotePortId); : context.findConnectionFrom(componentId);
if (connections == null || connections.isEmpty()) { if (connections == null || connections.isEmpty()) {
logger.warn("Connection was not found: {}", new Object[]{event}); logger.warn("Connection was not found: {}", new Object[]{event});
return null; return null;
@ -70,7 +70,7 @@ public class NiFiRemotePort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0); final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type); final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName()); ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId)); ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, s2SPort.targetPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
} }
@ -79,4 +79,9 @@ public class NiFiRemotePort extends NiFiS2S {
public String targetComponentTypePattern() { public String targetComponentTypePattern() {
return "^Remote (In|Out)put Port$"; return "^Remote (In|Out)put Port$";
} }
@Override
protected String getRawProtocolPortId(ProvenanceEventRecord event) {
return event.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key());
}
} }

View File

@ -54,7 +54,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT; final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
final String rootPortId = event.getComponentId(); final String rootPortId = event.getComponentId();
final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver()); final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
// Find connections connecting to/from the remote port. // Find connections connecting to/from the remote port.
final List<ConnectionStatus> connections = isInputPort final List<ConnectionStatus> connections = isInputPort
@ -69,7 +69,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0); final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type); final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName()); ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId)); ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, rootPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
} }
@ -78,4 +78,9 @@ public class NiFiRootGroupPort extends NiFiS2S {
public String targetComponentTypePattern() { public String targetComponentTypePattern() {
return "^(In|Out)put Port$"; return "^(In|Out)put Port$";
} }
@Override
protected String getRawProtocolPortId(ProvenanceEventRecord event) {
return event.getComponentId();
}
} }

View File

@ -18,10 +18,10 @@ package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.resolver.ClusterResolver; import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -29,27 +29,30 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class); private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class);
private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)"); private static final Pattern RAW_URL_REGEX = Pattern.compile("nifi://([^:/]+):\\d+/([0-9a-zA-Z\\-]+)");
private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*"); private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*");
protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) { protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, ClusterResolver clusterResolver) {
final URL url = parseUrl(transitUri); final String transitUri = event.getTransitUri();
final int protocolIndex = transitUri.indexOf(':');
final String protocol = transitUri.substring(0, protocolIndex).toLowerCase();
final String clusterName = clusterResolver.fromHostNames(url.getHost()); final String targetHostname;
final String targetPortId; final String targetPortId;
final String protocol = url.getProtocol().toLowerCase();
switch (protocol) { switch (protocol) {
case "http": case "http":
case "https": { case "https": {
final Matcher uriMatcher = matchUrl(url, HTTP_URL_REGEX); final Matcher uriMatcher = matchUrl(transitUri, HTTP_URL_REGEX);
targetHostname = parseUri(transitUri).getHost();
targetPortId = uriMatcher.group(2); targetPortId = uriMatcher.group(2);
} }
break; break;
case "nifi": { case "nifi": {
final Matcher uriMatcher = matchUrl(url, RAW_URL_REGEX); final Matcher uriMatcher = matchUrl(transitUri, RAW_URL_REGEX);
targetPortId = uriMatcher.group(1); targetHostname = uriMatcher.group(1);
targetPortId = getRawProtocolPortId(event);
} }
break; break;
@ -58,23 +61,25 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
} }
return new S2STransitUrl(clusterName, targetPortId); final String clusterName = clusterResolver.fromHostNames(targetHostname);
return new S2SPort(clusterName, targetPortId);
} }
private Matcher matchUrl(URL url, Pattern pattern) { abstract protected String getRawProtocolPortId(ProvenanceEventRecord event);
final Matcher uriMatcher = pattern.matcher(url.getPath());
private Matcher matchUrl(String url, Pattern pattern) {
final Matcher uriMatcher = pattern.matcher(url);
if (!uriMatcher.matches()) { if (!uriMatcher.matches()) {
throw new IllegalArgumentException("Unexpected transit URI: " + url); throw new IllegalArgumentException("Unexpected transit URI: " + url);
} }
return uriMatcher; return uriMatcher;
} }
protected static class S2STransitUrl { protected static class S2SPort {
final String clusterName; final String clusterName;
final String targetPortId; final String targetPortId;
public S2STransitUrl(String clusterName, String targetPortId) { public S2SPort(String clusterName, String targetPortId) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.targetPortId = targetPortId; this.targetPortId = targetPortId;
} }

View File

@ -329,7 +329,7 @@ Processor 3</pre>
</td> </td>
<td>rootGroupPortGUID@clusterName <td>rootGroupPortGUID@clusterName
(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td> (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
<td rowspan="4"><strong>NOTE:</strong>Only HTTP S2S protocol is supported. RAW support may be added in the future as it needs NiFi code modification. See <a href="https://issues.apache.org/jira/browse/NIFI-4654">NIFI-4654</a> for detail.</td> <td></td>
</tr> </tr>
<tr> <tr>
<td> <td>
@ -345,6 +345,7 @@ upstream (nifi_flow_path)
</td> </td>
<td>remoteInputPortGUID@clusterName<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1) <td>remoteInputPortGUID@clusterName<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1)
<p>NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.</p></td> <p>NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.</p></td>
<td></td>
</tr> </tr>
<tr> <tr>
<td rowspan="2"> <td rowspan="2">
@ -364,6 +365,7 @@ upstream (nifi_flow_path)
</td> </td>
<td>rootGroupPortGUID@clusterName <td>rootGroupPortGUID@clusterName
(e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td> (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
<td></td>
</tr> </tr>
<tr> <tr>
<td> <td>
@ -385,6 +387,7 @@ remote target port
<li>For 'nifi_queue': downstreamPathGUID@clusterName<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)</li> <li>For 'nifi_queue': downstreamPathGUID@clusterName<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)</li>
</ul> </ul>
</td> </td>
<td></td>
</tr> </tr>
<tr> <tr>
<td>NiFiRootGroupPort</td> <td>NiFiRootGroupPort</td>

View File

@ -24,6 +24,7 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
import org.apache.nifi.atlas.resolver.ClusterResolvers; import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test; import org.junit.Test;
@ -49,7 +50,7 @@ import static org.mockito.Mockito.when;
public class TestNiFiRemotePort { public class TestNiFiRemotePort {
@Test @Test
public void testRemoteInputPort() { public void testRemoteInputPortHTTP() {
final String componentType = "Remote Input Port"; final String componentType = "Remote Input Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files"; final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class); final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
@ -89,7 +90,7 @@ public class TestNiFiRemotePort {
} }
@Test @Test
public void testRemoteOutputPort() { public void testRemoteOutputPortHTTP() {
final String componentType = "Remote Output Port"; final String componentType = "Remote Output Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files"; final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
@ -123,5 +124,85 @@ public class TestNiFiRemotePort {
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
} }
@Test
public void testRemoteInputPortRAW() {
final String componentType = "Remote Input Port";
// The UUID in a Transit Uri is a FlowFile UUID
final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
when(sendEvent.getEventId()).thenReturn(123L);
// Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client.
when(sendEvent.getComponentId()).thenReturn("s2s-client-component-guid");
when(sendEvent.getComponentType()).thenReturn(componentType);
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setDestinationId("s2s-client-component-guid");
connection.setDestinationName("inputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, sendEvent);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
assertEquals(1, refs.getComponentIds().size());
// Should report connected componentId.
assertTrue(refs.getComponentIds().contains("s2s-client-component-guid"));
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
public void testRemoteOutputPortRAW() {
final String componentType = "Remote Output Port";
// The UUID in a Transit Uri is a FlowFile UUID
final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
// Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client.
when(record.getComponentId()).thenReturn("s2s-client-component-guid");
when(record.getComponentType()).thenReturn(componentType);
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setSourceId("s2s-client-component-guid");
connection.setSourceName("outputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, record);
assertEquals(1, refs.getInputs().size());
assertEquals(0, refs.getOutputs().size());
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
} }

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.when;
/**
* Tests for RootGroupPorts.
* More complex and detailed tests are available at {@link ITReportLineageToAtlas}.
*/
public class TestNiFiRootGroupPort {
@Test
public void testInputPortHTTP() {
final String componentType = "Input Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
when(receiveEvent.getEventId()).thenReturn(123L);
when(receiveEvent.getComponentId()).thenReturn("port-guid");
when(receiveEvent.getComponentType()).thenReturn(componentType);
when(receiveEvent.getTransitUri()).thenReturn(transitUri);
when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setSourceId("port-guid");
connection.setSourceName("inputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
assertEquals(1, refs.getInputs().size());
assertEquals(0, refs.getOutputs().size());
assertEquals(1, refs.getComponentIds().size());
// Should report connected componentId.
assertTrue(refs.getComponentIds().contains("port-guid"));
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
public void testRemoteOutputPortHTTP() {
final String componentType = "Output Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
when(sendEvent.getComponentId()).thenReturn("port-guid");
when(sendEvent.getComponentType()).thenReturn(componentType);
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setDestinationId("port-guid");
connection.setDestinationName("outputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, sendEvent);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
public void testRemoteInputPortRAW() {
final String componentType = "Input Port";
// The UUID in a Transit Uri is a FlowFile UUID
final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
when(receiveEvent.getEventId()).thenReturn(123L);
when(receiveEvent.getComponentId()).thenReturn("port-guid");
when(receiveEvent.getComponentType()).thenReturn(componentType);
when(receiveEvent.getTransitUri()).thenReturn(transitUri);
when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setSourceId("port-guid");
connection.setSourceName("inputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
assertEquals(1, refs.getInputs().size());
assertEquals(0, refs.getOutputs().size());
assertEquals(1, refs.getComponentIds().size());
// Should report connected componentId.
assertTrue(refs.getComponentIds().contains("port-guid"));
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
public void testRemoteOutputPortRAW() {
final String componentType = "Output Port";
// The UUID in a Transit Uri is a FlowFile UUID
final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
when(sendEvent.getComponentId()).thenReturn("port-guid");
when(sendEvent.getComponentType()).thenReturn(componentType);
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
connection.setDestinationId("port-guid");
connection.setDestinationName("outputPortA");
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
assertNotNull(analyzer);
final DataSetRefs refs = analyzer.analyze(context, sendEvent);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.lineage.ComputeLineageResult; import org.apache.nifi.provenance.lineage.ComputeLineageResult;
@ -652,6 +653,64 @@ public class ITReportLineageToAtlas {
} }
/**
* A client NiFi sends FlowFiles to a remote NiFi using RAW protocol.
*/
private void testS2SSendRAW(TestConfiguration tc) throws Exception {
final ProvenanceRecords prs = tc.provenanceRecords;
prs.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE));
prs.add(pr("c439cdca-e989-3491", "Generate C", CREATE));
prs.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE));
// The remote port GUID is different than the Remote Input Ports.
final SimpleProvenanceRecord sendEvent1 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
"nifi://nifi.example.com:8081/d668805a-0ad0-44d1-aa65-ac362bf06e10");
sendEvent1.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
prs.add(sendEvent1);
final SimpleProvenanceRecord sendEvent2 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
"nifi://nifi.example.com:8081/d4ec2459-d903-4a73-a09e-853f9997d3fb");
sendEvent2.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
prs.add(sendEvent2);
prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // C
prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // Twitter
// Generate C created a FlowFile, then it's sent via S2S
tc.addLineage(createLineage(prs, 1, 3, 5));
// GetTwitter created a FlowFile, then it's sent via S2S
tc.addLineage(createLineage(prs, 2, 4, 6));
test(tc);
waitNotificationsGetDelivered();
final Lineage lineage = getLineage();
final Node flow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
final Node pathB = lineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056");
final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
final Node pathI = lineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab");
// UpdateAttribute has multiple incoming paths, so it generates a queue to receive those.
final Node queueU = lineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad");
final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad");
// These are starting paths.
lineage.assertLink(flow, pathA);
lineage.assertLink(flow, pathB);
lineage.assertLink(flow, pathC);
lineage.assertLink(flow, pathT);
lineage.assertLink(flow, pathI);
// Multiple paths connected to the same path.
lineage.assertLink(pathB, queueU);
lineage.assertLink(pathC, queueU);
lineage.assertLink(queueU, pathU);
}
@Test @Test
public void testS2SSendSimple() throws Exception { public void testS2SSendSimple() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSend"); final TestConfiguration tc = new TestConfiguration("S2SSend");
@ -711,6 +770,65 @@ public class ITReportLineageToAtlas {
lineage.assertLink(genT, pathT); lineage.assertLink(genT, pathT);
} }
@Test
public void testS2SSendSimpleRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSendRAW");
testS2SSendRAW(tc);
final Lineage lineage = getLineage();
// The FlowFile created by Generate A has not been finished (by DROP event, but SIMPLE_PATH strategy can report it.
final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
final Node genA = lineage.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970");
lineage.assertLink(genA, pathA);
final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
// Generate C and GetTwitter have reported proper SEND lineage to the input port.
final Node remoteInputPortD = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
lineage.assertLink(pathC, remoteInputPortQ);
lineage.assertLink(pathT, remoteInputPortQ);
lineage.assertLink(remoteInputPortQ, remoteInputPortP);
lineage.assertLink(remoteInputPortP, remoteInputPortD);
// nifi_data is created for each obscure input processor.
final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
lineage.assertLink(genC, pathC);
lineage.assertLink(genT, pathT);
}
@Test
public void testS2SSendCompleteRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSendRAW");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
testS2SSendRAW(tc);
final Lineage lineage = getLineage();
// Complete path has hash.
final Node pathC = lineage.findNode("nifi_flow_path", "Generate C, Remote Input Port",
"c439cdca-e989-3491-0000-000000000000::1605753423@example");
final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter, Remote Input Port",
"b775b657-5a5b-3708-0000-000000000000::3843156947@example");
// Generate C and GetTwitter have reported proper SEND lineage to the input port.
final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
lineage.assertLink(pathC, remoteInputPort);
lineage.assertLink(pathT, remoteInputPort);
// nifi_data is created for each obscure input processor.
final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
lineage.assertLink(genC, pathC);
lineage.assertLink(genT, pathT);
}
/** /**
* A client NiFi gets FlowFiles from a remote NiFi. * A client NiFi gets FlowFiles from a remote NiFi.
*/ */
@ -754,6 +872,50 @@ public class ITReportLineageToAtlas {
} }
/**
* A client NiFi gets FlowFiles from a remote NiFi using RAW protocol.
*/
@Test
public void testS2SGetRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SGetRAW");
final ProvenanceRecords prs = tc.provenanceRecords;
// The remote port GUID is different than the Remote Output Ports.
final SimpleProvenanceRecord receiveEvent = pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE,
"nifi://nifi.example.com:8081/7f1a5d65-65bb-4473-b1a4-4a742d9af4a7");
receiveEvent.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "392e7343-3950-329b-0000-000000000000");
prs.add(receiveEvent);
test(tc);
waitNotificationsGetDelivered();
final Lineage lineage = getLineage();
final Node flow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
// These entities should be created by notification.
final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess);
lineage.assertLink(flow, remoteOutputPortProcess);
lineage.assertLink(remoteOutputPortProcess, queueL);
lineage.assertLink(remoteOutputPortProcess, queueP);
lineage.assertLink(remoteOutputPortProcess, queueU);
lineage.assertLink(queueL, pathL);
lineage.assertLink(queueP, pathP);
lineage.assertLink(queueU, pathU);
}
/** /**
* A remote NiFi transfers FlowFiles to remote client NiFis. * A remote NiFi transfers FlowFiles to remote client NiFis.
* This NiFi instance owns RootProcessGroup output port. * This NiFi instance owns RootProcessGroup output port.
@ -810,6 +972,60 @@ public class ITReportLineageToAtlas {
lineage.assertLink(inputPort, path); lineage.assertLink(inputPort, path);
} }
/**
* A remote NiFi transfers FlowFiles to remote client NiFis using RAW protocol.
* This NiFi instance owns RootProcessGroup output port.
*/
@Test
public void testS2STransferRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2STransfer");
final ProvenanceRecords prs = tc.provenanceRecords;
prs.add(pr("392e7343-3950-329b", "Output Port", SEND,
"nifi://nifi.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82"));
test(tc);
waitNotificationsGetDelivered();
final Lineage lineage = getLineage();
final Node flow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
final Node path = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
lineage.assertLink(flow, path);
lineage.assertLink(path, outputPort);
}
/**
* A remote NiFi receives FlowFiles from remote client NiFis using RAW protocol.
* This NiFi instance owns RootProcessGroup input port.
*/
@Test
public void testS2SReceiveRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SReceive");
final ProvenanceRecords prs = tc.provenanceRecords;
prs.add(pr("77919f59-533e-35a3", "Input Port", RECEIVE,
"nifi://nifi.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c"));
test(tc);
waitNotificationsGetDelivered();
final Lineage lineage = getLineage();
final Node flow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
final Node path = lineage.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3");
final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
lineage.assertLink(flow, path);
lineage.assertLink(flow, inputPort);
lineage.assertLink(inputPort, path);
}
@Test @Test
public void testS2SReceiveAndSendCombination() throws Exception { public void testS2SReceiveAndSendCombination() throws Exception {
testS2SReceive(); testS2SReceive();
@ -874,6 +1090,70 @@ public class ITReportLineageToAtlas {
} }
@Test
public void testS2SReceiveAndSendCombinationRAW() throws Exception {
testS2SReceiveRAW();
testS2SSendSimpleRAW();
final Lineage lineage = getLineage();
final Node remoteFlow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
final Node localFlow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
// Remote flow owns the inputPort.
lineage.assertLink(remoteFlow, inputPort);
// These paths within local flow sends data to the remote flow through the remote input port.
lineage.assertLink(localFlow, pathC);
lineage.assertLink(localFlow, pathT);
lineage.assertLink(pathC, remoteInputPortQ);
lineage.assertLink(pathT, remoteInputPortQ);
lineage.assertLink(remoteInputPortQ, remoteInputPortP);
lineage.assertLink(remoteInputPortP, inputPort);
}
@Test
public void testS2STransferAndGetCombinationRAW() throws Exception {
testS2STransferRAW();
testS2SGetRAW();
final Lineage lineage = getLineage();
final Node remoteFlow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
final Node localFlow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
final Node remoteGen = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
// Remote flow owns the outputPort and transfer data generated by GenerateFlowFile.
lineage.assertLink(remoteFlow, remoteGen);
lineage.assertLink(remoteGen, outputPort);
// The Remote Output Port path in local flow gets data from the remote.
lineage.assertLink(localFlow, remoteOutputPortP);
lineage.assertLink(outputPort, remoteOutputPortP);
lineage.assertLink(remoteOutputPortP, queueL);
lineage.assertLink(remoteOutputPortP, queueP);
lineage.assertLink(remoteOutputPortP, queueU);
lineage.assertLink(queueL, pathL);
lineage.assertLink(queueP, pathP);
lineage.assertLink(queueU, pathU);
}
/** /**
* A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything. * A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything.
*/ */

View File

@ -0,0 +1,443 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<template encoding-version="1.1">
<description></description>
<groupId>27b7b6b8-015f-1000-0d31-197ae42bab34</groupId>
<name>S2SGetRAW</name>
<snippet>
<processGroups>
<id>9fc65d0a-ff54-3c07-0000-000000000000</id>
<parentGroupId>c81f8a46-4aa3-313e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>8812c40b-5c71-369f-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
<id>bb530e58-ee14-3cac-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
<id>7375f8f6-4604-468d-0000-000000000000</id>
<type>REMOTE_OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>9df33c4b-8c26-33e5-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
<id>97cc5b27-22f3-3c3b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
<id>7375f8f6-4604-468d-0000-000000000000</id>
<type>REMOTE_OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>1ddd5163-7815-3117-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
<id>4f3bfa4c-6427-3aac-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
<id>7375f8f6-4604-468d-0000-000000000000</id>
<type>REMOTE_OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>12073df1-f38b-3cad-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<position>
<x>872.9999891005355</x>
<y>296.0000048267144</y>
</position>
<height>68.00000762939453</height>
<label>A FlowFile is passed to every downstream process paths.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>338.0</width>
</labels>
<processors>
<id>97cc5b27-22f3-3c3b-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<position>
<x>470.9999891005356</x>
<y>679.0000048267144</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>bb530e58-ee14-3cac-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<position>
<x>150.99998910053557</x>
<y>514.0000048267144</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>4f3bfa4c-6427-3aac-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<position>
<x>810.9999891005356</x>
<y>524.0000048267144</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>
<name>Create Missing Directories</name>
</value>
</entry>
<entry>
<key>Maximum File Count</key>
<value>
<name>Maximum File Count</name>
</value>
</entry>
<entry>
<key>Last Modified Time</key>
<value>
<name>Last Modified Time</name>
</value>
</entry>
<entry>
<key>Permissions</key>
<value>
<name>Permissions</name>
</value>
</entry>
<entry>
<key>Owner</key>
<value>
<name>Owner</name>
</value>
</entry>
<entry>
<key>Group</key>
<value>
<name>Group</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Directory</key>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>fail</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>true</value>
</entry>
<entry>
<key>Maximum File Count</key>
</entry>
<entry>
<key>Last Modified Time</key>
</entry>
<entry>
<key>Permissions</key>
</entry>
<entry>
<key>Owner</key>
</entry>
<entry>
<key>Group</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutFile</type>
</processors>
<remoteProcessGroups>
<id>c6acb687-616a-3d36-0000-000000000000</id>
<parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
<position>
<x>451.4000360293711</x>
<y>231.5999721410119</y>
</position>
<communicationsTimeout>30 sec</communicationsTimeout>
<contents>
<inputPorts>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<connected>false</connected>
<exists>true</exists>
<id>015f101e-dcd7-17bd-8899-1a723733521a</id>
<name>input</name>
<targetRunning>true</targetRunning>
<transmitting>false</transmitting>
<useCompression>false</useCompression>
</inputPorts>
<outputPorts>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<connected>true</connected>
<exists>true</exists>
<id>7375f8f6-4604-468d-0000-000000000000</id>
<targetId>392e7343-3950-329b-0000-000000000000</targetId>
<name>output</name>
<targetRunning>true</targetRunning>
<transmitting>true</transmitting>
<useCompression>false</useCompression>
</outputPorts>
</contents>
<proxyHost></proxyHost>
<proxyUser></proxyUser>
<targetUri>http://localhost:8080/nifi</targetUri>
<targetUris>http://localhost:8080/nifi</targetUris>
<transportProtocol>RAW</transportProtocol>
<yieldDuration>10 sec</yieldDuration>
</remoteProcessGroups>
</contents>
<name>S2SGetRAW</name>
</processGroups>
</snippet>
<timestamp>10/20/2017 13:03:49 JST</timestamp>
</template>

View File

@ -0,0 +1,822 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<template encoding-version="1.1">
<description></description>
<groupId>27b7b6b8-015f-1000-0d31-197ae42bab34</groupId>
<name>S2SSendRAW</name>
<snippet>
<processGroups>
<id>b3a9430b-3e40-3a04-0000-000000000000</id>
<parentGroupId>c81f8a46-4aa3-313e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>af5cc030-abbd-31bb-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>1504e817-9715-35fb-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>c5392447-e9f1-33ad-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>aff3eea5-0826-338d-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>18474287-6326-311a-0000-000000000000</groupId>
<id>f31a6b53-3077-4c59-0000-000000000000</id>
<type>REMOTE_INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>7033f311-ac68-3cab-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>071e7d2a-0680-3d9a-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>c5392447-e9f1-33ad-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>c439cdca-e989-3491-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>308a0785-e948-3151-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>18474287-6326-311a-0000-000000000000</groupId>
<id>f31a6b53-3077-4c59-0000-000000000000</id>
<type>REMOTE_INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>b775b657-5a5b-3708-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>470d0e2a-6281-3d3b-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>18474287-6326-311a-0000-000000000000</groupId>
<id>f31a6b53-3077-4c59-0000-000000000000</id>
<type>REMOTE_INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>1504e817-9715-35fb-0000-000000000000</id>
<type>FUNNEL</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4dacb06c-5bb8-3318-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>c5392447-e9f1-33ad-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>333255b6-eb02-3056-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>7521c9ce-7e0c-34ed-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>1504e817-9715-35fb-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>b3a9430b-3e40-3a04-0000-000000000000</groupId>
<id>ca71e4d9-2a4f-3970-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>1504e817-9715-35fb-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>353.013200790822</x>
<y>106.10680357116347</y>
</position>
</funnels>
<labels>
<id>9dbb0346-5ba8-3208-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>615.6071466250744</x>
<y>-91.46566086985234</y>
</position>
<height>82.47064208984375</height>
<label>If multiple paths are sending data through the same remote port,
then we need to fetch previous provenance event from
the actual SEND event, to report which path passed the data.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>467.25079345703125</width>
</labels>
<labels>
<id>4414cff6-9e3d-311f-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>1124.8022214005925</x>
<y>87.1594194315544</y>
</position>
<height>69.47367095947266</height>
<label>If a processor has never generated any data,
then NiFi will not report the lineage to Atlas because it hasn't happened.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>416.84210205078125</width>
</labels>
<processors>
<id>b775b657-5a5b-3708-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>711.1157953067149</x>
<y>1.67367972157831</y>
</position>
<bundle>
<artifact>nifi-social-media-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Twitter Endpoint</key>
<value>
<name>Twitter Endpoint</name>
</value>
</entry>
<entry>
<key>Consumer Key</key>
<value>
<name>Consumer Key</name>
</value>
</entry>
<entry>
<key>Consumer Secret</key>
<value>
<name>Consumer Secret</name>
</value>
</entry>
<entry>
<key>Access Token</key>
<value>
<name>Access Token</name>
</value>
</entry>
<entry>
<key>Access Token Secret</key>
<value>
<name>Access Token Secret</name>
</value>
</entry>
<entry>
<key>Languages</key>
<value>
<name>Languages</name>
</value>
</entry>
<entry>
<key>Terms to Filter On</key>
<value>
<name>Terms to Filter On</name>
</value>
</entry>
<entry>
<key>IDs to Follow</key>
<value>
<name>IDs to Follow</name>
</value>
</entry>
<entry>
<key>Locations to Filter On</key>
<value>
<name>Locations to Filter On</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Twitter Endpoint</key>
<value>Sample Endpoint</value>
</entry>
<entry>
<key>Consumer Key</key>
</entry>
<entry>
<key>Consumer Secret</key>
</entry>
<entry>
<key>Access Token</key>
</entry>
<entry>
<key>Access Token Secret</key>
</entry>
<entry>
<key>Languages</key>
</entry>
<entry>
<key>Terms to Filter On</key>
</entry>
<entry>
<key>IDs to Follow</key>
</entry>
<entry>
<key>Locations to Filter On</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GetTwitter</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.twitter.GetTwitter</type>
</processors>
<processors>
<id>c439cdca-e989-3491-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>674.1998712344492</x>
<y>-474.2524572489417</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate C</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>c5392447-e9f1-33ad-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>350.48686507975015</x>
<y>-234.94570552127266</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>ca71e4d9-2a4f-3970-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>-73.58932859465233</x>
<y>-130.67362652994757</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate A</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>333255b6-eb02-3056-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>167.93687074616798</x>
<y>-480.30510007120733</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate B</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>7033f311-ac68-3cab-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>1147.8839837832775</x>
<y>171.22101629994745</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.5.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>InactiveProcessor</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<remoteProcessGroups>
<id>18474287-6326-311a-0000-000000000000</id>
<parentGroupId>b3a9430b-3e40-3a04-0000-000000000000</parentGroupId>
<position>
<x>534.4000360293711</x>
<y>319.5999721410119</y>
</position>
<communicationsTimeout>30 sec</communicationsTimeout>
<contents>
<inputPorts>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<connected>true</connected>
<exists>true</exists>
<id>f31a6b53-3077-4c59-0000-000000000000</id>
<targetId>77919f59-533e-35a3-0000-000000000000</targetId>
<name>input</name>
<targetRunning>true</targetRunning>
<transmitting>true</transmitting>
<useCompression>false</useCompression>
</inputPorts>
</contents>
<proxyHost></proxyHost>
<proxyUser></proxyUser>
<targetUri>http://localhost:8080/nifi</targetUri>
<targetUris>http://localhost:8080/nifi</targetUris>
<transportProtocol>RAW</transportProtocol>
<yieldDuration>10 sec</yieldDuration>
</remoteProcessGroups>
</contents>
<name>S2SSendRAW</name>
</processGroups>
</snippet>
<timestamp>10/18/2017 16:02:17 JST</timestamp>
</template>

View File

@ -347,6 +347,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl()); logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
final String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key())); final String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key()));
flowFile = session.putAttribute(flowFile, SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier());
session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false); session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
session.remove(flowFile); session.remove(flowFile);
@ -413,6 +414,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final Map<String,String> attributes = new HashMap<>(2); final Map<String,String> attributes = new HashMap<>(2);
attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host); attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port); attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
attributes.put(SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier());
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);

View File

@ -173,6 +173,7 @@ public class TestStandardRemoteGroupPort {
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(peerUrl + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()), provenanceEvent.getTransitUri()); assertEquals(peerUrl + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()), provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails()); assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
assertEquals("remote-group-port-id", provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
} }
} }
@ -220,6 +221,7 @@ public class TestStandardRemoteGroupPort {
final MockFlowFile flowFile = flowFiles.get(0); final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), "remote-group-port-id");
} }
} }
@ -253,6 +255,7 @@ public class TestStandardRemoteGroupPort {
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails()); assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
assertEquals("remote-group-port-id", provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
} }
@Test @Test
@ -436,6 +439,7 @@ public class TestStandardRemoteGroupPort {
final MockFlowFile flowFile = flowFiles.get(0); final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort()); flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), "remote-group-port-id");
} }
} }