NIFI-2585: Add attributes to track s2s host and port

- Removed host and port field from Peer since the same information is
  available in PeerDescription
- Refactored variable names in SocketRemoteSiteListener to improve readability
- Changed how SocketRemoteSiteListener constructs PeerDescription
  instance. It used to use hard-coded 'localhost' as hostname, and
  getPort() which returns server's port. Since the peer is a remote peer,
  i.e the client, it should be client hostname and port.
- Added hostname resolution at DataTransferResource to make s2s.host
  value consistent with RAW transport. Without this, RAW uses hostname
  while HTTP uses IP address. It will be hard to be used from downstream flows.
- Replaced heavy use of mockito which was difficult to maintain, with
  nifi-mock
- Added SiteToSiteAttributes and more assertions in unit tests

This closes #1342.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2016-12-20 11:19:22 +09:00 committed by Bryan Bende
parent f7d761a28a
commit 908e7d3131
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
6 changed files with 305 additions and 269 deletions

View File

@ -29,8 +29,6 @@ public class Peer implements Communicant {
private final CommunicationsSession commsSession; private final CommunicationsSession commsSession;
private final String url; private final String url;
private final String clusterUrl; private final String clusterUrl;
private final String host;
private final int port;
private final Map<String, Long> penaltyExpirationMap = new HashMap<>(); private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false; private boolean closed = false;
@ -42,9 +40,8 @@ public class Peer implements Communicant {
this.clusterUrl = clusterUrl; this.clusterUrl = clusterUrl;
try { try {
final URI uri = new URI(peerUrl); // Parse peerUrl to validate it.
this.port = uri.getPort(); new URI(peerUrl);
this.host = uri.getHost();
} catch (final Exception e) { } catch (final Exception e) {
throw new IllegalArgumentException("Invalid URL: " + peerUrl); throw new IllegalArgumentException("Invalid URL: " + peerUrl);
} }
@ -104,7 +101,7 @@ public class Peer implements Communicant {
@Override @Override
public String getHost() { public String getHost() {
return host; return description.getHostname();
} }
@Override @Override
@ -141,7 +138,7 @@ public class Peer implements Communicant {
@Override @Override
public int getPort() { public int getPort() {
return port; return description.getPort();
} }
@Override @Override

View File

@ -71,8 +71,8 @@
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>mockito-core</artifactId> <artifactId>nifi-mock</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -137,16 +137,16 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
public void run() { public void run() {
LOG.debug("{} Determining URL of connection", this); LOG.debug("{} Determining URL of connection", this);
final InetAddress inetAddress = socket.getInetAddress(); final InetAddress inetAddress = socket.getInetAddress();
String hostname = inetAddress.getHostName(); String clientHostName = inetAddress.getHostName();
final int slashIndex = hostname.indexOf("/"); final int slashIndex = clientHostName.indexOf("/");
if (slashIndex == 0) { if (slashIndex == 0) {
hostname = hostname.substring(1); clientHostName = clientHostName.substring(1);
} else if (slashIndex > 0) { } else if (slashIndex > 0) {
hostname = hostname.substring(0, slashIndex); clientHostName = clientHostName.substring(0, slashIndex);
} }
final int port = socket.getPort(); final int clientPort = socket.getPort();
final String peerUri = "nifi://" + hostname + ":" + port; final String peerUri = "nifi://" + clientHostName + ":" + clientPort;
LOG.debug("{} Connection URL is {}", this, peerUri); LOG.debug("{} Connection URL is {}", this, peerUri);
final CommunicationsSession commsSession; final CommunicationsSession commsSession;
@ -211,7 +211,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol.setRootProcessGroup(rootGroup.get()); protocol.setRootProcessGroup(rootGroup.get());
protocol.setNodeInformant(nodeInformant); protocol.setNodeInformant(nodeInformant);
final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null); final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort()); peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
LOG.debug("Handshaking...."); LOG.debug("Handshaking....");
protocol.handshake(peer); protocol.handshake(peer);

View File

@ -18,15 +18,15 @@ package org.apache.nifi.remote;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
@ -34,22 +34,30 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.InputStream; import java.io.ByteArrayInputStream;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import static org.mockito.Matchers.any; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;
public class TestStandardRemoteGroupPort { public class TestStandardRemoteGroupPort {
@ -64,9 +72,9 @@ public class TestStandardRemoteGroupPort {
private ProcessGroup processGroup; private ProcessGroup processGroup;
public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi";
private StandardRemoteGroupPort port; private StandardRemoteGroupPort port;
private ProcessContext context; private SharedSessionState sessionState;
private ProcessSession session; private MockProcessSession processSession;
private ProvenanceReporter provenanceReporter; private MockProcessContext processContext;
@BeforeClass @BeforeClass
public static void setup() throws Exception { public static void setup() throws Exception {
@ -112,41 +120,49 @@ public class TestStandardRemoteGroupPort {
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
doReturn(eventReporter).when(remoteGroup).getEventReporter(); doReturn(eventReporter).when(remoteGroup).getEventReporter();
context = null; }
session = mock(ProcessSession.class);
provenanceReporter = mock(ProvenanceReporter.class);
doReturn(provenanceReporter).when(session).getProvenanceReporter();
private void setupMockProcessSession() {
// Construct a RemoteGroupPort as a processor to use NiFi mock library.
final Processor remoteGroupPort = mock(Processor.class);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(Relationship.ANONYMOUS);
when(remoteGroupPort.getRelationships()).thenReturn(relationships);
when(remoteGroupPort.getIdentifier()).thenReturn("remote-group-port-id");
sessionState = new SharedSessionState(remoteGroupPort, new AtomicLong(0));
processSession = new MockProcessSession(sessionState, remoteGroupPort);
processContext = new MockProcessContext(remoteGroupPort);
} }
@Test @Test
public void testSendRaw() throws Exception { public void testSendRaw() throws Exception {
setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND); setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND);
setupMockProcessSession();
final String peerUrl = "nifi://node1.example.com:9090"; final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false); final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
try (final SocketChannel socketChannel = SocketChannel.open()) { try (final SocketChannel socketChannel = SocketChannel.open()) {
final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel); final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
commsSession.setUserDn("nifi.node1.example.com");
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
doReturn(peer).when(transaction).getCommunicant(); doReturn(peer).when(transaction).getCommunicant();
final QueueSize queueSize = new QueueSize(1, 10); final MockFlowFile flowFile = processSession.createFlowFile("0123456789".getBytes());
final FlowFile flowFile = mock(FlowFile.class); sessionState.getFlowFileQueue().offer(flowFile);
doReturn(queueSize).when(session).getQueueSize(); port.onTrigger(processContext, processSession);
// Return null when it gets called second time.
doReturn(flowFile).doReturn(null).when(session).get();
final String flowFileUuid = "flowfile-uuid"; // Assert provenance.
doReturn(flowFileUuid).when(flowFile).getAttribute(eq(CoreAttributes.UUID.key())); final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(peerUrl + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()), provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
port.onTrigger(context, session);
// Transit uri can be customized if necessary.
verify(provenanceReporter).send(eq(flowFile), eq(peerUrl + "/" + flowFileUuid), any(String.class),
any(Long.class), eq(false));
} }
} }
@ -154,16 +170,17 @@ public class TestStandardRemoteGroupPort {
public void testReceiveRaw() throws Exception { public void testReceiveRaw() throws Exception {
setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE); setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE);
setupMockProcessSession();
final String peerUrl = "nifi://node1.example.com:9090"; final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false); final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
try (final SocketChannel socketChannel = SocketChannel.open()) { try (final SocketChannel socketChannel = SocketChannel.open()) {
final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel); final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
commsSession.setUserDn("nifi.node1.example.com");
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
doReturn(peer).when(transaction).getCommunicant(); doReturn(peer).when(transaction).getCommunicant();
final FlowFile flowFile = mock(FlowFile.class);
final String sourceFlowFileUuid = "flowfile-uuid"; final String sourceFlowFileUuid = "flowfile-uuid";
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid); attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
@ -172,18 +189,26 @@ public class TestStandardRemoteGroupPort {
final DataPacket dataPacket = new StandardDataPacket(attributes, final DataPacket dataPacket = new StandardDataPacket(attributes,
dataPacketInputStream, dataPacketContents.length); dataPacketInputStream, dataPacketContents.length);
doReturn(flowFile).when(session).create();
// Return null when it gets called second time. // Return null when it gets called second time.
doReturn(dataPacket).doReturn(null).when(this.transaction).receive(); doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class)); port.onTrigger(processContext, processSession);
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session); // Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
assertEquals(peerUrl + "/" + sourceFlowFileUuid, provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
// Transit uri can be customized if necessary. // Assert received flow files.
verify(provenanceReporter).receive(eq(flowFile), eq(peerUrl + "/" + sourceFlowFileUuid), any(String.class), processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
any(String.class), any(Long.class)); final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
assertEquals(1, flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
} }
} }
@ -192,66 +217,76 @@ public class TestStandardRemoteGroupPort {
public void testSendHttp() throws Exception { public void testSendHttp() throws Exception {
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND); setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND);
setupMockProcessSession();
final String peerUrl = "http://node1.example.com:8080/nifi"; final String peerUrl = "https://node1.example.com:8080/nifi";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, true);
final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
commsSession.setUserDn("nifi.node1.example.com");
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; final String flowFileEndpointUri = "https://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
doReturn(peer).when(transaction).getCommunicant(); doReturn(peer).when(transaction).getCommunicant();
commsSession.setDataTransferUrl(flowFileEndpointUri); commsSession.setDataTransferUrl(flowFileEndpointUri);
final QueueSize queueSize = new QueueSize(1, 10); final MockFlowFile flowFile = processSession.createFlowFile("0123456789".getBytes());
final FlowFile flowFile = mock(FlowFile.class); sessionState.getFlowFileQueue().offer(flowFile);
doReturn(queueSize).when(session).getQueueSize(); port.onTrigger(processContext, processSession);
// Return null when it's called second time.
doReturn(flowFile).doReturn(null).when(session).get();
port.onTrigger(context, session);
// peerUrl should be used as the transit url.
verify(provenanceReporter).send(eq(flowFile), eq(flowFileEndpointUri), any(String.class),
any(Long.class), eq(false));
// Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
} }
@Test @Test
public void testReceiveHttp() throws Exception { public void testReceiveHttp() throws Exception {
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE); setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE);
setupMockProcessSession();
final String peerUrl = "http://node1.example.com:8080/nifi"; final String peerUrl = "https://node1.example.com:8080/nifi";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, true);
final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
commsSession.setUserDn("nifi.node1.example.com");
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; final String flowFileEndpointUri = "https://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
doReturn(peer).when(transaction).getCommunicant(); doReturn(peer).when(transaction).getCommunicant();
commsSession.setDataTransferUrl(flowFileEndpointUri); commsSession.setDataTransferUrl(flowFileEndpointUri);
final FlowFile flowFile = mock(FlowFile.class);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
final byte[] dataPacketContents = "DataPacket Contents".getBytes(); final byte[] dataPacketContents = "DataPacket Contents".getBytes();
final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents); final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents);
final DataPacket dataPacket = new StandardDataPacket(attributes, final DataPacket dataPacket = new StandardDataPacket(attributes,
dataPacketInputStream, dataPacketContents.length); dataPacketInputStream, dataPacketContents.length);
doReturn(flowFile).when(session).create(); // Return null when it gets called second time.
// Return null when it's called second time.
doReturn(dataPacket).doReturn(null).when(transaction).receive(); doReturn(dataPacket).doReturn(null).when(transaction).receive();
doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class)); port.onTrigger(processContext, processSession);
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session); // Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
// peerUrl should be used as the transit url. // Assert received flow files.
verify(provenanceReporter).receive(eq(flowFile), eq(flowFileEndpointUri), any(String.class), processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
any(String.class), any(Long.class)); final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
assertEquals(1, flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
} }
} }

View File

@ -18,13 +18,15 @@ package org.apache.nifi.remote.protocol.http;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerDescription;
@ -40,32 +42,46 @@ import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.HandshakeProperty; import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.SharedSessionState;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestHttpFlowFileServerProtocol { public class TestHttpFlowFileServerProtocol {
private SharedSessionState sessionState;
private MockProcessSession processSession;
private MockProcessContext processContext;
@BeforeClass @BeforeClass
public static void setup() throws Exception { public static void setup() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
@ -270,11 +286,37 @@ public class TestHttpFlowFileServerProtocol {
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final String transactionId = "testTransferOneFile"; final String transactionId = "testTransferOneFile";
final Peer peer = transferOneFile(serverProtocol, transactionId); final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files";
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
transferFlowFiles(serverProtocol, transactionId, peer, processSession -> {
final MockFlowFile flowFile = processSession.createFlowFile("Server content".getBytes());
final HashMap<String, String> attributes = new HashMap<>();
attributes.put("uuid", "server-uuid");
attributes.put("filename", "server-filename");
attributes.put("server-attr-1", "server-attr-1-value");
attributes.put("server-attr-2", "server-attr-2-value");
flowFile.putAttributes(attributes);
return Arrays.asList(flowFile);
});
// Commit transaction // Commit transaction
final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "2077607535"); final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3229577812");
assertEquals(1, flowFileSent); assertEquals(1, flowFileSent);
// Assert provenance
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
} }
@Test @Test
@ -282,7 +324,25 @@ public class TestHttpFlowFileServerProtocol {
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final String transactionId = "testTransferOneFileBadChecksum"; final String transactionId = "testTransferOneFileBadChecksum";
final Peer peer = transferOneFile(serverProtocol, transactionId); final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files";
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
transferFlowFiles(serverProtocol, transactionId, peer, processSession -> {
final MockFlowFile flowFile = processSession.createFlowFile("Server content".getBytes());
final HashMap<String, String> attributes = new HashMap<>();
attributes.put("uuid", "server-uuid");
attributes.put("filename", "server-filename");
attributes.put("server-attr-1", "server-attr-1-value");
attributes.put("server-attr-2", "server-attr-2-value");
flowFile.putAttributes(attributes);
return Arrays.asList(flowFile);
});
// Commit transaction // Commit transaction
try { try {
@ -293,44 +353,27 @@ public class TestHttpFlowFileServerProtocol {
} }
} }
private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException { private Peer transferFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId,
final Peer peer, final Function<MockProcessSession,
Collection<MockFlowFile>> flowFileGenerator) throws IOException {
setupMockProcessSession();
// Enqueue flow files to be transferred.
final Collection<MockFlowFile> flowFiles = flowFileGenerator.apply(processSession);
for (final MockFlowFile flowFile : flowFiles) {
sessionState.getFlowFileQueue().offer(flowFile);
}
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null)); final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files";
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer); serverProtocol.handshake(peer);
assertTrue(serverProtocol.isHandshakeSuccessful()); assertTrue(serverProtocol.isHandshakeSuccessful());
final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
final ProcessContext context = mock(ProcessContext.class);
final ProcessSession processSession = mock(ProcessSession.class);
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
final FlowFile flowFile = mock(FlowFile.class);
doReturn(flowFile).when(processSession).get();
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doAnswer(invocation -> {
final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
return null;
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
// Execute test using mock // Execute test using mock
final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); final int flowFileSent = serverProtocol.transferFlowFiles(peer, processContext, processSession, negotiatedCoded);
assertEquals(1, flowFileSent); assertEquals(flowFiles.size(), flowFileSent);
assertTrue(remoteSiteListener.isTransactionActive(transactionId)); assertTrue(remoteSiteListener.isTransactionActive(transactionId));
return peer; return peer;
@ -338,11 +381,9 @@ public class TestHttpFlowFileServerProtocol {
@Test @Test
public void testTransferTwoFiles() throws Exception { public void testTransferTwoFiles() throws Exception {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
final String transactionId = "testTransferTwoFiles"; final String transactionId = "testTransferTwoFiles";
final Peer peer = getDefaultPeer(transactionId); final Peer peer = getDefaultPeer(transactionId);
final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/" final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files"; + transactionId + "/flow-files";
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
@ -350,53 +391,32 @@ public class TestHttpFlowFileServerProtocol {
commsSession.setUserDn("unit-test"); commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri); commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer); transferFlowFiles(serverProtocol, transactionId, peer, processSession ->
IntStream.of(1, 2).mapToObj(i -> {
assertTrue(serverProtocol.isHandshakeSuccessful()); final MockFlowFile flowFile = processSession.createFlowFile(("Server content " + i).getBytes());
final HashMap<String, String> attributes = new HashMap<>();
final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); attributes.put("uuid", "server-uuid-" + i);
final ProcessContext context = mock(ProcessContext.class); attributes.put("filename", "server-filename-" + i);
final ProcessSession processSession = mock(ProcessSession.class); attributes.put("server-attr-" + i + "-1", "server-attr-" + i + "-1-value");
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class); attributes.put("server-attr-" + i + "-2", "server-attr-" + i + "-2-value");
final FlowFile flowFile1 = mock(FlowFile.class); flowFile.putAttributes(attributes);
final FlowFile flowFile2 = mock(FlowFile.class); return flowFile;
doReturn(flowFile1) }).collect(Collectors.toList())
.doReturn(flowFile2) );
.when(processSession).get();
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile1), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doAnswer(invocation -> {
final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
return null;
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
// Execute test using mock
int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
assertEquals(2, flowFileSent);
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
// Commit transaction // Commit transaction
flowFileSent = serverProtocol.commitTransferTransaction(peer, "2747386400"); final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3058746557");
assertEquals(2, flowFileSent); assertEquals(2, flowFileSent);
// Assert provenance
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) {
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
}
} }
private DataPacket createClientDataPacket() { private DataPacket createClientDataPacket() {
@ -404,6 +424,7 @@ public class TestHttpFlowFileServerProtocol {
final byte[] bytes = contents.getBytes(); final byte[] bytes = contents.getBytes();
final InputStream in = new ByteArrayInputStream(bytes); final InputStream in = new ByteArrayInputStream(bytes);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.UUID.key(), "client-flow-file-uuid");
attributes.put("client-attr-1", "client-attr-1-value"); attributes.put("client-attr-1", "client-attr-1-value");
attributes.put("client-attr-2", "client-attr-2-value"); attributes.put("client-attr-2", "client-attr-2-value");
return new StandardDataPacket(attributes, in, bytes.length); return new StandardDataPacket(attributes, in, bytes.length);
@ -440,14 +461,42 @@ public class TestHttpFlowFileServerProtocol {
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final String transactionId = "testReceiveOneFile"; final String transactionId = "testReceiveOneFile";
final String endpointUri = "https://remote-host:8443/nifi-api/input-ports/port-id/transactions/"
+ transactionId + "/flow-files";
final Peer peer = getDefaultPeer(transactionId); final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
receiveOneFile(serverProtocol, transactionId, peer); commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
final DataPacket dataPacket = createClientDataPacket();
receiveFlowFiles(serverProtocol, transactionId, peer, dataPacket);
// Commit transaction // Commit transaction
commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION); commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer); final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
assertEquals(1, flowFileReceived); assertEquals(1, flowFileReceived);
// Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
// Assert received flow files.
processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
assertEquals(1, flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
flowFile.assertAttributeEquals("client-attr-1", "client-attr-1-value");
flowFile.assertAttributeEquals("client-attr-2", "client-attr-2-value");
} }
@Test @Test
@ -457,7 +506,7 @@ public class TestHttpFlowFileServerProtocol {
final String transactionId = "testReceiveOneFileBadChecksum"; final String transactionId = "testReceiveOneFileBadChecksum";
final Peer peer = getDefaultPeer(transactionId); final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
receiveOneFile(serverProtocol, transactionId, peer); receiveFlowFiles(serverProtocol, transactionId, peer, createClientDataPacket());
// Commit transaction // Commit transaction
commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM); commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM);
@ -469,71 +518,48 @@ public class TestHttpFlowFileServerProtocol {
} }
} }
private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException { private void receiveFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer, final DataPacket ... dataPackets) throws IOException {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null)); final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+ transactionId + "/flow-files";
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer); serverProtocol.handshake(peer);
assertTrue(serverProtocol.isHandshakeSuccessful()); assertTrue(serverProtocol.isHandshakeSuccessful());
setupMockProcessSession();
// Emulate dataPackets sent from a Site-to-Site client.
final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
final ProcessContext context = mock(ProcessContext.class);
final ProcessSession processSession = mock(ProcessSession.class);
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
final FlowFile flowFile = mock(FlowFile.class);
final DataPacket dataPacket = createClientDataPacket();
final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream(); final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
for (final DataPacket dataPacket : dataPackets) {
negotiatedCoded.encode(dataPacket, testDataOs); negotiatedCoded.encode(dataPacket, testDataOs);
}
final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray()); final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray());
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
doAnswer(invocation -> {
final InputStream is = (InputStream) invocation.getArguments()[0];
for (int b; (b = is.read()) >= 0;) {
// consume stream.
}
return flowFile;
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
// AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
// which returns flowFile instance used later.
doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3];
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
final Set<Relationship> relations = new HashSet<>();
final Relationship relationship = new Relationship.Builder().build();
relations.add(relationship);
doReturn(relations).when(context).getAvailableRelationships();
// Execute test using mock // Execute test using mock
final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, processContext, processSession, negotiatedCoded);
assertEquals(1, flowFileReceived); assertEquals(dataPackets.length, flowFileReceived);
assertTrue(remoteSiteListener.isTransactionActive(transactionId)); assertTrue(remoteSiteListener.isTransactionActive(transactionId));
} }
private void setupMockProcessSession() {
// Construct a RootGroupPort as a processor to use NiFi mock library.
final Processor rootGroupPort = mock(Processor.class);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(Relationship.ANONYMOUS);
when(rootGroupPort.getRelationships()).thenReturn(relationships);
when(rootGroupPort.getIdentifier()).thenReturn("root-group-port-id");
sessionState = new SharedSessionState(rootGroupPort, new AtomicLong(0));
processSession = new MockProcessSession(sessionState, rootGroupPort);
processContext = new MockProcessContext(rootGroupPort);
}
@Test @Test
public void testReceiveTwoFiles() throws Exception { public void testReceiveTwoFiles() throws Exception {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
final String transactionId = "testReceiveTwoFile"; final String transactionId = "testReceiveTwoFile";
final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/" final String endpointUri = "https://remote-host:8443/nifi-api/input-ports/port-id/transactions/"
+ transactionId + "/flow-files"; + transactionId + "/flow-files";
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final Peer peer = getDefaultPeer(transactionId); final Peer peer = getDefaultPeer(transactionId);
@ -542,63 +568,32 @@ public class TestHttpFlowFileServerProtocol {
commsSession.setUserDn("unit-test"); commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri); commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer); receiveFlowFiles(serverProtocol, transactionId, peer, createClientDataPacket(), createClientDataPacket());
assertTrue(serverProtocol.isHandshakeSuccessful());
final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
final ProcessContext context = mock(ProcessContext.class);
final ProcessSession processSession = mock(ProcessSession.class);
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
final FlowFile flowFile1 = mock(FlowFile.class);
final FlowFile flowFile2 = mock(FlowFile.class);
final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
negotiatedCoded.encode(createClientDataPacket(), testDataOs);
negotiatedCoded.encode(createClientDataPacket(), testDataOs);
final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray());
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
doAnswer(invocation -> {
final InputStream is = (InputStream) invocation.getArguments()[0];
for (int b; (b = is.read()) >= 0;) {
// consume stream.
}
return flowFile1;
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
// AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
// which returns flowFile instance used later, it is called twice for each flow file
doReturn(flowFile1)
.doReturn(flowFile1)
.doReturn(flowFile2)
.doReturn(flowFile2)
.when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3];
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
final Set<Relationship> relations = new HashSet<>();
doReturn(relations).when(context).getAvailableRelationships();
// Execute test using mock
int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
assertEquals(2, flowFileReceived);
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
// Commit transaction // Commit transaction
commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION); commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
flowFileReceived = serverProtocol.commitReceiveTransaction(peer); final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
assertEquals(2, flowFileReceived); assertEquals(2, flowFileReceived);
// Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) {
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
}
// Assert received flow files.
processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
assertEquals(2, flowFiles.size());
for (final MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
flowFile.assertAttributeEquals("client-attr-1", "client-attr-1-value");
flowFile.assertAttributeEquals("client-attr-2", "client-attr-2-value");
}
} }

View File

@ -77,6 +77,8 @@ import javax.ws.rs.core.UriInfo;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT; import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
@ -316,7 +318,14 @@ public class DataTransferResource extends ApplicationResource {
private Peer constructPeer(final HttpServletRequest req, final InputStream inputStream, private Peer constructPeer(final HttpServletRequest req, final InputStream inputStream,
final OutputStream outputStream, final String portId, final String transactionId) { final OutputStream outputStream, final String portId, final String transactionId) {
final String clientHostName = req.getRemoteHost(); String clientHostName = req.getRemoteHost();
try {
// req.getRemoteHost returns IP address, try to resolve hostname to be consistent with RAW protocol.
final InetAddress clientAddress = InetAddress.getByName(clientHostName);
clientHostName = clientAddress.getHostName();
} catch (UnknownHostException e) {
logger.info("Failed to resolve client hostname {}, due to {}", clientHostName, e.getMessage());
}
final int clientPort = req.getRemotePort(); final int clientPort = req.getRemotePort();
final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure()); final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());