mirror of https://github.com/apache/nifi.git
NIFI-3765
- added status operation to NodeManager - updated help text - add multiple url support with the -u flag - This closes #1726
This commit is contained in:
parent
eb12b932ab
commit
fae2e3aa21
|
@ -53,8 +53,10 @@ public class NodeManagerTool extends AbstractAdminTool {
|
||||||
private static final String REMOVE = "remove"
|
private static final String REMOVE = "remove"
|
||||||
private static final String DISCONNECT = "disconnect"
|
private static final String DISCONNECT = "disconnect"
|
||||||
private static final String CONNECT = "connect"
|
private static final String CONNECT = "connect"
|
||||||
|
private static final String NODE_STATUS = "status"
|
||||||
private static final String OPERATION = "operation"
|
private static final String OPERATION = "operation"
|
||||||
private final static String NODE_ENDPOINT = "/nifi-api/controller/cluster/nodes"
|
private final static String NODE_ENDPOINT = "/nifi-api/controller/cluster/nodes"
|
||||||
|
private final static String NIFI_ENDPOINT = "/nifi"
|
||||||
private final static String SUPPORTED_MINIMUM_VERSION = "1.0.0"
|
private final static String SUPPORTED_MINIMUM_VERSION = "1.0.0"
|
||||||
static enum STATUS {DISCONNECTING,CONNECTING,CONNECTED}
|
static enum STATUS {DISCONNECTING,CONNECTING,CONNECTED}
|
||||||
|
|
||||||
|
@ -80,7 +82,7 @@ public class NodeManagerTool extends AbstractAdminTool {
|
||||||
options.addOption(Option.builder("p").longOpt(PROXY_DN).hasArg().desc("User or Proxy DN that has permission to send a notification. User must have view and modify privileges to 'access the controller' in NiFi").build())
|
options.addOption(Option.builder("p").longOpt(PROXY_DN).hasArg().desc("User or Proxy DN that has permission to send a notification. User must have view and modify privileges to 'access the controller' in NiFi").build())
|
||||||
options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build())
|
options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build())
|
||||||
options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build())
|
options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build())
|
||||||
options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operation to connect, disconnect or remove node from cluster").build())
|
options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operations supported: status, connect (cluster), disconnect(cluster), remove (cluster)").build())
|
||||||
options.addOption(Option.builder("u").longOpt(CLUSTER_URLS).hasArg().desc("List of active urls for the cluster").build())
|
options.addOption(Option.builder("u").longOpt(CLUSTER_URLS).hasArg().desc("List of active urls for the cluster").build())
|
||||||
options
|
options
|
||||||
}
|
}
|
||||||
|
@ -135,6 +137,34 @@ public class NodeManagerTool extends AbstractAdminTool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void getStatus(final Client client,NiFiProperties niFiProperties,List<String> activeUrls){
|
||||||
|
if(activeUrls == null || activeUrls.empty) {
|
||||||
|
final String nodeUrl = NiFiClientUtil.getUrl(niFiProperties, null)
|
||||||
|
activeUrls = [nodeUrl]
|
||||||
|
}
|
||||||
|
|
||||||
|
for(String activeUrl: activeUrls) {
|
||||||
|
final String url = activeUrl + NIFI_ENDPOINT
|
||||||
|
final WebResource webResource = client.resource(url)
|
||||||
|
|
||||||
|
if (isVerbose) {
|
||||||
|
logger.info("Checking if node is available")
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final ClientResponse response = webResource.get(ClientResponse.class)
|
||||||
|
if (response.status == 200) {
|
||||||
|
System.out.println("NiFi Node is running and available at "+url)
|
||||||
|
} else {
|
||||||
|
System.out.println("Attempt to contact NiFi Node at "+url+" returned Response Code: " + response.status + " with reason: " + response.getEntity(String.class))
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
System.out.println("Attempt to contact NiFi Node "+url+" did not complete due to exception: " + ex.localizedMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void disconnectNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls, final String proxyDN){
|
void disconnectNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls, final String proxyDN){
|
||||||
final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN)
|
final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN)
|
||||||
NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties)
|
NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties)
|
||||||
|
@ -232,50 +262,60 @@ public class NodeManagerTool extends AbstractAdminTool {
|
||||||
String nifiPropertiesFileName = nifiConfDir + File.separator +"nifi.properties"
|
String nifiPropertiesFileName = nifiConfDir + File.separator +"nifi.properties"
|
||||||
final String key = NiFiPropertiesLoader.extractKeyFromBootstrapFile(bootstrapConfFileName)
|
final String key = NiFiPropertiesLoader.extractKeyFromBootstrapFile(bootstrapConfFileName)
|
||||||
final NiFiProperties niFiProperties = NiFiPropertiesLoader.withKey(key).load(nifiPropertiesFileName)
|
final NiFiProperties niFiProperties = NiFiPropertiesLoader.withKey(key).load(nifiPropertiesFileName)
|
||||||
|
final String operation = commandLine.getOptionValue(OPERATION)
|
||||||
|
|
||||||
if(!StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) && StringUtils.isEmpty(proxyDN)) {
|
if(!StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) && StringUtils.isEmpty(proxyDN) && !operation.equalsIgnoreCase(NODE_STATUS)) {
|
||||||
throw new UnsupportedOperationException("Proxy DN is required for sending a notification to this node or cluster")
|
throw new UnsupportedOperationException("Proxy DN is required for sending a notification to this node or cluster")
|
||||||
}
|
}
|
||||||
|
|
||||||
final String nifiInstallDir = commandLine.getOptionValue(NIFI_INSTALL_DIR)
|
final String nifiInstallDir = commandLine.getOptionValue(NIFI_INSTALL_DIR)
|
||||||
|
|
||||||
if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION) && NiFiClientUtil.isCluster(niFiProperties)){
|
if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION)){
|
||||||
|
|
||||||
final Client client = clientFactory.getClient(niFiProperties,nifiInstallDir)
|
final Client client = clientFactory.getClient(niFiProperties,nifiInstallDir)
|
||||||
final String operation = commandLine.getOptionValue(OPERATION)
|
|
||||||
|
|
||||||
if(isVerbose){
|
if(isVerbose){
|
||||||
logger.info("Starting {} request",operation)
|
logger.info("Starting {} request",operation)
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> activeUrls
|
List<String> activeUrls = null
|
||||||
|
if (commandLine.hasOption(CLUSTER_URLS)) {
|
||||||
if(commandLine.hasOption(CLUSTER_URLS)){
|
|
||||||
final String urlList = commandLine.getOptionValue(CLUSTER_URLS)
|
final String urlList = commandLine.getOptionValue(CLUSTER_URLS)
|
||||||
activeUrls = urlList.tokenize(',')
|
activeUrls = urlList.tokenize(',')
|
||||||
|
}
|
||||||
|
|
||||||
|
if(operation.equalsIgnoreCase(NODE_STATUS)){
|
||||||
|
getStatus(client,niFiProperties,activeUrls)
|
||||||
}else{
|
}else{
|
||||||
activeUrls = NiFiClientUtil.getActiveClusterUrls(client,niFiProperties,proxyDN)
|
|
||||||
|
if(NiFiClientUtil.isCluster(niFiProperties)) {
|
||||||
|
|
||||||
|
if (activeUrls == null) {
|
||||||
|
activeUrls = NiFiClientUtil.getActiveClusterUrls(client, niFiProperties, proxyDN)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isVerbose) {
|
||||||
|
logger.info("Using active urls {} for communication.", activeUrls)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (operation.toLowerCase().equals(REMOVE)) {
|
||||||
|
removeNode(client, niFiProperties, activeUrls, proxyDN)
|
||||||
|
} else if (operation.toLowerCase().equals(DISCONNECT)) {
|
||||||
|
disconnectNode(client, niFiProperties, activeUrls, proxyDN)
|
||||||
|
} else if (operation.toLowerCase().equals(CONNECT)) {
|
||||||
|
connectNode(client, niFiProperties, activeUrls, proxyDN)
|
||||||
|
} else {
|
||||||
|
throw new ParseException("Invalid operation provided: " + operation)
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
throw new UnsupportedOperationException("The provided operation ("+operation+") is only supported with instances of NiFi running within a cluster.")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(isVerbose){
|
|
||||||
logger.info("Using active urls {} for communication.",activeUrls)
|
|
||||||
}
|
|
||||||
|
|
||||||
if(operation.toLowerCase().equals(REMOVE)){
|
|
||||||
removeNode(client,niFiProperties,activeUrls,proxyDN)
|
|
||||||
}
|
|
||||||
else if(operation.toLowerCase().equals(DISCONNECT)){
|
|
||||||
disconnectNode(client,niFiProperties,activeUrls,proxyDN)
|
|
||||||
}
|
|
||||||
else if(operation.toLowerCase().equals(CONNECT)){
|
|
||||||
connectNode(client,niFiProperties,activeUrls,proxyDN)
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
throw new ParseException("Invalid operation provided: " + operation)
|
|
||||||
}
|
|
||||||
|
|
||||||
}else{
|
}else{
|
||||||
throw new UnsupportedOperationException("Node Manager Tool only supports clustered instance of NiFi running versions 1.0.0 or higher.")
|
throw new UnsupportedOperationException("Node Manager Tool only supports instances of NiFi running versions 1.0.0 or higher.")
|
||||||
}
|
}
|
||||||
|
|
||||||
}else if(!commandLine.hasOption(BOOTSTRAP_CONF)){
|
}else if(!commandLine.hasOption(BOOTSTRAP_CONF)){
|
||||||
|
|
|
@ -141,6 +141,7 @@ class NodeManagerToolSpec extends Specification{
|
||||||
e.message == "Invalid operation provided: fake"
|
e.message == "Invalid operation provided: fake"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def "get node info successfully"(){
|
def "get node info successfully"(){
|
||||||
|
|
||||||
given:
|
given:
|
||||||
|
@ -164,6 +165,7 @@ class NodeManagerToolSpec extends Specification{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def "delete node successfully"(){
|
def "delete node successfully"(){
|
||||||
|
|
||||||
given:
|
given:
|
||||||
|
@ -312,6 +314,69 @@ class NodeManagerToolSpec extends Specification{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "get node status successfully"(){
|
||||||
|
|
||||||
|
given:
|
||||||
|
def NiFiProperties niFiProperties = Mock NiFiProperties
|
||||||
|
def Client client = Mock Client
|
||||||
|
def WebResource resource = Mock WebResource
|
||||||
|
def ClientResponse response = Mock ClientResponse
|
||||||
|
def config = new NodeManagerTool()
|
||||||
|
|
||||||
|
when:
|
||||||
|
config.getStatus(client,niFiProperties,null)
|
||||||
|
|
||||||
|
then:
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080"
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
|
||||||
|
1 * client.resource(_ as String) >> resource
|
||||||
|
1 * resource.get(ClientResponse.class) >> response
|
||||||
|
1 * response.getStatus() >> 200
|
||||||
|
}
|
||||||
|
|
||||||
|
def "get node status when unavailable"(){
|
||||||
|
|
||||||
|
given:
|
||||||
|
def NiFiProperties niFiProperties = Mock NiFiProperties
|
||||||
|
def Client client = Mock Client
|
||||||
|
def WebResource resource = Mock WebResource
|
||||||
|
def ClientResponse response = Mock ClientResponse
|
||||||
|
def config = new NodeManagerTool()
|
||||||
|
|
||||||
|
when:
|
||||||
|
config.getStatus(client,niFiProperties,null)
|
||||||
|
|
||||||
|
then:
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080"
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
|
||||||
|
1 * client.resource(_ as String) >> resource
|
||||||
|
1 * resource.get(ClientResponse.class) >> response
|
||||||
|
2 * response.getStatus() >> 403
|
||||||
|
1 * response.getEntity(String.class) >> "Unauthorized User"
|
||||||
|
}
|
||||||
|
|
||||||
|
def "get multiple node status successfully"(){
|
||||||
|
|
||||||
|
given:
|
||||||
|
def NiFiProperties niFiProperties = Mock NiFiProperties
|
||||||
|
def Client client = Mock Client
|
||||||
|
def WebResource resource = Mock WebResource
|
||||||
|
def ClientResponse response = Mock ClientResponse
|
||||||
|
def config = new NodeManagerTool()
|
||||||
|
def activeUrls = ["https://localhost:8080","https://localhost1:8080"]
|
||||||
|
|
||||||
|
when:
|
||||||
|
config.getStatus(client,niFiProperties,activeUrls)
|
||||||
|
|
||||||
|
then:
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8080"
|
||||||
|
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
|
||||||
|
2 * client.resource(_ as String) >> resource
|
||||||
|
2 * resource.get(ClientResponse.class) >> response
|
||||||
|
2 * response.getStatus() >> 200
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def "disconnect node successfully"(){
|
def "disconnect node successfully"(){
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
|
|
Loading…
Reference in New Issue