diff --git a/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerTool.groovy b/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerTool.groovy index b39c93c7cf..e78adec4b3 100644 --- a/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerTool.groovy +++ b/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerTool.groovy @@ -53,8 +53,10 @@ public class NodeManagerTool extends AbstractAdminTool { private static final String REMOVE = "remove" private static final String DISCONNECT = "disconnect" private static final String CONNECT = "connect" + private static final String NODE_STATUS = "status" private static final String OPERATION = "operation" private final static String NODE_ENDPOINT = "/nifi-api/controller/cluster/nodes" + private final static String NIFI_ENDPOINT = "/nifi" private final static String SUPPORTED_MINIMUM_VERSION = "1.0.0" static enum STATUS {DISCONNECTING,CONNECTING,CONNECTED} @@ -80,7 +82,7 @@ public class NodeManagerTool extends AbstractAdminTool { options.addOption(Option.builder("p").longOpt(PROXY_DN).hasArg().desc("User or Proxy DN that has permission to send a notification. User must have view and modify privileges to 'access the controller' in NiFi").build()) options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build()) options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build()) - options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operation to connect, disconnect or remove node from cluster").build()) + options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operations supported: status, connect (cluster), disconnect(cluster), remove (cluster)").build()) options.addOption(Option.builder("u").longOpt(CLUSTER_URLS).hasArg().desc("List of active urls for the cluster").build()) options } @@ -135,6 +137,34 @@ public class NodeManagerTool extends AbstractAdminTool { } } + void getStatus(final Client client,NiFiProperties niFiProperties,List activeUrls){ + if(activeUrls == null || activeUrls.empty) { + final String nodeUrl = NiFiClientUtil.getUrl(niFiProperties, null) + activeUrls = [nodeUrl] + } + + for(String activeUrl: activeUrls) { + final String url = activeUrl + NIFI_ENDPOINT + final WebResource webResource = client.resource(url) + + if (isVerbose) { + logger.info("Checking if node is available") + } + + try { + final ClientResponse response = webResource.get(ClientResponse.class) + if (response.status == 200) { + System.out.println("NiFi Node is running and available at "+url) + } else { + System.out.println("Attempt to contact NiFi Node at "+url+" returned Response Code: " + response.status + " with reason: " + response.getEntity(String.class)) + } + } catch (Exception ex) { + System.out.println("Attempt to contact NiFi Node "+url+" did not complete due to exception: " + ex.localizedMessage) + } + } + + } + void disconnectNode(final Client client, NiFiProperties niFiProperties, List activeUrls, final String proxyDN){ final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN) NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties) @@ -232,50 +262,60 @@ public class NodeManagerTool extends AbstractAdminTool { String nifiPropertiesFileName = nifiConfDir + File.separator +"nifi.properties" final String key = NiFiPropertiesLoader.extractKeyFromBootstrapFile(bootstrapConfFileName) final NiFiProperties niFiProperties = NiFiPropertiesLoader.withKey(key).load(nifiPropertiesFileName) + final String operation = commandLine.getOptionValue(OPERATION) - if(!StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) && StringUtils.isEmpty(proxyDN)) { + if(!StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) && StringUtils.isEmpty(proxyDN) && !operation.equalsIgnoreCase(NODE_STATUS)) { throw new UnsupportedOperationException("Proxy DN is required for sending a notification to this node or cluster") } final String nifiInstallDir = commandLine.getOptionValue(NIFI_INSTALL_DIR) - if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION) && NiFiClientUtil.isCluster(niFiProperties)){ + if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION)){ final Client client = clientFactory.getClient(niFiProperties,nifiInstallDir) - final String operation = commandLine.getOptionValue(OPERATION) if(isVerbose){ logger.info("Starting {} request",operation) } - List activeUrls - - if(commandLine.hasOption(CLUSTER_URLS)){ + List activeUrls = null + if (commandLine.hasOption(CLUSTER_URLS)) { final String urlList = commandLine.getOptionValue(CLUSTER_URLS) activeUrls = urlList.tokenize(',') + } + + if(operation.equalsIgnoreCase(NODE_STATUS)){ + getStatus(client,niFiProperties,activeUrls) }else{ - activeUrls = NiFiClientUtil.getActiveClusterUrls(client,niFiProperties,proxyDN) + + if(NiFiClientUtil.isCluster(niFiProperties)) { + + if (activeUrls == null) { + activeUrls = NiFiClientUtil.getActiveClusterUrls(client, niFiProperties, proxyDN) + } + + if (isVerbose) { + logger.info("Using active urls {} for communication.", activeUrls) + } + + if (operation.toLowerCase().equals(REMOVE)) { + removeNode(client, niFiProperties, activeUrls, proxyDN) + } else if (operation.toLowerCase().equals(DISCONNECT)) { + disconnectNode(client, niFiProperties, activeUrls, proxyDN) + } else if (operation.toLowerCase().equals(CONNECT)) { + connectNode(client, niFiProperties, activeUrls, proxyDN) + } else { + throw new ParseException("Invalid operation provided: " + operation) + } + }else{ + throw new UnsupportedOperationException("The provided operation ("+operation+") is only supported with instances of NiFi running within a cluster.") + } + } - if(isVerbose){ - logger.info("Using active urls {} for communication.",activeUrls) - } - - if(operation.toLowerCase().equals(REMOVE)){ - removeNode(client,niFiProperties,activeUrls,proxyDN) - } - else if(operation.toLowerCase().equals(DISCONNECT)){ - disconnectNode(client,niFiProperties,activeUrls,proxyDN) - } - else if(operation.toLowerCase().equals(CONNECT)){ - connectNode(client,niFiProperties,activeUrls,proxyDN) - } - else{ - throw new ParseException("Invalid operation provided: " + operation) - } }else{ - throw new UnsupportedOperationException("Node Manager Tool only supports clustered instance of NiFi running versions 1.0.0 or higher.") + throw new UnsupportedOperationException("Node Manager Tool only supports instances of NiFi running versions 1.0.0 or higher.") } }else if(!commandLine.hasOption(BOOTSTRAP_CONF)){ diff --git a/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerToolSpec.groovy b/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerToolSpec.groovy index 5fd7d3d4b1..d5f3705c1e 100644 --- a/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerToolSpec.groovy +++ b/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/nodemanager/NodeManagerToolSpec.groovy @@ -141,6 +141,7 @@ class NodeManagerToolSpec extends Specification{ e.message == "Invalid operation provided: fake" } + def "get node info successfully"(){ given: @@ -164,6 +165,7 @@ class NodeManagerToolSpec extends Specification{ } + def "delete node successfully"(){ given: @@ -312,6 +314,69 @@ class NodeManagerToolSpec extends Specification{ } + def "get node status successfully"(){ + + given: + def NiFiProperties niFiProperties = Mock NiFiProperties + def Client client = Mock Client + def WebResource resource = Mock WebResource + def ClientResponse response = Mock ClientResponse + def config = new NodeManagerTool() + + when: + config.getStatus(client,niFiProperties,null) + + then: + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080" + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost" + 1 * client.resource(_ as String) >> resource + 1 * resource.get(ClientResponse.class) >> response + 1 * response.getStatus() >> 200 + } + + def "get node status when unavailable"(){ + + given: + def NiFiProperties niFiProperties = Mock NiFiProperties + def Client client = Mock Client + def WebResource resource = Mock WebResource + def ClientResponse response = Mock ClientResponse + def config = new NodeManagerTool() + + when: + config.getStatus(client,niFiProperties,null) + + then: + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080" + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost" + 1 * client.resource(_ as String) >> resource + 1 * resource.get(ClientResponse.class) >> response + 2 * response.getStatus() >> 403 + 1 * response.getEntity(String.class) >> "Unauthorized User" + } + + def "get multiple node status successfully"(){ + + given: + def NiFiProperties niFiProperties = Mock NiFiProperties + def Client client = Mock Client + def WebResource resource = Mock WebResource + def ClientResponse response = Mock ClientResponse + def config = new NodeManagerTool() + def activeUrls = ["https://localhost:8080","https://localhost1:8080"] + + when: + config.getStatus(client,niFiProperties,activeUrls) + + then: + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080" + niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost" + 2 * client.resource(_ as String) >> resource + 2 * resource.get(ClientResponse.class) >> response + 2 * response.getStatus() >> 200 + } + + def "disconnect node successfully"(){ setup: