NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Bryan Bende 2016-12-08 15:17:06 -05:00
parent 28e5d85493
commit f7d761a28a
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 77 additions and 13 deletions

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flowfile.attributes;
/**
* FlowFile attributes used during site-to-site transfer.
*/
public enum SiteToSiteAttributes implements FlowFileAttributeKey {
S2S_HOST("s2s.host"),
S2S_ADDRESS("s2s.address");
private final String key;
private SiteToSiteAttributes(final String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
}

View File

@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -37,6 +39,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -56,6 +59,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -338,6 +342,17 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
final Communicant communicant = transaction.getCommunicant();
final String host = StringUtils.isEmpty(communicant.getHost()) ? "unknown" : communicant.getHost();
final String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort());
final Map<String,String> attributes = new HashMap<>(2);
attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.importFrom(dataPacket.getData(), flowFile); flowFile = session.importFrom(dataPacket.getData(), flowFile);
final long receiveNanos = System.nanoTime() - start; final long receiveNanos = System.nanoTime() - start;
flowFilesReceived.add(flowFile); flowFilesReceived.add(flowFile);

View File

@ -20,6 +20,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Port;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -37,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,6 +47,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -448,7 +451,16 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
final long transferNanos = System.nanoTime() - startNanos; final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
final String port = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort());
final Map<String,String> attributes = new HashMap<>(4);
attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
flowFile = session.putAllAttributes(flowFile, attributes);
final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid); final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid);
session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
@ -506,12 +518,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
} }
// For routing purposes, downstream consumers often need to reference Flowfile's originating system
for (FlowFile flowFile : transaction.getFlowFilesSent()){
flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost());
flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort());
}
// Commit the session so that we have persisted the data // Commit the session so that we have persisted the data
session.commit(); session.commit();

View File

@ -176,7 +176,7 @@ public class TestStandardRemoteGroupPort {
// Return null when it gets called second time. // Return null when it gets called second time.
doReturn(dataPacket).doReturn(null).when(this.transaction).receive(); doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session); port.onTrigger(context, session);
@ -244,7 +244,7 @@ public class TestStandardRemoteGroupPort {
// Return null when it's called second time. // Return null when it's called second time.
doReturn(dataPacket).doReturn(null).when(transaction).receive(); doReturn(dataPacket).doReturn(null).when(transaction).receive();
doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session); port.onTrigger(context, session);

View File

@ -505,7 +505,7 @@ public class TestHttpFlowFileServerProtocol {
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
// AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
// which returns flowFile instance used later. // which returns flowFile instance used later.
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> { doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1]; final String transitUri = (String)invocation.getArguments()[1];
@ -567,12 +567,16 @@ public class TestHttpFlowFileServerProtocol {
} }
return flowFile1; return flowFile1;
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
// AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
// which returns flowFile instance used later. // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
// which returns flowFile instance used later, it is called twice for each flow file
doReturn(flowFile1) doReturn(flowFile1)
.doReturn(flowFile1)
.doReturn(flowFile2) .doReturn(flowFile2)
.when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); .doReturn(flowFile2)
.when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> { doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1]; final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3]; final String detail = (String)invocation.getArguments()[3];