diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java index dacfd64be7..8ecdb52c74 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java @@ -22,27 +22,6 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes private String userDn; - private volatile String uri; - - public AbstractCommunicationsSession(final String uri) { - this.uri = uri; - } - - @Override - public String toString() { - return uri; - } - - @Override - public void setUri(final String uri) { - this.uri = uri; - } - - @Override - public String getUri() { - return uri; - } - @Override public String getUserDn() { return userDn; @@ -52,4 +31,10 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes public void setUserDn(final String dn) { this.userDn = dn; } + + @Override + public String createTransitUri(String communicantUrl, String sourceFlowFileIdentifier) { + return communicantUrl + (communicantUrl.endsWith("/") ? "" : "/") + sourceFlowFileIdentifier; + } + } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java index 6245a53ef4..39fac9e16b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java @@ -43,4 +43,10 @@ public interface Communicant { * if the Distinguished Name is unknown */ String getDistinguishedName(); + + /** + * @return When data is transferred via Site-to-Site, provenance events are generated. + * This method returns a transit url used for the provenance event. + */ + String createTransitUri(final String sourceFlowFileIdentifier); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java index 5cb37b0b95..962baec2ee 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java @@ -148,4 +148,9 @@ public class Peer implements Communicant { public String getDistinguishedName() { return commsSession.getUserDn(); } + + @Override + public String createTransitUri(String sourceFlowFileIdentifier) { + return commsSession.createTransitUri(url, sourceFlowFileIdentifier); + } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 4cc794b3eb..3c92acd16f 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -126,7 +126,6 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr final CommunicationsSession commSession = new HttpCommunicationsSession(); final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription()); - commSession.setUri(nodeApiUrl); final String clusterUrl = config.getUrl(); final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 6869e4ba06..f90aed9c2b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -465,7 +465,6 @@ public class EndpointConnectionPool implements PeerStatusProvider { private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { final boolean siteToSiteSecure = siteInfoProvider.isSecure(); - final String destinationUri = "nifi://" + hostname + ":" + port; CommunicationsSession commsSession = null; try { @@ -478,7 +477,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); socketChannel.connect(); - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); + commsSession = new SSLSocketChannelCommunicationsSession(socketChannel); try { commsSession.setUserDn(socketChannel.getDn()); @@ -490,11 +489,10 @@ public class EndpointConnectionPool implements PeerStatusProvider { socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout); socketChannel.socket().setSoTimeout(commsTimeout); - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); + commsSession = new SocketChannelCommunicationsSession(socketChannel); } commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - commsSession.setUri(destinationUri); } catch (final IOException ioe) { if (commsSession != null) { commsSession.close(); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java index d561833824..868fb36c95 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java @@ -29,9 +29,10 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession { protected final HttpInput input; protected final HttpOutput output; protected String checksum; + private String dataTransferUrl; public HttpCommunicationsSession(){ - super(null); + super(); this.input = new HttpInput(); this.output = new HttpOutput(); } @@ -93,5 +94,15 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession { this.checksum = checksum; } + /** + * @param dataTransferUrl Set data transfer url to use as provenance event transit url. + */ + public void setDataTransferUrl(String dataTransferUrl) { + this.dataTransferUrl = dataTransferUrl; + } + @Override + public String createTransitUri(String communicantUrl, String sourceFlowFileIdentifier) { + return dataTransferUrl; + } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java index 6180c3c409..e19cd3d70d 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java @@ -28,8 +28,8 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe private final SocketChannelOutput response; private int timeout = 30000; - public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { - super(uri); + public SocketChannelCommunicationsSession(final SocketChannel socketChannel) throws IOException { + super(); request = new SocketChannelInput(socketChannel); response = new SocketChannelOutput(socketChannel); channel = socketChannel; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java index 5e5abc7b19..73f5a90803 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java @@ -26,8 +26,8 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication private final SSLSocketChannelInput request; private final SSLSocketChannelOutput response; - public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) { - super(uri); + public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel) { + super(); request = new SSLSocketChannelInput(channel); response = new SSLSocketChannelOutput(channel); this.channel = channel; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java index aff73ba03f..4df12ae483 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java @@ -31,10 +31,6 @@ public interface CommunicationsSession extends Closeable { int getTimeout() throws IOException; - void setUri(String uri); - - String getUri(); - String getUserDn(); void setUserDn(String dn); @@ -59,4 +55,11 @@ public interface CommunicationsSession extends Closeable { * otherwise */ boolean isClosed(); + + /** + * @param communicantUrl Communicant's url that this session is assigned to. + * @param sourceFlowFileIdentifier Source Flow-file's uuid. + * @return A transit uri to be used in a provenance event. + */ + String createTransitUri(final String communicantUrl, final String sourceFlowFileIdentifier); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java index d4085ca4af..693b3c7724 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java @@ -48,9 +48,9 @@ public class HttpClientTransaction extends AbstractTransaction { this.transactionUrl = transactionUrl; this.apiClient = apiUtil; if(TransferDirection.RECEIVE.equals(direction)){ - dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer.getCommunicationsSession()); + dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer); } else { - apiUtil.openConnectionForSend(transactionUrl, peer.getCommunicationsSession()); + apiUtil.openConnectionForSend(transactionUrl, peer); } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index e81dc5b8b0..8a379b7f0e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -54,6 +54,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpCoreContext; import org.apache.http.util.EntityUtils; +import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.exception.PortNotRunningException; @@ -374,9 +375,12 @@ public class SiteToSiteRestApiClient implements Closeable { } - public boolean openConnectionForReceive(final String transactionUrl, final CommunicationsSession commSession) throws IOException { + public boolean openConnectionForReceive(final String transactionUrl, final Peer peer) throws IOException { final HttpGet get = createGet(transactionUrl + "/flow-files"); + // Set uri so that it'll be used as transit uri. + ((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(get.getURI().toString()); + get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); setHandshakeProperties(get); @@ -414,7 +418,7 @@ public class SiteToSiteRestApiClient implements Closeable { return r; } }; - ((HttpInput) commSession.getInput()).setInputStream(streamCapture); + ((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture); startExtendingTtl(transactionUrl, httpIn, response); keepItOpen = true; @@ -436,10 +440,13 @@ public class SiteToSiteRestApiClient implements Closeable { private Future postResult; private CountDownLatch transferDataLatch = new CountDownLatch(1); - public void openConnectionForSend(final String transactionUrl, final CommunicationsSession commSession) throws IOException { + public void openConnectionForSend(final String transactionUrl, final Peer peer) throws IOException { + final CommunicationsSession commSession = peer.getCommunicationsSession(); final String flowFilesPath = transactionUrl + "/flow-files"; final HttpPost post = createPost(flowFilesPath); + // Set uri so that it'll be used as transit uri. + ((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(post.getURI().toString()); post.setHeader("Content-Type", "application/octet-stream"); post.setHeader("Accept", "text/plain"); diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java index 7f3ee5c476..b9ab25c6ab 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java @@ -101,7 +101,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class)); + doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class)); ByteArrayInputStream serverResponse = new ByteArrayInputStream(new byte[0]); ByteArrayOutputStream clientRequest = new ByteArrayOutputStream(); @@ -117,7 +117,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class)); + doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class)); TransactionResultEntity resultEntity = new TransactionResultEntity(); resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode()); doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq("3680976076")); @@ -139,7 +139,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class)); + doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class)); TransactionResultEntity resultEntity = new TransactionResultEntity(); resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode()); doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq("2969091230")); @@ -162,7 +162,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class)); + doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class)); // The checksum is correct, but here we simulate as if it's wrong, BAD_CHECKSUM. TransactionResultEntity resultEntity = new TransactionResultEntity(); resultEntity.setResponseCode(ResponseCode.BAD_CHECKSUM.getCode()); @@ -186,7 +186,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class)); + doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class)); ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream(); ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray()); @@ -203,7 +203,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class)); + doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class)); // Emulate that server returns correct checksum. doAnswer(new Answer() { @Override @@ -237,7 +237,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(CommunicationsSession.class)); + doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(Peer.class)); // Emulate that server returns correct checksum. doAnswer(new Answer() { @Override @@ -272,7 +272,7 @@ public class TestHttpClientTransaction { public void testSendWithInvalidChecksum() throws IOException { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class)); + doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class)); // Emulate that server returns incorrect checksum. doAnswer(new Answer() { @Override @@ -313,7 +313,7 @@ public class TestHttpClientTransaction { SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId"; - doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(CommunicationsSession.class)); + doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(Peer.class)); // Emulate that server returns correct checksum. doAnswer(new Answer() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 814d0e6fdf..bd9d2049b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -154,12 +154,12 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { sslSocketChannel.connect(); LOG.trace("Channel connected"); - commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri); + commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel); dn = sslSocketChannel.getDn(); commsSession.setUserDn(dn); } else { LOG.trace("{} Channel is not secure", this); - commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri); + commsSession = new SocketChannelCommunicationsSession(socketChannel); dn = null; } } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 3f59b50990..8f115f7b19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -76,6 +76,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private final TransferDirection transferDirection; private final AtomicReference clientRef = new AtomicReference<>(); + SiteToSiteClient getSiteToSiteClient() { + return clientRef.get(); + } public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) { @@ -118,7 +121,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { public void shutdown() { super.shutdown(); - final SiteToSiteClient client = clientRef.get(); + final SiteToSiteClient client = getSiteToSiteClient(); if (client != null) { try { client.close(); @@ -175,7 +178,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { firstFlowFile = null; } - final SiteToSiteClient client = clientRef.get(); + final SiteToSiteClient client = getSiteToSiteClient(); final Transaction transaction; try { transaction = client.createTransaction(transferDirection); @@ -275,7 +278,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { bytesSent += flowFile.getSize(); logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl()); - final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()); + final String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key())); session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false); session.remove(flowFile); @@ -331,13 +334,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); flowFile = session.importFrom(dataPacket.getData(), flowFile); final long receiveNanos = System.nanoTime() - start; + flowFilesReceived.add(flowFile); String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); if (sourceFlowFileIdentifier == null) { sourceFlowFileIdentifier = ""; } - final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier; + final String transitUri = transaction.getCommunicant().createTransitUri(sourceFlowFileIdentifier); session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index 43428e0834..8e7d2c5e5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -271,8 +271,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { flowFilesSent.add(flowFile); bytesSent += flowFile.getSize(); - String transitUriPrefix = handshakenProperties.getTransitUriPrefix(); - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + final String transitUri = createTransitUri(peer, flowFile.getAttribute(CoreAttributes.UUID.key())); session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); session.remove(flowFile); @@ -319,6 +318,10 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { } + protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) { + return peer.createTransitUri(sourceFlowFileIdentifier); + } + protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException { ProcessSession session = transaction.getSession(); Set flowFilesSent = transaction.getFlowFilesSent(); @@ -446,8 +449,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - String transitUriPrefix = handshakenProperties.getTransitUriPrefix(); - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid; + final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid); session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); session.transfer(flowFile, Relationship.ANONYMOUS); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java index c4f1f5c851..ebbee17c44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java @@ -48,7 +48,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr private final VersionNegotiator versionNegotiator; private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(); - public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { + public StandardHttpFlowFileServerProtocol(final VersionNegotiator versionNegotiator) { super(); this.versionNegotiator = versionNegotiator; } @@ -222,4 +222,5 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr public String getResourceName() { return RESOURCE_NAME; } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index fe7d16319e..6e4b860af7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -217,4 +217,10 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol public VersionNegotiator getVersionNegotiator() { return versionNegotiator; } + + @Override + protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) { + String transitUriPrefix = handshakenProperties.getTransitUriPrefix(); + return (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java new file mode 100644 index 0000000000..4209c93ba3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.io.http.HttpCommunicationsSession; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.InputStream; +import java.net.URI; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public class TestStandardRemoteGroupPort { + + private static final String ID = "remote-group-port-id"; + private static final String NAME = "remote-group-port-name"; + + private RemoteProcessGroup remoteGroup; + private ProcessScheduler scheduler; + private SiteToSiteClient siteToSiteClient; + private Transaction transaction; + private EventReporter eventReporter; + private ProcessGroup processGroup; + public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; + private StandardRemoteGroupPort port; + private ProcessContext context; + private ProcessSession session; + private ProvenanceReporter provenanceReporter; + + @BeforeClass + public static void setup() throws Exception { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + } + + private void setupMock(final SiteToSiteTransportProtocol protocol, + final TransferDirection direction) throws Exception { + setupMock(protocol, direction, mock(Transaction.class)); + } + + private void setupMock(final SiteToSiteTransportProtocol protocol, + final TransferDirection direction, + final Transaction transaction) throws Exception { + processGroup = null; + remoteGroup = mock(RemoteProcessGroup.class); + scheduler = null; + siteToSiteClient = mock(SiteToSiteClient.class); + this.transaction = transaction; + + eventReporter = mock(EventReporter.class); + + final ConnectableType connectableType; + switch (direction) { + case SEND: + connectableType = ConnectableType.REMOTE_INPUT_PORT; + break; + case RECEIVE: + connectableType = ConnectableType.OUTPUT_PORT; + break; + default: + connectableType = null; + break; + } + port = spy(new StandardRemoteGroupPort(ID, NAME, + processGroup, remoteGroup, direction, connectableType, null, scheduler)); + + doReturn(true).when(remoteGroup).isTransmitting(); + doReturn(protocol).when(remoteGroup).getTransportProtocol(); + doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri(); + doReturn(siteToSiteClient).when(port).getSiteToSiteClient(); + doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); + doReturn(eventReporter).when(remoteGroup).getEventReporter(); + + context = null; + session = mock(ProcessSession.class); + provenanceReporter = mock(ProvenanceReporter.class); + doReturn(provenanceReporter).when(session).getProvenanceReporter(); + + } + + @Test + public void testSendRaw() throws Exception { + + setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND); + + final String peerUrl = "nifi://node1.example.com:9090"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false); + try (final SocketChannel socketChannel = SocketChannel.open()) { + final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + doReturn(peer).when(transaction).getCommunicant(); + + final QueueSize queueSize = new QueueSize(1, 10); + final FlowFile flowFile = mock(FlowFile.class); + + doReturn(queueSize).when(session).getQueueSize(); + // Return null when it gets called second time. + doReturn(flowFile).doReturn(null).when(session).get(); + + final String flowFileUuid = "flowfile-uuid"; + doReturn(flowFileUuid).when(flowFile).getAttribute(eq(CoreAttributes.UUID.key())); + + port.onTrigger(context, session); + + // Transit uri can be customized if necessary. + verify(provenanceReporter).send(eq(flowFile), eq(peerUrl + "/" + flowFileUuid), any(String.class), + any(Long.class), eq(false)); + } + } + + @Test + public void testReceiveRaw() throws Exception { + + setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE); + + final String peerUrl = "nifi://node1.example.com:9090"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false); + try (final SocketChannel socketChannel = SocketChannel.open()) { + final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + doReturn(peer).when(transaction).getCommunicant(); + + final FlowFile flowFile = mock(FlowFile.class); + final String sourceFlowFileUuid = "flowfile-uuid"; + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid); + final byte[] dataPacketContents = "DataPacket Contents".getBytes(); + final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents); + final DataPacket dataPacket = new StandardDataPacket(attributes, + dataPacketInputStream, dataPacketContents.length); + + doReturn(flowFile).when(session).create(); + // Return null when it gets called second time. + doReturn(dataPacket).doReturn(null).when(this.transaction).receive(); + + doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); + doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); + + port.onTrigger(context, session); + + // Transit uri can be customized if necessary. + verify(provenanceReporter).receive(eq(flowFile), eq(peerUrl + "/" + sourceFlowFileUuid), any(String.class), + any(String.class), any(Long.class)); + } + + } + + @Test + public void testSendHttp() throws Exception { + + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND); + + final String peerUrl = "http://node1.example.com:8080/nifi"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); + final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; + + doReturn(peer).when(transaction).getCommunicant(); + commsSession.setDataTransferUrl(flowFileEndpointUri); + + final QueueSize queueSize = new QueueSize(1, 10); + final FlowFile flowFile = mock(FlowFile.class); + + doReturn(queueSize).when(session).getQueueSize(); + // Return null when it's called second time. + doReturn(flowFile).doReturn(null).when(session).get(); + + port.onTrigger(context, session); + + // peerUrl should be used as the transit url. + verify(provenanceReporter).send(eq(flowFile), eq(flowFileEndpointUri), any(String.class), + any(Long.class), eq(false)); + + } + + @Test + public void testReceiveHttp() throws Exception { + + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE); + + final String peerUrl = "http://node1.example.com:8080/nifi"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); + final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; + + doReturn(peer).when(transaction).getCommunicant(); + commsSession.setDataTransferUrl(flowFileEndpointUri); + + final FlowFile flowFile = mock(FlowFile.class); + final Map attributes = new HashMap<>(); + final byte[] dataPacketContents = "DataPacket Contents".getBytes(); + final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents); + final DataPacket dataPacket = new StandardDataPacket(attributes, + dataPacketInputStream, dataPacketContents.length); + + doReturn(flowFile).when(session).create(); + // Return null when it's called second time. + doReturn(dataPacket).doReturn(null).when(transaction).receive(); + + doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes)); + doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile)); + + port.onTrigger(context, session); + + // peerUrl should be used as the transit url. + verify(provenanceReporter).receive(eq(flowFile), eq(flowFileEndpointUri), any(String.class), + any(String.class), any(Long.class)); + + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java index 4519ddd254..7c9d30b61c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java @@ -297,8 +297,11 @@ public class TestHttpFlowFileServerProtocol { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); final Peer peer = getDefaultPeer(transactionId); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/" + + transactionId + "/flow-files"; commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1"); commsSession.setUserDn("unit-test"); + commsSession.setDataTransferUrl(endpointUri); serverProtocol.handshake(peer); @@ -312,9 +315,9 @@ public class TestHttpFlowFileServerProtocol { doReturn(flowFile).when(processSession).get(); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - final String peerUrl = (String)invocation.getArguments()[1]; + final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[2]; - assertEquals("http://peer-host:8080/", peerUrl); + assertEquals(endpointUri, transitUri); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); @@ -336,13 +339,16 @@ public class TestHttpFlowFileServerProtocol { @Test public void testTransferTwoFiles() throws Exception { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); - final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final String transactionId = "testTransferTwoFiles"; final Peer peer = getDefaultPeer(transactionId); + final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/" + + transactionId + "/flow-files"; + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2"); commsSession.setUserDn("unit-test"); + commsSession.setDataTransferUrl(endpointUri); serverProtocol.handshake(peer); @@ -360,18 +366,18 @@ public class TestHttpFlowFileServerProtocol { doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - final String peerUrl = (String)invocation.getArguments()[1]; + final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[2]; - assertEquals("http://peer-host:8080/", peerUrl); + assertEquals(endpointUri, transitUri); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter).send(eq(flowFile1), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - final String peerUrl = (String)invocation.getArguments()[1]; + final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[2]; - assertEquals("http://peer-host:8080/", peerUrl); + assertEquals(endpointUri, transitUri); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); @@ -465,9 +471,12 @@ public class TestHttpFlowFileServerProtocol { private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); + final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/" + + transactionId + "/flow-files"; final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1"); commsSession.setUserDn("unit-test"); + commsSession.setDataTransferUrl(endpointUri); serverProtocol.handshake(peer); @@ -499,9 +508,9 @@ public class TestHttpFlowFileServerProtocol { doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - final String peerUrl = (String)invocation.getArguments()[1]; + final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[3]; - assertEquals("http://peer-host:8080/", peerUrl); + assertEquals(endpointUri, transitUri); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter) @@ -522,13 +531,16 @@ public class TestHttpFlowFileServerProtocol { @Test public void testReceiveTwoFiles() throws Exception { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); - final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final String transactionId = "testReceiveTwoFile"; + final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/" + + transactionId + "/flow-files"; + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); final Peer peer = getDefaultPeer(transactionId); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2"); commsSession.setUserDn("unit-test"); + commsSession.setDataTransferUrl(endpointUri); serverProtocol.handshake(peer); @@ -562,9 +574,9 @@ public class TestHttpFlowFileServerProtocol { .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - final String peerUrl = (String)invocation.getArguments()[1]; + final String transitUri = (String)invocation.getArguments()[1]; final String detail = (String)invocation.getArguments()[3]; - assertEquals("http://peer-host:8080/", peerUrl); + assertEquals(endpointUri, transitUri); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index bd61841547..3034e1efd8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -45,6 +45,7 @@ import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.NotAuthorizedException; import org.apache.nifi.remote.exception.RequestExpiredException; +import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.http.HttpOutput; import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; import org.apache.nifi.remote.protocol.HandshakeProperty; @@ -212,7 +213,7 @@ public class DataTransferResource extends ApplicationResource { try { // Execute handshake. - initiateServerProtocol(peer, transportProtocolVersion); + initiateServerProtocol(req, peer, transportProtocolVersion); TransactionResultEntity entity = new TransactionResultEntity(); entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode()); @@ -280,7 +281,7 @@ public class DataTransferResource extends ApplicationResource { final int transportProtocolVersion = validationResult.transportProtocolVersion; try { - HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); int numOfFlowFiles = serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol); logger.debug("finished receiving flow files, numOfFlowFiles={}", numOfFlowFiles); if (numOfFlowFiles < 1) { @@ -304,10 +305,15 @@ public class DataTransferResource extends ApplicationResource { return responseCreator.acceptedResponse(transactionManager, serverChecksum, transportProtocolVersion); } - private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, Integer transportProtocolVersion) throws IOException { + private HttpFlowFileServerProtocol initiateServerProtocol(final HttpServletRequest req, final Peer peer, + final Integer transportProtocolVersion) throws IOException { // Switch transaction protocol version based on transport protocol version. TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion = new TransportProtocolVersionNegotiator(transportProtocolVersion); VersionNegotiator versionNegotiator = new StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion()); + + final String dataTransferUrl = req.getRequestURL().toString(); + ((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(dataTransferUrl); + HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator); HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol); // TODO: How should I pass cluster information? @@ -316,11 +322,12 @@ public class DataTransferResource extends ApplicationResource { return serverProtocol; } - HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { + HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(final VersionNegotiator versionNegotiator) { return new StandardHttpFlowFileServerProtocol(versionNegotiator); } - private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) { + private Peer constructPeer(final HttpServletRequest req, final InputStream inputStream, + final OutputStream outputStream, final String portId, final String transactionId) { final String clientHostName = req.getRemoteHost(); final int clientPort = req.getRemotePort(); @@ -357,7 +364,7 @@ public class DataTransferResource extends ApplicationResource { commSession.putHandshakeParam(BATCH_DURATION, batchDuration); } - if(peerDescription.isSecure()){ + if (peerDescription.isSecure()) { final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser(); logger.debug("initiating peer, nifiUser={}", nifiUser); commSession.setUserDn(nifiUser.getIdentity()); @@ -366,6 +373,7 @@ public class DataTransferResource extends ApplicationResource { // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl. final String peerUrl = "nifi://" + clientHostName + ":" + clientPort; final String clusterUrl = "nifi://localhost:" + req.getLocalPort(); + return new Peer(peerDescription, commSession, peerUrl, clusterUrl); } @@ -434,7 +442,7 @@ public class DataTransferResource extends ApplicationResource { final TransactionResultEntity entity = new TransactionResultEntity(); try { - HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); String inputErrMessage = null; if (responseCode == null) { @@ -540,7 +548,7 @@ public class DataTransferResource extends ApplicationResource { final TransactionResultEntity entity = new TransactionResultEntity(); try { - HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); // Pass the response code sent from the client. String inputErrMessage = null; @@ -653,7 +661,7 @@ public class DataTransferResource extends ApplicationResource { final Peer peer = constructPeer(req, inputStream, tempBos, portId, transactionId); final int transportProtocolVersion = validationResult.transportProtocolVersion; try { - final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); StreamingOutput flowFileContent = new StreamingOutput() { @Override @@ -792,7 +800,7 @@ public class DataTransferResource extends ApplicationResource { try { // Do handshake - initiateServerProtocol(peer, transportProtocolVersion); + initiateServerProtocol(req, peer, transportProtocolVersion); transactionManager.extendTransaction(transactionId); final TransactionResultEntity entity = new TransactionResultEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java index d16c6269cc..0411bec0c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java @@ -18,6 +18,8 @@ package org.apache.nifi.web.api; import static org.apache.commons.lang3.StringUtils.isEmpty; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -214,10 +216,22 @@ public class SiteToSiteResource extends ApplicationResource { } else { // Standalone mode. final PeerDTO peer = new PeerDTO(); - // req.getLocalName returns private IP address, that can't be accessed from client in some environments. + + // Private IP address or hostname may not be accessible from client in some environments. // So, use the value defined in nifi.properties instead when it is defined. final String remoteInputHost = properties.getRemoteInputHost(); - peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost); + String localName; + try { + // Get local host name using InetAddress if available, same as RAW socket does. + localName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to get local host name using InetAddress.", e); + } + localName = req.getLocalName(); + } + + peer.setHostname(isEmpty(remoteInputHost) ? localName : remoteInputHost); peer.setPort(properties.getRemoteInputHttpPort()); peer.setSecure(properties.isSiteToSiteSecure()); peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java index c23cc7d64e..422dbc3fc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java @@ -64,6 +64,9 @@ public class TestDataTransferResource { private HttpServletRequest createCommonHttpServletRequest() { final HttpServletRequest req = mock(HttpServletRequest.class); doReturn("1").when(req).getHeader(eq(HttpHeaders.PROTOCOL_VERSION)); + doReturn(new StringBuffer("http://nifi.example.com:8080") + .append("/nifi-api/data-transfer/output-ports/port-id/transactions/tx-id/flow-files")) + .when(req).getRequestURL(); return req; }