NIFI-3166 This closes #1324. Fix SocketRemoteSiteListener NPE.

- Refactored ServerProtocol.sendPeerList method signature to clarify the
  meaning of arguments, and avoid null pointer exception when converting null Integer to int.
- Refactored SocketRemoteSiteListener handleRequest method to make it
  more unit test friendly.
- Added more unit tests.
This commit is contained in:
Koji Kawamura 2016-12-13 14:33:16 +09:00 committed by joewitt
parent 0f2ac39f69
commit 0ddb90cd2d
6 changed files with 313 additions and 48 deletions

View File

@ -27,6 +27,7 @@ import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.VersionedRemoteResource; import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
@ -134,20 +135,14 @@ public interface ServerProtocol extends VersionedRemoteResource {
* *
* @param peer peer * @param peer peer
* @param clusterNodeInfo the cluster information * @param clusterNodeInfo the cluster information
* @param remoteInputHost the remote input host * @param self the node which received the request
* @param remoteInputPort the remote input port
* @param remoteInputHttpPort the remote input http port
* @param isSiteToSiteSecure whether site to site is secure
* *
* @throws java.io.IOException ioe * @throws java.io.IOException ioe
*/ */
void sendPeerList( void sendPeerList(
Peer peer, Peer peer,
Optional<ClusterNodeInformation> clusterNodeInfo, Optional<ClusterNodeInformation> clusterNodeInfo,
String remoteInputHost, NodeInformation self) throws IOException;
Integer remoteInputPort,
Integer remoteInputHttpPort,
boolean isSiteToSiteSecure) throws IOException;
void shutdown(Peer peer); void shutdown(Peer peer);

View File

@ -18,7 +18,11 @@ package org.apache.nifi.remote;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
@ -257,33 +261,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
} }
} }
LOG.debug("Request type from {} is {}", protocol, requestType); handleRequest(protocol, peer, requestType);
switch (requestType) {
case NEGOTIATE_FLOWFILE_CODEC:
protocol.negotiateCodec(peer);
break;
case RECEIVE_FLOWFILES:
// peer wants to receive FlowFiles, so we will transfer FlowFiles.
protocol.getPort().transferFlowFiles(peer, protocol);
break;
case SEND_FLOWFILES:
// Peer wants to send FlowFiles, so we will receive.
protocol.getPort().receiveFlowFiles(peer, protocol);
break;
case REQUEST_PEER_LIST:
final Optional<ClusterNodeInformation> nodeInfo = (nodeInformant == null) ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation());
protocol.sendPeerList(
peer,
nodeInfo,
nifiProperties.getRemoteInputHost(),
nifiProperties.getRemoteInputPort(),
nifiProperties.getRemoteInputHttpPort(),
nifiProperties.isSiteToSiteSecure());
break;
case SHUTDOWN:
protocol.shutdown(peer);
break;
}
} }
LOG.debug("Finished communicating with {} ({})", peer, protocol); LOG.debug("Finished communicating with {} ({})", peer, protocol);
} catch (final Exception e) { } catch (final Exception e) {
@ -333,6 +311,44 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
listenerThread.start(); listenerThread.start();
} }
private void handleRequest(final ServerProtocol protocol, final Peer peer, final RequestType requestType)
throws IOException, NotAuthorizedException, BadRequestException, RequestExpiredException {
LOG.debug("Request type from {} is {}", protocol, requestType);
switch (requestType) {
case NEGOTIATE_FLOWFILE_CODEC:
protocol.negotiateCodec(peer);
break;
case RECEIVE_FLOWFILES:
// peer wants to receive FlowFiles, so we will transfer FlowFiles.
protocol.getPort().transferFlowFiles(peer, protocol);
break;
case SEND_FLOWFILES:
// Peer wants to send FlowFiles, so we will receive.
protocol.getPort().receiveFlowFiles(peer, protocol);
break;
case REQUEST_PEER_LIST:
final Optional<ClusterNodeInformation> nodeInfo = (nodeInformant == null) ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation());
String remoteInputHostVal = nifiProperties.getRemoteInputHost();
if (remoteInputHostVal == null) {
remoteInputHostVal = InetAddress.getLocalHost().getHostName();
}
final Boolean isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure();
final Integer apiPort = isSiteToSiteSecure ? nifiProperties.getSslPort() : nifiProperties.getPort();
final NodeInformation self = new NodeInformation(remoteInputHostVal,
nifiProperties.getRemoteInputPort(),
nifiProperties.getRemoteInputHttpPort(),
apiPort != null ? apiPort : 0, // Avoid potential NullPointerException.
isSiteToSiteSecure, 0); // TotalFlowFiles doesn't matter if it's a standalone NiFi.
protocol.sendPeerList(peer, nodeInfo, self);
break;
case SHUTDOWN:
protocol.shutdown(peer);
break;
}
}
private int getPort() { private int getPort() {
return socketPort; return socketPort;
} }

View File

@ -21,6 +21,7 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
@ -228,8 +229,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
} }
@Override @Override
public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo, String remoteInputHost, Integer remoteInputPort, Integer remoteInputHttpPort, public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo, final NodeInformation self) throws IOException {
boolean isSiteToSiteSecure) throws IOException {
} }
@Override @Override

View File

@ -19,7 +19,6 @@ package org.apache.nifi.remote.protocol.socket;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -155,10 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
public void sendPeerList( public void sendPeerList(
final Peer peer, final Peer peer,
final Optional<ClusterNodeInformation> clusterNodeInfo, final Optional<ClusterNodeInformation> clusterNodeInfo,
final String remoteInputHost, final NodeInformation self) throws IOException {
final Integer remoteInputPort,
final Integer remoteInputHttpPort,
final boolean isSiteToSiteSecure) throws IOException {
if (!handshakeCompleted) { if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed"); throw new IllegalStateException("Handshake has not been completed");
} }
@ -170,18 +166,12 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
final CommunicationsSession commsSession = peer.getCommunicationsSession(); final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
String remoteInputHostVal = remoteInputHost;
if (remoteInputHostVal == null) {
remoteInputHostVal = InetAddress.getLocalHost().getHostName();
}
logger.debug("{} Advertising Remote Input host name {}", this, peer); logger.debug("{} Advertising Remote Input host name {}", this, peer);
List<NodeInformation> nodeInfos; List<NodeInformation> nodeInfos;
if (clusterNodeInfo.isPresent()) { if (clusterNodeInfo.isPresent()) {
nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation()); nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation());
} else { } else {
final NodeInformation self = new NodeInformation(remoteInputHostVal, remoteInputPort, remoteInputHttpPort, remoteInputHttpPort,
isSiteToSiteSecure, 0);
nodeInfos = Collections.singletonList(self); nodeInfos = Collections.singletonList(self);
} }

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass;
import org.junit.Test;
import java.lang.reflect.Method;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestSocketRemoteSiteListener {
@BeforeClass
public static void setup() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
}
@Test
public void testRequestPeerList() throws Exception {
Method method = SocketRemoteSiteListener.class.getDeclaredMethod("handleRequest",
ServerProtocol.class, Peer.class, RequestType.class);
method.setAccessible(true);
final NiFiProperties nifiProperties = spy(NiFiProperties.class);
final int apiPort = 8080;
final int remoteSocketPort = 8081;
final String remoteInputHost = "node1.example.com";
when(nifiProperties.getPort()).thenReturn(apiPort);
when(nifiProperties.getRemoteInputHost()).thenReturn(remoteInputHost);
when(nifiProperties.getRemoteInputPort()).thenReturn(remoteSocketPort);
when(nifiProperties.getRemoteInputHttpPort()).thenReturn(null); // Even if HTTP transport is disabled, RAW should work.
when(nifiProperties.isSiteToSiteHttpEnabled()).thenReturn(false);
when(nifiProperties.isSiteToSiteSecure()).thenReturn(false);
final SocketRemoteSiteListener listener = new SocketRemoteSiteListener(remoteSocketPort, null, nifiProperties);
final ServerProtocol serverProtocol = mock(ServerProtocol.class);
doAnswer(invocation -> {
final NodeInformation self = invocation.getArgumentAt(2, NodeInformation.class);
// Listener should inform about itself properly:
assertEquals(remoteInputHost, self.getSiteToSiteHostname());
assertEquals(remoteSocketPort, self.getSiteToSitePort().intValue());
assertNull(self.getSiteToSiteHttpApiPort());
assertEquals(apiPort, self.getAPIPort());
return null;
}).when(serverProtocol).sendPeerList(any(Peer.class), any(Optional.class), any(NodeInformation.class));
final Peer peer = null;
method.invoke(listener, serverProtocol, peer, RequestType.REQUEST_PEER_LIST);
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.protocol.socket;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelInput;
import org.apache.nifi.remote.io.socket.SocketChannelOutput;
import org.apache.nifi.remote.protocol.HandshakeProperties;
import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestSocketFlowFileServerProtocol {
@BeforeClass
public static void setup() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
}
private Peer getDefaultPeer(final HandshakeProperties handshakeProperties, final OutputStream outputStream) throws IOException {
final PeerDescription description = new PeerDescription("peer-host", 8080, false);
final byte[] inputBytes;
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(bos)) {
dos.writeUTF(handshakeProperties.getCommsIdentifier());
dos.writeUTF(handshakeProperties.getTransitUriPrefix());
dos.writeInt(1); // num of properties
dos.writeUTF(HandshakeProperty.GZIP.name());
dos.writeUTF(String.valueOf(handshakeProperties.isUseGzip()));
dos.flush();
inputBytes = bos.toByteArray();
}
final InputStream inputStream = new ByteArrayInputStream(inputBytes);
final SocketChannelCommunicationsSession commsSession = mock(SocketChannelCommunicationsSession.class);
final SocketChannelInput channelInput = mock(SocketChannelInput.class);
final SocketChannelOutput channelOutput = mock(SocketChannelOutput.class);
when(commsSession.getInput()).thenReturn(channelInput);
when(commsSession.getOutput()).thenReturn(channelOutput);
when(channelInput.getInputStream()).thenReturn(inputStream);
when(channelOutput.getOutputStream()).thenReturn(outputStream);
final String peerUrl = "http://peer-host:8080/";
final String clusterUrl = "cluster-url";
return new Peer(description, commsSession, peerUrl, clusterUrl);
}
private SocketFlowFileServerProtocol getDefaultSocketFlowFileServerProtocol() {
final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
final SocketFlowFileServerProtocol protocol = spy(new SocketFlowFileServerProtocol());
return protocol;
}
@Test
public void testSendPeerListStandalone() throws Exception {
final SocketFlowFileServerProtocol protocol = getDefaultSocketFlowFileServerProtocol();
final Optional<ClusterNodeInformation> clusterNodeInfo = Optional.empty();
final String siteToSiteHostname = "node1.example.com";
final Integer siteToSitePort = 8081;
final Integer siteToSiteHttpPort = null;
final int apiPort = 8080;
final boolean isSiteToSiteSecure = true;
final int numOfQueuedFlowFiles = 100;
final NodeInformation self = new NodeInformation(siteToSiteHostname, siteToSitePort, siteToSiteHttpPort,
apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles);
final HandshakeProperties handshakeProperties = new HandshakeProperties();
handshakeProperties.setCommsIdentifier("communication-identifier");
handshakeProperties.setTransitUriPrefix("uri-prefix");
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final Peer peer = getDefaultPeer(handshakeProperties, outputStream);
protocol.handshake(peer);
protocol.sendPeerList(peer, clusterNodeInfo, self);
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()))) {
final Response handshakeResponse = Response.read(dis);
assertEquals(ResponseCode.PROPERTIES_OK, handshakeResponse.getCode());
final int numPeers = dis.readInt();
assertEquals(1, numPeers);
assertEquals(siteToSiteHostname, dis.readUTF());
assertEquals(siteToSitePort.intValue(), dis.readInt());
assertEquals(isSiteToSiteSecure, dis.readBoolean());
assertEquals(numOfQueuedFlowFiles, dis.readInt());
}
}
@Test
public void testSendPeerListCluster() throws Exception {
final SocketFlowFileServerProtocol protocol = getDefaultSocketFlowFileServerProtocol();
final List<NodeInformation> nodeInfoList = new ArrayList<>();
final ClusterNodeInformation clusterNodeInformation = new ClusterNodeInformation();
clusterNodeInformation.setNodeInformation(nodeInfoList);
final Optional<ClusterNodeInformation> clusterNodeInfo = Optional.of(clusterNodeInformation);
for (int i = 0; i < 3; i++) {
final String siteToSiteHostname = String.format("node%d.example.com", i);
final Integer siteToSitePort = 8081;
final Integer siteToSiteHttpPort = null;
final int apiPort = 8080;
final boolean isSiteToSiteSecure = true;
final int numOfQueuedFlowFiles = 100 + i;
final NodeInformation nodeInformation = new NodeInformation(siteToSiteHostname, siteToSitePort, siteToSiteHttpPort,
apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles);
nodeInfoList.add(nodeInformation);
}
final NodeInformation self = nodeInfoList.get(0);
final HandshakeProperties handshakeProperties = new HandshakeProperties();
handshakeProperties.setCommsIdentifier("communication-identifier");
handshakeProperties.setTransitUriPrefix("uri-prefix");
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final Peer peer = getDefaultPeer(handshakeProperties, outputStream);
protocol.handshake(peer);
protocol.sendPeerList(peer, clusterNodeInfo, self);
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()))) {
final Response handshakeResponse = Response.read(dis);
assertEquals(ResponseCode.PROPERTIES_OK, handshakeResponse.getCode());
final int numPeers = dis.readInt();
assertEquals(nodeInfoList.size(), numPeers);
for (int i = 0; i < nodeInfoList.size(); i++) {
final NodeInformation node = nodeInfoList.get(i);
assertEquals(node.getSiteToSiteHostname(), dis.readUTF());
assertEquals(node.getSiteToSitePort().intValue(), dis.readInt());
assertEquals(node.isSiteToSiteSecure(), dis.readBoolean());
assertEquals(node.getTotalFlowFiles(), dis.readInt());
}
}
}
}