diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java new file mode 100644 index 0000000000..ed6437e08c --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowfile.attributes; + +/** + * FlowFile attributes used during site-to-site transfer. + */ +public enum SiteToSiteAttributes implements FlowFileAttributeKey { + + S2S_HOST("s2s.host"), + + S2S_ADDRESS("s2s.address"); + + private final String key; + + private SiteToSiteAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 3a23601984..b1a1c92573 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +39,7 @@ import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.processor.ProcessContext; @@ -56,6 +59,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -338,6 +342,17 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + + final Communicant communicant = transaction.getCommunicant(); + final String host = StringUtils.isEmpty(communicant.getHost()) ? "unknown" : communicant.getHost(); + final String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort()); + + final Map attributes = new HashMap<>(2); + attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host); + attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port); + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.importFrom(dataPacket.getData(), flowFile); final long receiveNanos = System.nanoTime() - start; flowFilesReceived.add(flowFile); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index e149481dbe..fe4b1b1ce3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -20,6 +20,7 @@ import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -37,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -448,7 +451,16 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); - flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost(); + final String port = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort()); + + final Map attributes = new HashMap<>(4); + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host); + attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port); + + flowFile = session.putAllAttributes(flowFile, attributes); final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid); session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null @@ -506,12 +518,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); } - // For routing purposes, downstream consumers often need to reference Flowfile's originating system - for (FlowFile flowFile : transaction.getFlowFilesSent()){ - flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost()); - flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort()); - } - // Commit the session so that we have persisted the data session.commit(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 23d3fdaf60..43009b4785 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -176,7 +176,7 @@ public class TestStandardRemoteGroupPort { // Return null when it gets called second time. doReturn(dataPacket).doReturn(null).when(this.transaction).receive(); - doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); + doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class)); doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); port.onTrigger(context, session); @@ -244,7 +244,7 @@ public class TestStandardRemoteGroupPort { // Return null when it's called second time. doReturn(dataPacket).doReturn(null).when(transaction).receive(); - doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); + doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class)); doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); port.onTrigger(context, session); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java index f5e803d10f..9f86d5b122 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java @@ -505,7 +505,7 @@ public class TestHttpFlowFileServerProtocol { }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution // which returns flowFile instance used later. - doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); + doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { final String transitUri = (String)invocation.getArguments()[1]; @@ -567,12 +567,16 @@ public class TestHttpFlowFileServerProtocol { } return flowFile1; }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); - // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution - // which returns flowFile instance used later. + + // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution + // which returns flowFile instance used later, it is called twice for each flow file doReturn(flowFile1) + .doReturn(flowFile1) .doReturn(flowFile2) - .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); + .doReturn(flowFile2) + .when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[3];