diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 69ae396c8d..638704fd13 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -27,6 +27,7 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.VersionedRemoteResource; import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; @@ -134,20 +135,14 @@ public interface ServerProtocol extends VersionedRemoteResource { * * @param peer peer * @param clusterNodeInfo the cluster information - * @param remoteInputHost the remote input host - * @param remoteInputPort the remote input port - * @param remoteInputHttpPort the remote input http port - * @param isSiteToSiteSecure whether site to site is secure + * @param self the node which received the request * * @throws java.io.IOException ioe */ void sendPeerList( Peer peer, Optional clusterNodeInfo, - String remoteInputHost, - Integer remoteInputPort, - Integer remoteInputHttpPort, - boolean isSiteToSiteSecure) throws IOException; + NodeInformation self) throws IOException; void shutdown(Peer peer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 5222bbc6a6..a367e9eee2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -18,7 +18,11 @@ package org.apache.nifi.remote; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NotAuthorizedException; +import org.apache.nifi.remote.exception.RequestExpiredException; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; @@ -257,33 +261,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { } } - LOG.debug("Request type from {} is {}", protocol, requestType); - switch (requestType) { - case NEGOTIATE_FLOWFILE_CODEC: - protocol.negotiateCodec(peer); - break; - case RECEIVE_FLOWFILES: - // peer wants to receive FlowFiles, so we will transfer FlowFiles. - protocol.getPort().transferFlowFiles(peer, protocol); - break; - case SEND_FLOWFILES: - // Peer wants to send FlowFiles, so we will receive. - protocol.getPort().receiveFlowFiles(peer, protocol); - break; - case REQUEST_PEER_LIST: - final Optional nodeInfo = (nodeInformant == null) ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation()); - protocol.sendPeerList( - peer, - nodeInfo, - nifiProperties.getRemoteInputHost(), - nifiProperties.getRemoteInputPort(), - nifiProperties.getRemoteInputHttpPort(), - nifiProperties.isSiteToSiteSecure()); - break; - case SHUTDOWN: - protocol.shutdown(peer); - break; - } + handleRequest(protocol, peer, requestType); } LOG.debug("Finished communicating with {} ({})", peer, protocol); } catch (final Exception e) { @@ -333,6 +311,44 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { listenerThread.start(); } + private void handleRequest(final ServerProtocol protocol, final Peer peer, final RequestType requestType) + throws IOException, NotAuthorizedException, BadRequestException, RequestExpiredException { + LOG.debug("Request type from {} is {}", protocol, requestType); + switch (requestType) { + case NEGOTIATE_FLOWFILE_CODEC: + protocol.negotiateCodec(peer); + break; + case RECEIVE_FLOWFILES: + // peer wants to receive FlowFiles, so we will transfer FlowFiles. + protocol.getPort().transferFlowFiles(peer, protocol); + break; + case SEND_FLOWFILES: + // Peer wants to send FlowFiles, so we will receive. + protocol.getPort().receiveFlowFiles(peer, protocol); + break; + case REQUEST_PEER_LIST: + final Optional nodeInfo = (nodeInformant == null) ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation()); + + String remoteInputHostVal = nifiProperties.getRemoteInputHost(); + if (remoteInputHostVal == null) { + remoteInputHostVal = InetAddress.getLocalHost().getHostName(); + } + final Boolean isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure(); + final Integer apiPort = isSiteToSiteSecure ? nifiProperties.getSslPort() : nifiProperties.getPort(); + final NodeInformation self = new NodeInformation(remoteInputHostVal, + nifiProperties.getRemoteInputPort(), + nifiProperties.getRemoteInputHttpPort(), + apiPort != null ? apiPort : 0, // Avoid potential NullPointerException. + isSiteToSiteSecure, 0); // TotalFlowFiles doesn't matter if it's a standalone NiFi. + + protocol.sendPeerList(peer, nodeInfo, self); + break; + case SHUTDOWN: + protocol.shutdown(peer); + break; + } + } + private int getPort() { return socketPort; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java index 57bebda678..f36cfaa7c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java @@ -21,6 +21,7 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -228,8 +229,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr } @Override - public void sendPeerList(Peer peer, Optional clusterNodeInfo, String remoteInputHost, Integer remoteInputPort, Integer remoteInputHttpPort, - boolean isSiteToSiteSecure) throws IOException { + public void sendPeerList(Peer peer, Optional clusterNodeInfo, final NodeInformation self) throws IOException { } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 6c22ac79ae..a7c021244d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -19,7 +19,6 @@ package org.apache.nifi.remote.protocol.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -155,10 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol public void sendPeerList( final Peer peer, final Optional clusterNodeInfo, - final String remoteInputHost, - final Integer remoteInputPort, - final Integer remoteInputHttpPort, - final boolean isSiteToSiteSecure) throws IOException { + final NodeInformation self) throws IOException { if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } @@ -170,18 +166,12 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String remoteInputHostVal = remoteInputHost; - if (remoteInputHostVal == null) { - remoteInputHostVal = InetAddress.getLocalHost().getHostName(); - } logger.debug("{} Advertising Remote Input host name {}", this, peer); List nodeInfos; if (clusterNodeInfo.isPresent()) { nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation()); } else { - final NodeInformation self = new NodeInformation(remoteInputHostVal, remoteInputPort, remoteInputHttpPort, remoteInputHttpPort, - isSiteToSiteSecure, 0); nodeInfos = Collections.singletonList(self); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java new file mode 100644 index 0000000000..36cd550776 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.protocol.ServerProtocol; +import org.apache.nifi.util.NiFiProperties; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestSocketRemoteSiteListener { + + @BeforeClass + public static void setup() { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + } + + @Test + public void testRequestPeerList() throws Exception { + Method method = SocketRemoteSiteListener.class.getDeclaredMethod("handleRequest", + ServerProtocol.class, Peer.class, RequestType.class); + method.setAccessible(true); + + final NiFiProperties nifiProperties = spy(NiFiProperties.class); + final int apiPort = 8080; + final int remoteSocketPort = 8081; + final String remoteInputHost = "node1.example.com"; + when(nifiProperties.getPort()).thenReturn(apiPort); + when(nifiProperties.getRemoteInputHost()).thenReturn(remoteInputHost); + when(nifiProperties.getRemoteInputPort()).thenReturn(remoteSocketPort); + when(nifiProperties.getRemoteInputHttpPort()).thenReturn(null); // Even if HTTP transport is disabled, RAW should work. + when(nifiProperties.isSiteToSiteHttpEnabled()).thenReturn(false); + when(nifiProperties.isSiteToSiteSecure()).thenReturn(false); + final SocketRemoteSiteListener listener = new SocketRemoteSiteListener(remoteSocketPort, null, nifiProperties); + + final ServerProtocol serverProtocol = mock(ServerProtocol.class); + doAnswer(invocation -> { + final NodeInformation self = invocation.getArgumentAt(2, NodeInformation.class); + // Listener should inform about itself properly: + assertEquals(remoteInputHost, self.getSiteToSiteHostname()); + assertEquals(remoteSocketPort, self.getSiteToSitePort().intValue()); + assertNull(self.getSiteToSiteHttpApiPort()); + assertEquals(apiPort, self.getAPIPort()); + return null; + }).when(serverProtocol).sendPeerList(any(Peer.class), any(Optional.class), any(NodeInformation.class)); + + final Peer peer = null; + method.invoke(listener, serverProtocol, peer, RequestType.REQUEST_PEER_LIST); + + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java new file mode 100644 index 0000000000..77c39bc7ba --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.SocketChannelInput; +import org.apache.nifi.remote.io.socket.SocketChannelOutput; +import org.apache.nifi.remote.protocol.HandshakeProperties; +import org.apache.nifi.remote.protocol.HandshakeProperty; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.util.NiFiProperties; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestSocketFlowFileServerProtocol { + + @BeforeClass + public static void setup() throws Exception { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + } + + private Peer getDefaultPeer(final HandshakeProperties handshakeProperties, final OutputStream outputStream) throws IOException { + final PeerDescription description = new PeerDescription("peer-host", 8080, false); + + final byte[] inputBytes; + try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + final DataOutputStream dos = new DataOutputStream(bos)) { + + dos.writeUTF(handshakeProperties.getCommsIdentifier()); + dos.writeUTF(handshakeProperties.getTransitUriPrefix()); + dos.writeInt(1); // num of properties + dos.writeUTF(HandshakeProperty.GZIP.name()); + dos.writeUTF(String.valueOf(handshakeProperties.isUseGzip())); + dos.flush(); + + inputBytes = bos.toByteArray(); + } + + final InputStream inputStream = new ByteArrayInputStream(inputBytes); + + final SocketChannelCommunicationsSession commsSession = mock(SocketChannelCommunicationsSession.class); + final SocketChannelInput channelInput = mock(SocketChannelInput.class); + final SocketChannelOutput channelOutput = mock(SocketChannelOutput.class); + when(commsSession.getInput()).thenReturn(channelInput); + when(commsSession.getOutput()).thenReturn(channelOutput); + + when(channelInput.getInputStream()).thenReturn(inputStream); + when(channelOutput.getOutputStream()).thenReturn(outputStream); + + final String peerUrl = "http://peer-host:8080/"; + final String clusterUrl = "cluster-url"; + return new Peer(description, commsSession, peerUrl, clusterUrl); + } + + private SocketFlowFileServerProtocol getDefaultSocketFlowFileServerProtocol() { + final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); + final SocketFlowFileServerProtocol protocol = spy(new SocketFlowFileServerProtocol()); + return protocol; + } + + @Test + public void testSendPeerListStandalone() throws Exception { + final SocketFlowFileServerProtocol protocol = getDefaultSocketFlowFileServerProtocol(); + final Optional clusterNodeInfo = Optional.empty(); + final String siteToSiteHostname = "node1.example.com"; + final Integer siteToSitePort = 8081; + final Integer siteToSiteHttpPort = null; + final int apiPort = 8080; + final boolean isSiteToSiteSecure = true; + final int numOfQueuedFlowFiles = 100; + final NodeInformation self = new NodeInformation(siteToSiteHostname, siteToSitePort, siteToSiteHttpPort, + apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles); + + final HandshakeProperties handshakeProperties = new HandshakeProperties(); + handshakeProperties.setCommsIdentifier("communication-identifier"); + handshakeProperties.setTransitUriPrefix("uri-prefix"); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final Peer peer = getDefaultPeer(handshakeProperties, outputStream); + + protocol.handshake(peer); + protocol.sendPeerList(peer, clusterNodeInfo, self); + + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()))) { + final Response handshakeResponse = Response.read(dis); + assertEquals(ResponseCode.PROPERTIES_OK, handshakeResponse.getCode()); + + final int numPeers = dis.readInt(); + assertEquals(1, numPeers); + + assertEquals(siteToSiteHostname, dis.readUTF()); + assertEquals(siteToSitePort.intValue(), dis.readInt()); + assertEquals(isSiteToSiteSecure, dis.readBoolean()); + assertEquals(numOfQueuedFlowFiles, dis.readInt()); + + } + } + + @Test + public void testSendPeerListCluster() throws Exception { + final SocketFlowFileServerProtocol protocol = getDefaultSocketFlowFileServerProtocol(); + final List nodeInfoList = new ArrayList<>(); + final ClusterNodeInformation clusterNodeInformation = new ClusterNodeInformation(); + clusterNodeInformation.setNodeInformation(nodeInfoList); + final Optional clusterNodeInfo = Optional.of(clusterNodeInformation); + + for (int i = 0; i < 3; i++) { + final String siteToSiteHostname = String.format("node%d.example.com", i); + final Integer siteToSitePort = 8081; + final Integer siteToSiteHttpPort = null; + final int apiPort = 8080; + final boolean isSiteToSiteSecure = true; + final int numOfQueuedFlowFiles = 100 + i; + final NodeInformation nodeInformation = new NodeInformation(siteToSiteHostname, siteToSitePort, siteToSiteHttpPort, + apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles); + nodeInfoList.add(nodeInformation); + } + + final NodeInformation self = nodeInfoList.get(0); + + final HandshakeProperties handshakeProperties = new HandshakeProperties(); + handshakeProperties.setCommsIdentifier("communication-identifier"); + handshakeProperties.setTransitUriPrefix("uri-prefix"); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final Peer peer = getDefaultPeer(handshakeProperties, outputStream); + + protocol.handshake(peer); + protocol.sendPeerList(peer, clusterNodeInfo, self); + + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()))) { + final Response handshakeResponse = Response.read(dis); + assertEquals(ResponseCode.PROPERTIES_OK, handshakeResponse.getCode()); + + final int numPeers = dis.readInt(); + assertEquals(nodeInfoList.size(), numPeers); + + for (int i = 0; i < nodeInfoList.size(); i++) { + final NodeInformation node = nodeInfoList.get(i); + assertEquals(node.getSiteToSiteHostname(), dis.readUTF()); + assertEquals(node.getSiteToSitePort().intValue(), dis.readInt()); + assertEquals(node.isSiteToSiteSecure(), dis.readBoolean()); + assertEquals(node.getTotalFlowFiles(), dis.readInt()); + } + } + } + +}