NIFI-3695

- added proxy dn flag to support providing authorized username for secured environments
- addressed pr comments including fix to ensure proxy info added when getting cluster info, showing cleaner error messaging  and improving help text. Also fixed potential issue with versioning comparison (mismatched lengths)
- Printing response body when requests fails.
- This closes #1697
This commit is contained in:
Yolanda M. Davis 2017-04-26 10:22:18 -04:00 committed by Matt Gilman
parent a41a2a9b1a
commit 55b8c7ddad
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
12 changed files with 714 additions and 60 deletions

View File

@ -361,10 +361,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
// grant access to the proxy resource // grant access to the proxy resource
addAccessPolicy(authorizations, ResourceType.Proxy.getValue(), jaxbNodeUser.getIdentifier(), WRITE_CODE); addAccessPolicy(authorizations, ResourceType.Proxy.getValue(), jaxbNodeUser.getIdentifier(), WRITE_CODE);
//grant access to controller resource
addAccessPolicy(authorizations, ResourceType.Controller.getValue(), jaxbNodeUser.getIdentifier(), READ_CODE);
addAccessPolicy(authorizations, ResourceType.Controller.getValue(), jaxbNodeUser.getIdentifier(), WRITE_CODE);
// grant the user read/write access data of the root group // grant the user read/write access data of the root group
if (rootGroupId != null) { if (rootGroupId != null) {
addAccessPolicy(authorizations, ResourceType.Data.getValue() + ResourceType.ProcessGroup.getValue() + "/" + rootGroupId, jaxbNodeUser.getIdentifier(), READ_CODE); addAccessPolicy(authorizations, ResourceType.Data.getValue() + ResourceType.ProcessGroup.getValue() + "/" + rootGroupId, jaxbNodeUser.getIdentifier(), READ_CODE);

View File

@ -47,10 +47,37 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId> <artifactId>nifi-client-dto</artifactId>
<version>${client.version}</version> <version>${client.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-security</artifactId>
<version>${client.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId> <artifactId>nifi-properties</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -65,6 +92,12 @@ language governing permissions and limitations under the License. -->
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId> <artifactId>nifi-security-utils</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.codehaus.jackson</groupId> <groupId>org.codehaus.jackson</groupId>

View File

@ -87,7 +87,7 @@ public abstract class AbstractAdminTool {
final String versionStr = AdminUtil.getNiFiVersion(nifiConfDir,nifiLibDir) final String versionStr = AdminUtil.getNiFiVersion(nifiConfDir,nifiLibDir)
if(!StringUtils.isEmpty(versionStr)){ if(!StringUtils.isEmpty(versionStr)){
Version version = new Version(versionStr,".") Version version = new Version(versionStr.replace("-","."),".")
Version minVersion = new Version(supportedMinimumVersion,".") Version minVersion = new Version(supportedMinimumVersion,".")
Version.VERSION_COMPARATOR.compare(version,minVersion) >= 0 Version.VERSION_COMPARATOR.compare(version,minVersion) >= 0
}else{ }else{

View File

@ -27,6 +27,7 @@ import org.apache.nifi.web.api.dto.NodeDTO
import org.apache.nifi.web.api.dto.util.DateTimeAdapter import org.apache.nifi.web.api.dto.util.DateTimeAdapter
import org.apache.nifi.web.api.entity.ClusterEntity import org.apache.nifi.web.api.entity.ClusterEntity
import org.apache.nifi.web.api.entity.NodeEntity import org.apache.nifi.web.api.entity.NodeEntity
import org.apache.nifi.web.security.ProxiedEntitiesUtils
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -85,7 +86,7 @@ public class NiFiClientUtil {
urlBuilder.toString() urlBuilder.toString()
} }
public static ClusterEntity getCluster(final Client client, NiFiProperties niFiProperties, List<String> activeUrls){ public static ClusterEntity getCluster(final Client client, NiFiProperties niFiProperties, List<String> activeUrls, final String proxyDN){
if(activeUrls.isEmpty()){ if(activeUrls.isEmpty()){
final String url = getUrl(niFiProperties,null) final String url = getUrl(niFiProperties,null)
@ -98,15 +99,21 @@ public class NiFiClientUtil {
String url = activeUrl + GET_CLUSTER_ENDPOINT String url = activeUrl + GET_CLUSTER_ENDPOINT
final WebResource webResource = client.resource(url) final WebResource webResource = client.resource(url)
final ClientResponse response = webResource.type("application/json").get(ClientResponse.class) ClientResponse response
Integer status = response.getStatus() if(url.startsWith("https")) {
response = webResource.type("application/json").header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.formatProxyDn(proxyDN)).get(ClientResponse.class)
}else{
response = webResource.type("application/json").get(ClientResponse.class)
}
Integer status = response.status
if (status != 200) { if (status != 200) {
if (status == 404) { if (status == 404) {
logger.warn("This node is not attached to a cluster. Please connect to a node that is attached to the cluster for information") logger.warn("This node is not attached to a cluster. Please connect to a node that is attached to the cluster for information")
} else { } else {
logger.warn("Failed with HTTP error code: {}, message: {}", status, response.getStatusInfo().getReasonPhrase()) logger.warn("Failed with HTTP error code: {}, message: {}", status, response.getEntity(String.class))
} }
} else if (status == 200) { } else if (status == 200) {
return response.getEntity(ClusterEntity.class) return response.getEntity(ClusterEntity.class)
@ -122,9 +129,9 @@ public class NiFiClientUtil {
} }
public static List<String> getActiveClusterUrls(final Client client, NiFiProperties niFiProperties){ public static List<String> getActiveClusterUrls(final Client client, NiFiProperties niFiProperties, final String proxyDN){
final ClusterEntity clusterEntity = getCluster(client, niFiProperties, Lists.newArrayList()) final ClusterEntity clusterEntity = getCluster(client, niFiProperties, Lists.newArrayList(),proxyDN)
final List<NodeDTO> activeNodes = clusterEntity.cluster.nodes.findAll{ it.status == "CONNECTED" } final List<NodeDTO> activeNodes = clusterEntity.cluster.nodes.findAll{ it.status == "CONNECTED" }
final List<String> activeUrls = Lists.newArrayList() final List<String> activeUrls = Lists.newArrayList()

View File

@ -35,6 +35,7 @@ import org.apache.nifi.util.StringUtils
import org.apache.nifi.web.api.dto.NodeDTO import org.apache.nifi.web.api.dto.NodeDTO
import org.apache.nifi.web.api.entity.ClusterEntity import org.apache.nifi.web.api.entity.ClusterEntity
import org.apache.nifi.web.api.entity.NodeEntity import org.apache.nifi.web.api.entity.NodeEntity
import org.apache.nifi.web.security.ProxiedEntitiesUtils
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -45,6 +46,7 @@ public class NodeManagerTool extends AbstractAdminTool {
private static final String DEFAULT_DESCRIPTION = "This tool is used to manage nodes within a cluster. Supported functionality will remove node from cluster. " private static final String DEFAULT_DESCRIPTION = "This tool is used to manage nodes within a cluster. Supported functionality will remove node from cluster. "
private static final String HELP_ARG = "help" private static final String HELP_ARG = "help"
private static final String VERBOSE_ARG = "verbose" private static final String VERBOSE_ARG = "verbose"
private static final String PROXY_DN = "proxyDn"
private static final String BOOTSTRAP_CONF = "bootstrapConf" private static final String BOOTSTRAP_CONF = "bootstrapConf"
private static final String NIFI_INSTALL_DIR = "nifiInstallDir" private static final String NIFI_INSTALL_DIR = "nifiInstallDir"
private static final String CLUSTER_URLS = "clusterUrls" private static final String CLUSTER_URLS = "clusterUrls"
@ -75,6 +77,7 @@ public class NodeManagerTool extends AbstractAdminTool {
final Options options = new Options() final Options options = new Options()
options.addOption(Option.builder("h").longOpt(HELP_ARG).desc("Print help info").build()) options.addOption(Option.builder("h").longOpt(HELP_ARG).desc("Print help info").build())
options.addOption(Option.builder("v").longOpt(VERBOSE_ARG).desc("Set mode to verbose (default is false)").build()) options.addOption(Option.builder("v").longOpt(VERBOSE_ARG).desc("Set mode to verbose (default is false)").build())
options.addOption(Option.builder("p").longOpt(PROXY_DN).hasArg().desc("User or Proxy DN that has permission to send a notification. User must have view and modify privileges to 'access the controller' in NiFi").build())
options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build()) options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build())
options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build()) options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build())
options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operation to connect, disconnect or remove node from cluster").build()) options.addOption(Option.builder("o").longOpt(OPERATION).hasArg().desc("Operation to connect, disconnect or remove node from cluster").build())
@ -89,7 +92,7 @@ public class NodeManagerTool extends AbstractAdminTool {
return nodeDTOs.find{ it.address == nodeHost } return nodeDTOs.find{ it.address == nodeHost }
} }
NodeEntity updateNode(final String url, final Client client, final NodeDTO nodeDTO, final STATUS nodeStatus){ NodeEntity updateNode(final String url, final Client client, final NodeDTO nodeDTO, final STATUS nodeStatus,final String proxyDN){
final WebResource webResource = client.resource(url) final WebResource webResource = client.resource(url)
nodeDTO.status = nodeStatus nodeDTO.status = nodeStatus
String json = NiFiClientUtil.convertToJson(nodeDTO) String json = NiFiClientUtil.convertToJson(nodeDTO)
@ -98,36 +101,47 @@ public class NodeManagerTool extends AbstractAdminTool {
logger.info("Sending node info for update: " + json) logger.info("Sending node info for update: " + json)
} }
final ClientResponse response = webResource.type("application/json").put(ClientResponse.class,json) ClientResponse response
if(response.getStatus() != 200){ if(url.startsWith("https")) {
throw new RuntimeException("Failed with HTTP error code: " + response.getStatus()) response = webResource.type("application/json").header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.formatProxyDn(proxyDN)).put(ClientResponse.class, json)
}else{
response = webResource.type("application/json").put(ClientResponse.class, json)
}
if(response.status != 200){
throw new RuntimeException("Failed with HTTP error code " + response.status + " with reason: " +response.getEntity(String.class))
}else{ }else{
response.getEntity(NodeEntity.class) response.getEntity(NodeEntity.class)
} }
} }
void deleteNode(final String url, final Client client){ void deleteNode(final String url, final Client client, final String proxyDN){
final WebResource webResource = client.resource(url) final WebResource webResource = client.resource(url)
if(isVerbose){ if(isVerbose){
logger.info("Attempting to delete node" ) logger.info("Attempting to delete node" )
} }
ClientResponse response
final ClientResponse response = webResource.type("application/json").delete(ClientResponse.class) if(url.startsWith("https")) {
response = webResource.type("application/json").header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.formatProxyDn(proxyDN)).delete(ClientResponse.class)
}else{
response = webResource.type("application/json").delete(ClientResponse.class)
}
if(response.getStatus() != 200){ if(response.status != 200){
throw new RuntimeException("Failed with HTTP error code: " + response.getStatus()) throw new RuntimeException("Failed with HTTP error code " + response.status + " with reason: " +response.getEntity(String.class))
} }
} }
void disconnectNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls){ void disconnectNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls, final String proxyDN){
final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls) final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN)
NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties) NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties)
for(String activeUrl: activeUrls) { for(String activeUrl: activeUrls) {
try { try {
final String url = activeUrl + NODE_ENDPOINT + File.separator + currentNode.nodeId final String url = activeUrl + NODE_ENDPOINT + File.separator + currentNode.nodeId
updateNode(url, client, currentNode, STATUS.DISCONNECTING) updateNode(url, client, currentNode, STATUS.DISCONNECTING,proxyDN)
return return
} catch (Exception ex){ } catch (Exception ex){
logger.warn("Could not connect to node on "+activeUrl+". Exception: "+ex.toString()) logger.warn("Could not connect to node on "+activeUrl+". Exception: "+ex.toString())
@ -136,13 +150,13 @@ public class NodeManagerTool extends AbstractAdminTool {
throw new RuntimeException("Could not successfully complete request") throw new RuntimeException("Could not successfully complete request")
} }
void connectNode(final Client client, NiFiProperties niFiProperties,List<String> activeUrls){ void connectNode(final Client client, NiFiProperties niFiProperties,List<String> activeUrls, final String proxyDN){
final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls) final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN)
NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties) NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties)
for(String activeUrl: activeUrls) { for(String activeUrl: activeUrls) {
try { try {
final String url = activeUrl + NODE_ENDPOINT + File.separator + currentNode.nodeId final String url = activeUrl + NODE_ENDPOINT + File.separator + currentNode.nodeId
updateNode(url, client, currentNode, STATUS.CONNECTING) updateNode(url, client, currentNode, STATUS.CONNECTING,proxyDN)
return return
} catch (Exception ex){ } catch (Exception ex){
logger.warn("Could not connect to node on "+activeUrl+". Exception: "+ex.toString()) logger.warn("Could not connect to node on "+activeUrl+". Exception: "+ex.toString())
@ -151,9 +165,9 @@ public class NodeManagerTool extends AbstractAdminTool {
throw new RuntimeException("Could not successfully complete request") throw new RuntimeException("Could not successfully complete request")
} }
void removeNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls){ void removeNode(final Client client, NiFiProperties niFiProperties, List<String> activeUrls, final String proxyDN){
final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls) final ClusterEntity clusterEntity = NiFiClientUtil.getCluster(client, niFiProperties, activeUrls,proxyDN)
NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties) NodeDTO currentNode = getCurrentNode(clusterEntity,niFiProperties)
if(currentNode != null) { if(currentNode != null) {
@ -169,11 +183,11 @@ public class NodeManagerTool extends AbstractAdminTool {
} }
if(currentNode.status == "CONNECTED") { if(currentNode.status == "CONNECTED") {
currentNode = updateNode(url, client, currentNode, STATUS.DISCONNECTING).node currentNode = updateNode(url, client, currentNode, STATUS.DISCONNECTING,proxyDN).node
} }
if(currentNode.status == "DISCONNECTED") { if(currentNode.status == "DISCONNECTED") {
deleteNode(url, client) deleteNode(url, client,proxyDN)
} }
if(isVerbose){ if(isVerbose){
@ -210,6 +224,7 @@ public class NodeManagerTool extends AbstractAdminTool {
} }
final String bootstrapConfFileName = commandLine.getOptionValue(BOOTSTRAP_CONF) final String bootstrapConfFileName = commandLine.getOptionValue(BOOTSTRAP_CONF)
final String proxyDN = commandLine.getOptionValue(PROXY_DN)
final File bootstrapConf = new File(bootstrapConfFileName) final File bootstrapConf = new File(bootstrapConfFileName)
Properties bootstrapProperties = getBootstrapConf(Paths.get(bootstrapConfFileName)) Properties bootstrapProperties = getBootstrapConf(Paths.get(bootstrapConfFileName))
String nifiConfDir = getRelativeDirectory(bootstrapProperties.getProperty("conf.dir"), bootstrapConf.getCanonicalFile().getParentFile().getParentFile().getCanonicalPath()) String nifiConfDir = getRelativeDirectory(bootstrapProperties.getProperty("conf.dir"), bootstrapConf.getCanonicalFile().getParentFile().getParentFile().getCanonicalPath())
@ -218,6 +233,10 @@ public class NodeManagerTool extends AbstractAdminTool {
final String key = NiFiPropertiesLoader.extractKeyFromBootstrapFile(bootstrapConfFileName) final String key = NiFiPropertiesLoader.extractKeyFromBootstrapFile(bootstrapConfFileName)
final NiFiProperties niFiProperties = NiFiPropertiesLoader.withKey(key).load(nifiPropertiesFileName) final NiFiProperties niFiProperties = NiFiPropertiesLoader.withKey(key).load(nifiPropertiesFileName)
if(!StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) && StringUtils.isEmpty(proxyDN)) {
throw new UnsupportedOperationException("Proxy DN is required for sending a notification to this node or cluster")
}
final String nifiInstallDir = commandLine.getOptionValue(NIFI_INSTALL_DIR) final String nifiInstallDir = commandLine.getOptionValue(NIFI_INSTALL_DIR)
if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION) && NiFiClientUtil.isCluster(niFiProperties)){ if(supportedNiFiMinimumVersion(nifiConfDir,nifiLibDir,SUPPORTED_MINIMUM_VERSION) && NiFiClientUtil.isCluster(niFiProperties)){
@ -235,7 +254,7 @@ public class NodeManagerTool extends AbstractAdminTool {
final String urlList = commandLine.getOptionValue(CLUSTER_URLS) final String urlList = commandLine.getOptionValue(CLUSTER_URLS)
activeUrls = urlList.tokenize(',') activeUrls = urlList.tokenize(',')
}else{ }else{
activeUrls = NiFiClientUtil.getActiveClusterUrls(client,niFiProperties) activeUrls = NiFiClientUtil.getActiveClusterUrls(client,niFiProperties,proxyDN)
} }
if(isVerbose){ if(isVerbose){
@ -243,13 +262,13 @@ public class NodeManagerTool extends AbstractAdminTool {
} }
if(operation.toLowerCase().equals(REMOVE)){ if(operation.toLowerCase().equals(REMOVE)){
removeNode(client,niFiProperties,activeUrls) removeNode(client,niFiProperties,activeUrls,proxyDN)
} }
else if(operation.toLowerCase().equals(DISCONNECT)){ else if(operation.toLowerCase().equals(DISCONNECT)){
disconnectNode(client,niFiProperties,activeUrls) disconnectNode(client,niFiProperties,activeUrls,proxyDN)
} }
else if(operation.toLowerCase().equals(CONNECT)){ else if(operation.toLowerCase().equals(CONNECT)){
connectNode(client,niFiProperties,activeUrls) connectNode(client,niFiProperties,activeUrls,proxyDN)
} }
else{ else{
throw new ParseException("Invalid operation provided: " + operation) throw new ParseException("Invalid operation provided: " + operation)

View File

@ -34,6 +34,7 @@ import org.apache.nifi.toolkit.admin.client.NiFiClientFactory
import org.apache.nifi.util.NiFiProperties import org.apache.nifi.util.NiFiProperties
import org.apache.nifi.web.api.dto.BulletinDTO import org.apache.nifi.web.api.dto.BulletinDTO
import org.apache.nifi.web.api.entity.BulletinEntity import org.apache.nifi.web.api.entity.BulletinEntity
import org.apache.nifi.web.security.ProxiedEntitiesUtils
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -45,6 +46,7 @@ public class NotificationTool extends AbstractAdminTool {
private static final String HELP_ARG = "help" private static final String HELP_ARG = "help"
private static final String VERBOSE_ARG = "verbose" private static final String VERBOSE_ARG = "verbose"
private static final String BOOTSTRAP_CONF = "bootstrapConf" private static final String BOOTSTRAP_CONF = "bootstrapConf"
private static final String PROXY_DN = "proxyDn"
private static final String NIFI_INSTALL_DIR = "nifiInstallDir" private static final String NIFI_INSTALL_DIR = "nifiInstallDir"
private static final String NOTIFICATION_MESSAGE = "message" private static final String NOTIFICATION_MESSAGE = "message"
private static final String NOTIFICATION_LEVEL = "level" private static final String NOTIFICATION_LEVEL = "level"
@ -70,6 +72,7 @@ public class NotificationTool extends AbstractAdminTool {
final Options options = new Options() final Options options = new Options()
options.addOption(Option.builder("h").longOpt(HELP_ARG).desc("Print help info").build()) options.addOption(Option.builder("h").longOpt(HELP_ARG).desc("Print help info").build())
options.addOption(Option.builder("v").longOpt(VERBOSE_ARG).desc("Set mode to verbose (default is false)").build()) options.addOption(Option.builder("v").longOpt(VERBOSE_ARG).desc("Set mode to verbose (default is false)").build())
options.addOption(Option.builder("p").longOpt(PROXY_DN).hasArg().desc("User or Proxy DN that has permission to send a notification. User must have view and modify privileges to 'access the controller' in NiFi").build())
options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build()) options.addOption(Option.builder("b").longOpt(BOOTSTRAP_CONF).hasArg().desc("Existing Bootstrap Configuration file").build())
options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build()) options.addOption(Option.builder("d").longOpt(NIFI_INSTALL_DIR).hasArg().desc("NiFi Installation Directory").build())
options.addOption(Option.builder("m").longOpt(NOTIFICATION_MESSAGE).hasArg().desc("Notification message for nifi instance or cluster").build()) options.addOption(Option.builder("m").longOpt(NOTIFICATION_MESSAGE).hasArg().desc("Notification message for nifi instance or cluster").build())
@ -77,7 +80,7 @@ public class NotificationTool extends AbstractAdminTool {
options options
} }
void notifyCluster(final ClientFactory clientFactory, final String nifiPropertiesFile, final String bootstrapConfFile, final String nifiInstallDir, final String message, final String level){ void notifyCluster(final ClientFactory clientFactory, final String nifiPropertiesFile, final String bootstrapConfFile, final String nifiInstallDir, final String message, final String level, final String proxyDN){
if(isVerbose){ if(isVerbose){
logger.info("Loading nifi properties for host information") logger.info("Loading nifi properties for host information")
@ -99,7 +102,19 @@ public class NotificationTool extends AbstractAdminTool {
bulletinDTO.category = "NOTICE" bulletinDTO.category = "NOTICE"
bulletinDTO.level = StringUtils.isEmpty(level) ? "INFO" : level bulletinDTO.level = StringUtils.isEmpty(level) ? "INFO" : level
bulletinEntity.bulletin = bulletinDTO bulletinEntity.bulletin = bulletinDTO
final ClientResponse response = webResource.type("application/json").post(ClientResponse.class, bulletinEntity)
ClientResponse response
if(!org.apache.nifi.util.StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT))) {
if(StringUtils.isEmpty(proxyDN)){
throw new UnsupportedOperationException("Proxy DN is required for sending a notification to this node or cluster")
}
response = webResource.type("application/json").header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.formatProxyDn(proxyDN)).post(ClientResponse.class, bulletinEntity)
}
else {
response = webResource.type("application/json").post(ClientResponse.class, bulletinEntity)
}
Integer status = response.getStatus() Integer status = response.getStatus()
@ -107,7 +122,7 @@ public class NotificationTool extends AbstractAdminTool {
if(status == 404){ if(status == 404){
throw new RuntimeException("The notification feature is not supported by each node in the cluster") throw new RuntimeException("The notification feature is not supported by each node in the cluster")
}else{ }else{
throw new RuntimeException("Failed with HTTP error code: " + status) throw new RuntimeException("Failed with HTTP error code " + status + " with reason: " +response.getEntity(String.class))
} }
} }
@ -130,6 +145,7 @@ public class NotificationTool extends AbstractAdminTool {
final String bootstrapConfFileName = commandLine.getOptionValue(BOOTSTRAP_CONF) final String bootstrapConfFileName = commandLine.getOptionValue(BOOTSTRAP_CONF)
final File bootstrapConf = new File(bootstrapConfFileName) final File bootstrapConf = new File(bootstrapConfFileName)
final Properties bootstrapProperties = getBootstrapConf(Paths.get(bootstrapConfFileName)) final Properties bootstrapProperties = getBootstrapConf(Paths.get(bootstrapConfFileName))
final String proxyDN = commandLine.getOptionValue(PROXY_DN)
final String parentPathName = bootstrapConf.getCanonicalFile().getParentFile().getParentFile().getCanonicalPath() final String parentPathName = bootstrapConf.getCanonicalFile().getParentFile().getParentFile().getCanonicalPath()
final String nifiConfDir = getRelativeDirectory(bootstrapProperties.getProperty("conf.dir"),parentPathName) final String nifiConfDir = getRelativeDirectory(bootstrapProperties.getProperty("conf.dir"),parentPathName)
final String nifiLibDir = getRelativeDirectory(bootstrapProperties.getProperty("lib.dir"),parentPathName) final String nifiLibDir = getRelativeDirectory(bootstrapProperties.getProperty("lib.dir"),parentPathName)
@ -143,7 +159,7 @@ public class NotificationTool extends AbstractAdminTool {
logger.info("Attempting to connect with nifi using properties:", nifiPropertiesFileName) logger.info("Attempting to connect with nifi using properties:", nifiPropertiesFileName)
} }
notifyCluster(clientFactory, nifiPropertiesFileName, bootstrapConfFileName,nifiInstallDir,notificationMessage,notificationLevel) notifyCluster(clientFactory, nifiPropertiesFileName, bootstrapConfFileName,nifiInstallDir,notificationMessage,notificationLevel,proxyDN)
if(isVerbose) { if(isVerbose) {
logger.info("Message sent successfully to NiFi.") logger.info("Message sent successfully to NiFi.")
@ -169,7 +185,7 @@ public class NotificationTool extends AbstractAdminTool {
try{ try{
tool.parse(clientFactory,args) tool.parse(clientFactory,args)
} catch (ParseException | UnsupportedOperationException e) { } catch (ParseException | UnsupportedOperationException | RuntimeException e) {
tool.printUsage(e.message); tool.printUsage(e.message);
System.exit(1) System.exit(1)
} }

View File

@ -64,11 +64,17 @@ class Version {
String[] o2V = o2.versionNumber String[] o2V = o2.versionNumber
for(int i = 0; i < o1V.length; i++) { for(int i = 0; i < o1V.length; i++) {
Integer val1 = Integer.parseInt(o1V[i])
Integer val2 = Integer.parseInt(o2V[i]) if(o2V.length == i ){
if (val1.compareTo(val2) != 0) { return 1
return val1.compareTo(val2) }else {
Integer val1 = Integer.parseInt(o1V[i])
Integer val2 = Integer.parseInt(o2V[i])
if (val1.compareTo(val2) != 0) {
return val1.compareTo(val2)
}
} }
} }
return 0 return 0
} }

View File

@ -63,7 +63,7 @@ class NiFiClientUtilSpec extends Specification{
def ClusterEntity clusterEntity = Mock ClusterEntity def ClusterEntity clusterEntity = Mock ClusterEntity
when: when:
def entity = NiFiClientUtil.getCluster(client, niFiProperties, []) def entity = NiFiClientUtil.getCluster(client, niFiProperties, [], null)
then: then:
@ -77,6 +77,35 @@ class NiFiClientUtilSpec extends Specification{
} }
def "get secured cluster info successfully"(){
given:
def Client client = Mock Client
def NiFiProperties niFiProperties = Mock NiFiProperties
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
when:
def entity = NiFiClientUtil.getCluster(client, niFiProperties, [], "ydavis@nifi")
then:
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8081"
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder
1 * builder.header(_,_) >> builder
1 * builder.get(_) >> response
1 * response.getStatus() >> 200
1 * response.getEntity(ClusterEntity.class) >> clusterEntity
entity == clusterEntity
}
def "get cluster info fails"(){ def "get cluster info fails"(){
given: given:
@ -89,7 +118,7 @@ class NiFiClientUtilSpec extends Specification{
when: when:
NiFiClientUtil.getCluster(client, niFiProperties, []) NiFiClientUtil.getCluster(client, niFiProperties, [],null)
then: then:
@ -98,8 +127,7 @@ class NiFiClientUtilSpec extends Specification{
1 * resource.type(_) >> builder 1 * resource.type(_) >> builder
1 * builder.get(_) >> response 1 * builder.get(_) >> response
1 * response.getStatus() >> 500 1 * response.getStatus() >> 500
1 * response.getStatusInfo() >> statusType 1 * response.getEntity(String.class) >> "Only a node connected to a cluster can process the request."
1 * statusType.getReasonPhrase() >> "Only a node connected to a cluster can process the request."
def e = thrown(RuntimeException) def e = thrown(RuntimeException)
e.message == "Unable to obtain cluster information" e.message == "Unable to obtain cluster information"

View File

@ -1,4 +1,3 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -32,6 +31,8 @@ import org.junit.contrib.java.lang.system.ExpectedSystemExit
import org.junit.contrib.java.lang.system.SystemOutRule import org.junit.contrib.java.lang.system.SystemOutRule
import spock.lang.Specification import spock.lang.Specification
import javax.ws.rs.core.Response
class NodeManagerToolSpec extends Specification{ class NodeManagerToolSpec extends Specification{
@Rule @Rule
@ -174,7 +175,7 @@ class NodeManagerToolSpec extends Specification{
def config = new NodeManagerTool() def config = new NodeManagerTool()
when: when:
config.deleteNode(url,client) config.deleteNode(url,client,null)
then: then:
@ -185,6 +186,30 @@ class NodeManagerToolSpec extends Specification{
} }
def "delete secured node successfully"(){
given:
def String url = "https://locahost:8080/nifi-api/controller"
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def config = new NodeManagerTool()
when:
config.deleteNode(url,client,null)
then:
1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder
1 * builder.header(_,_) >> builder
1 * builder.delete(_) >> response
1 * response.getStatus() >> 200
}
def "delete node failed"(){ def "delete node failed"(){
given: given:
@ -193,18 +218,20 @@ class NodeManagerToolSpec extends Specification{
def WebResource resource = Mock WebResource def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse def ClientResponse response = Mock ClientResponse
def Response.StatusType statusType = Mock Response.StatusType
def config = new NodeManagerTool() def config = new NodeManagerTool()
when: when:
config.deleteNode(url,client) config.deleteNode(url,client,null)
then: then:
1 * client.resource(_ as String) >> resource 1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder 1 * resource.type(_) >> builder
1 * builder.delete(_) >> response 1 * builder.delete(_) >> response
2 * response.getStatus() >> 403 2 * response.getStatus() >> 403
1 * response.getEntity(String.class) >> "Unauthorized User"
def e = thrown(RuntimeException) def e = thrown(RuntimeException)
e.message == "Failed with HTTP error code: 403" e.message == "Failed with HTTP error code 403 with reason: Unauthorized User"
} }
@ -221,7 +248,7 @@ class NodeManagerToolSpec extends Specification{
def config = new NodeManagerTool() def config = new NodeManagerTool()
when: when:
def entity = config.updateNode(url,client,nodeDTO,NodeManagerTool.STATUS.DISCONNECTING) def entity = config.updateNode(url,client,nodeDTO,NodeManagerTool.STATUS.DISCONNECTING,null)
then: then:
1 * client.resource(_ as String) >> resource 1 * client.resource(_ as String) >> resource
@ -233,6 +260,32 @@ class NodeManagerToolSpec extends Specification{
} }
def "update secured node successfully"(){
given:
def String url = "https://locahost:8080/nifi-api/controller"
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def NodeDTO nodeDTO = new NodeDTO()
def NodeEntity nodeEntity = Mock NodeEntity
def config = new NodeManagerTool()
when:
def entity = config.updateNode(url,client,nodeDTO,NodeManagerTool.STATUS.DISCONNECTING,null)
then:
1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder
1 * builder.header(_,_) >> builder
1 * builder.put(_,_) >> response
1 * response.getStatus() >> 200
1 * response.getEntity(NodeEntity.class) >> nodeEntity
entity == nodeEntity
}
def "update node fails"(){ def "update node fails"(){
given: given:
@ -241,19 +294,21 @@ class NodeManagerToolSpec extends Specification{
def WebResource resource = Mock WebResource def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse def ClientResponse response = Mock ClientResponse
def Response.StatusType statusType = Mock Response.StatusType
def NodeDTO nodeDTO = new NodeDTO() def NodeDTO nodeDTO = new NodeDTO()
def config = new NodeManagerTool() def config = new NodeManagerTool()
when: when:
config.updateNode(url,client,nodeDTO,NodeManagerTool.STATUS.DISCONNECTING) config.updateNode(url,client,nodeDTO,NodeManagerTool.STATUS.DISCONNECTING,null)
then: then:
1 * client.resource(_ as String) >> resource 1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder 1 * resource.type(_) >> builder
1 * builder.put(_,_) >> response 1 * builder.put(_,_) >> response
2 * response.getStatus() >> 403 2 * response.getStatus() >> 403
1 * response.getEntity(String.class) >> "Unauthorized User"
def e = thrown(RuntimeException) def e = thrown(RuntimeException)
e.message == "Failed with HTTP error code: 403" e.message == "Failed with HTTP error code 403 with reason: Unauthorized User"
} }
@ -290,10 +345,48 @@ class NodeManagerToolSpec extends Specification{
nodeDTO.address >> "localhost" nodeDTO.address >> "localhost"
expect: expect:
config.disconnectNode(client, niFiProperties,["http://localhost:8080"]) config.disconnectNode(client, niFiProperties,["http://localhost:8080"],null)
} }
def "disconnect secured node successfully"(){
setup:
def NiFiProperties niFiProperties = Mock NiFiProperties
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
def ClusterDTO clusterDTO = Mock ClusterDTO
def NodeDTO nodeDTO = new NodeDTO()
nodeDTO.address = "localhost"
nodeDTO.nodeId = "1"
nodeDTO.status = "CONNECTED"
def List<NodeDTO> nodeDTOs = [nodeDTO]
def NodeEntity nodeEntity = new NodeEntity()
nodeEntity.node = nodeDTO
def config = new NodeManagerTool()
niFiProperties.getProperty(_) >> "localhost"
client.resource(_ as String) >> resource
resource.type(_) >> builder
builder.get(ClientResponse.class) >> response
builder.header(_,_) >> builder
builder.put(_,_) >> response
response.getStatus() >> 200
response.getEntity(ClusterEntity.class) >> clusterEntity
response.getEntity(NodeEntity.class) >> nodeEntity
clusterEntity.getCluster() >> clusterDTO
clusterDTO.getNodes() >> nodeDTOs
nodeDTO.address >> "localhost"
expect:
config.disconnectNode(client, niFiProperties,["https://localhost:8080"],null)
}
def "connect node successfully"(){ def "connect node successfully"(){
setup: setup:
@ -327,7 +420,7 @@ class NodeManagerToolSpec extends Specification{
nodeDTO.address >> "localhost" nodeDTO.address >> "localhost"
expect: expect:
config.connectNode(client, niFiProperties,["http://localhost:8080"]) config.connectNode(client, niFiProperties,["http://localhost:8080"],null)
} }
@ -365,7 +458,7 @@ class NodeManagerToolSpec extends Specification{
nodeDTO.address >> "localhost" nodeDTO.address >> "localhost"
expect: expect:
config.removeNode(client, niFiProperties,["http://localhost:8080"]) config.removeNode(client, niFiProperties,["http://localhost:8080"],null)
} }
@ -410,5 +503,172 @@ class NodeManagerToolSpec extends Specification{
} }
def "parse args and fail connecting secured node"(){
setup:
def NiFiProperties niFiProperties = Mock NiFiProperties
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
def ClusterDTO clusterDTO = Mock ClusterDTO
def NodeDTO nodeDTO = new NodeDTO()
nodeDTO.address = "localhost"
nodeDTO.nodeId = "1"
nodeDTO.status = "DISCONNECTED"
def List<NodeDTO> nodeDTOs = [nodeDTO]
def NodeEntity nodeEntity = new NodeEntity()
nodeEntity.node = nodeDTO
def config = new NodeManagerTool()
niFiProperties.getProperty(_) >> "localhost"
clientFactory.getClient(_,_) >> client
client.resource(_ as String) >> resource
resource.type(_) >> builder
builder.get(ClientResponse.class) >> response
builder.header(_,_) >> builder
builder.put(_,_) >> response
response.getStatus() >> 200
response.getEntity(ClusterEntity.class) >> clusterEntity
response.getEntity(NodeEntity.class) >> nodeEntity
clusterEntity.getCluster() >> clusterDTO
clusterDTO.getNodes() >> nodeDTOs
nodeDTO.address >> "localhost"
when:
config.parse(clientFactory,["-b","src/test/resources/notify/conf_secure/bootstrap.conf","-d","/bogus/nifi/dir","-o","connect","-u","https://localhost:8080,https://localhost1:8080"] as String[])
then:
def e = thrown(UnsupportedOperationException)
e.message == "Proxy DN is required for sending a notification to this node or cluster"
}
def "parse args and connect secured node"(){
setup:
def NiFiProperties niFiProperties = Mock NiFiProperties
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
def ClusterDTO clusterDTO = Mock ClusterDTO
def NodeDTO nodeDTO = new NodeDTO()
nodeDTO.address = "localhost"
nodeDTO.nodeId = "1"
nodeDTO.status = "DISCONNECTED"
def List<NodeDTO> nodeDTOs = [nodeDTO]
def NodeEntity nodeEntity = new NodeEntity()
nodeEntity.node = nodeDTO
def config = new NodeManagerTool()
niFiProperties.getProperty(_) >> "localhost"
clientFactory.getClient(_,_) >> client
client.resource(_ as String) >> resource
resource.type(_) >> builder
builder.get(ClientResponse.class) >> response
builder.header(_,_) >> builder
builder.put(_,_) >> response
response.getStatus() >> 200
response.getEntity(ClusterEntity.class) >> clusterEntity
response.getEntity(NodeEntity.class) >> nodeEntity
clusterEntity.getCluster() >> clusterDTO
clusterDTO.getNodes() >> nodeDTOs
nodeDTO.address >> "localhost"
expect:
config.parse(clientFactory,["-b","src/test/resources/notify/conf_secure/bootstrap.conf","-d","/bogus/nifi/dir","-o","connect","-u","https://localhost:8080,https://localhost1:8080","-p","ydavis@nifi"] as String[])
}
def "parse args and disconnect secured node"(){
setup:
def NiFiProperties niFiProperties = Mock NiFiProperties
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
def ClusterDTO clusterDTO = Mock ClusterDTO
def NodeDTO nodeDTO = new NodeDTO()
nodeDTO.address = "localhost"
nodeDTO.nodeId = "1"
nodeDTO.status = "CONNECTED"
def List<NodeDTO> nodeDTOs = [nodeDTO]
def NodeEntity nodeEntity = new NodeEntity()
nodeEntity.node = nodeDTO
def config = new NodeManagerTool()
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8081"
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
clientFactory.getClient(_,_) >> client
client.resource(_ as String) >> resource
resource.type(_) >> builder
builder.get(ClientResponse.class) >> response
builder.header(_,_) >> builder
builder.put(_,_) >> response
response.getStatus() >> 200
response.getEntity(ClusterEntity.class) >> clusterEntity
response.getEntity(NodeEntity.class) >> nodeEntity
clusterEntity.getCluster() >> clusterDTO
clusterDTO.getNodes() >> nodeDTOs
nodeDTO.address >> "localhost"
expect:
config.parse(clientFactory,["-b","src/test/resources/notify/conf_secure/bootstrap.conf","-d","/bogus/nifi/dir","-o","disconnect","-u","https://localhost:8080,https://localhost1:8080","-p","ydavis@nifi"] as String[])
}
def "parse args and delete secured node"(){
setup:
def NiFiProperties niFiProperties = Mock NiFiProperties
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def ClusterEntity clusterEntity = Mock ClusterEntity
def ClusterDTO clusterDTO = Mock ClusterDTO
def NodeDTO nodeDTO = new NodeDTO()
nodeDTO.address = "localhost"
nodeDTO.nodeId = "1"
nodeDTO.status = "CONNECTED"
def List<NodeDTO> nodeDTOs = [nodeDTO]
def NodeEntity nodeEntity = new NodeEntity()
nodeEntity.node = nodeDTO
def config = new NodeManagerTool()
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT) >> "8081"
niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_HOST) >> "localhost"
clientFactory.getClient(_,_) >> client
client.resource(_ as String) >> resource
resource.type(_) >> builder
builder.get(ClientResponse.class) >> response
builder.header(_,_) >> builder
builder.put(_,_) >> response
builder.delete(ClientResponse.class,_) >> response
response.getStatus() >> 200
response.getEntity(ClusterEntity.class) >> clusterEntity
response.getEntity(NodeEntity.class) >> nodeEntity
clusterEntity.getCluster() >> clusterDTO
clusterDTO.getNodes() >> nodeDTOs
nodeDTO.address >> "localhost"
expect:
config.parse(clientFactory,["-b","src/test/resources/notify/conf_secure/bootstrap.conf","-d","/bogus/nifi/dir","-o","remove","-u","https://localhost:8080,https://localhost1:8080","-p","ydavis@nifi"] as String[])
}
} }

View File

@ -21,12 +21,19 @@ import com.sun.jersey.api.client.Client
import com.sun.jersey.api.client.ClientResponse import com.sun.jersey.api.client.ClientResponse
import com.sun.jersey.api.client.WebResource import com.sun.jersey.api.client.WebResource
import org.apache.commons.cli.ParseException import org.apache.commons.cli.ParseException
import org.apache.commons.lang3.SystemUtils
import org.apache.nifi.toolkit.admin.client.ClientFactory import org.apache.nifi.toolkit.admin.client.ClientFactory
import org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone
import org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandaloneCommandLine
import org.junit.Rule import org.junit.Rule
import org.junit.contrib.java.lang.system.ExpectedSystemExit import org.junit.contrib.java.lang.system.ExpectedSystemExit
import org.junit.contrib.java.lang.system.SystemOutRule import org.junit.contrib.java.lang.system.SystemOutRule
import spock.lang.Specification import spock.lang.Specification
import javax.ws.rs.core.Response
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission
class NotificationToolSpec extends Specification{ class NotificationToolSpec extends Specification{
@Rule @Rule
@ -104,7 +111,7 @@ class NotificationToolSpec extends Specification{
def config = new NotificationTool() def config = new NotificationTool()
when: when:
config.notifyCluster(clientFactory,"src/test/resources/notify/conf/nifi.properties","src/test/resources/notify/conf/bootstrap.conf","/bogus/nifi/dir","shutting down in 30 seconds","WARN") config.notifyCluster(clientFactory,"src/test/resources/notify/conf/nifi.properties","src/test/resources/notify/conf/bootstrap.conf","/bogus/nifi/dir","shutting down in 30 seconds","WARN",null)
then: then:
@ -116,9 +123,21 @@ class NotificationToolSpec extends Specification{
} }
def "cluster message failed"(){ def "send secured cluster cluster message successfully"(){
given: given:
def File tmpDir = setupTmpDir()
def File testDir = new File("target/tmp/keys")
def toolkitCommandLine = ["-O", "-o",testDir.absolutePath,"-n","localhost","-C", "CN=user1","-S", "badKeyPass", "-K", "badKeyPass", "-P", "badTrustPass"]
TlsToolkitStandaloneCommandLine tlsToolkitStandaloneCommandLine = new TlsToolkitStandaloneCommandLine()
tlsToolkitStandaloneCommandLine.parse(toolkitCommandLine as String[])
new TlsToolkitStandalone().createNifiKeystoresAndTrustStores(tlsToolkitStandaloneCommandLine.createConfig())
def bootstrapConfFile = "src/test/resources/notify/conf/bootstrap.conf"
def nifiPropertiesFile = "src/test/resources/notify/conf/nifi-secured.properties"
def ClientFactory clientFactory = Mock ClientFactory def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client def Client client = Mock Client
def WebResource resource = Mock WebResource def WebResource resource = Mock WebResource
@ -128,7 +147,38 @@ class NotificationToolSpec extends Specification{
def config = new NotificationTool() def config = new NotificationTool()
when: when:
config.notifyCluster(clientFactory,"src/test/resources/notify/conf/nifi.properties","src/test/resources/notify/conf/bootstrap.conf","/bogus/nifi/dir","shutting down in 30 seconds","WARN") config.notifyCluster(clientFactory,nifiPropertiesFile,bootstrapConfFile,"/bogus/nifi/dir","shutting down in 30 seconds","WARN","ydavis@nifi")
then:
1 * clientFactory.getClient(_,_) >> client
1 * client.resource(_ as String) >> resource
1 * resource.type(_) >> builder
1 * builder.header(_,_) >> builder
1 * builder.post(_,_) >> response
1 * response.getStatus() >> 200
cleanup:
tmpDir.deleteDir()
}
def "cluster message failed"(){
given:
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def Response.StatusType statusType = Mock Response.StatusType
def config = new NotificationTool()
when:
config.notifyCluster(clientFactory,"src/test/resources/notify/conf/nifi.properties","src/test/resources/notify/conf/bootstrap.conf","/bogus/nifi/dir","shutting down in 30 seconds","WARN","ydavis@nifi")
then: then:
@ -137,8 +187,47 @@ class NotificationToolSpec extends Specification{
1 * resource.type(_) >> builder 1 * resource.type(_) >> builder
1 * builder.post(_,_) >> response 1 * builder.post(_,_) >> response
1 * response.getStatus() >> 403 1 * response.getStatus() >> 403
1 * response.getEntity(String.class) >> "Unauthorized User"
def e = thrown(RuntimeException) def e = thrown(RuntimeException)
e.message == "Failed with HTTP error code: 403" e.message == "Failed with HTTP error code 403 with reason: Unauthorized User"
}
def "send secured cluster cluster message fails due to missing proxy dn"(){
given:
def File tmpDir = setupTmpDir()
def File testDir = new File("target/tmp/keys")
def toolkitCommandLine = ["-O", "-o",testDir.absolutePath,"-n","localhost","-C", "CN=user1","-S", "badKeyPass", "-K", "badKeyPass", "-P", "badTrustPass"]
TlsToolkitStandaloneCommandLine tlsToolkitStandaloneCommandLine = new TlsToolkitStandaloneCommandLine()
tlsToolkitStandaloneCommandLine.parse(toolkitCommandLine as String[])
new TlsToolkitStandalone().createNifiKeystoresAndTrustStores(tlsToolkitStandaloneCommandLine.createConfig())
def bootstrapConfFile = "src/test/resources/notify/conf/bootstrap.conf"
def nifiPropertiesFile = "src/test/resources/notify/conf/nifi-secured.properties"
def ClientFactory clientFactory = Mock ClientFactory
def Client client = Mock Client
def WebResource resource = Mock WebResource
def WebResource.Builder builder = Mock WebResource.Builder
def ClientResponse response = Mock ClientResponse
def config = new NotificationTool()
when:
config.notifyCluster(clientFactory,nifiPropertiesFile,bootstrapConfFile,"/bogus/nifi/dir","shutting down in 30 seconds","WARN",null)
then:
1 * clientFactory.getClient(_,_) >> client
1 * client.resource(_ as String) >> resource
def e = thrown(UnsupportedOperationException)
e.message == "Proxy DN is required for sending a notification to this node or cluster"
cleanup:
tmpDir.deleteDir()
} }
@ -166,6 +255,25 @@ class NotificationToolSpec extends Specification{
} }
def setFilePermissions(File file, List<PosixFilePermission> permissions = []) {
if (SystemUtils.IS_OS_WINDOWS) {
file?.setReadable(permissions.contains(PosixFilePermission.OWNER_READ))
file?.setWritable(permissions.contains(PosixFilePermission.OWNER_WRITE))
file?.setExecutable(permissions.contains(PosixFilePermission.OWNER_EXECUTE))
} else {
Files.setPosixFilePermissions(file?.toPath(), permissions as Set)
}
}
def setupTmpDir(String tmpDirPath = "target/tmp/") {
File tmpDir = new File(tmpDirPath)
tmpDir.mkdirs()
setFilePermissions(tmpDir, [PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.OWNER_EXECUTE,
PosixFilePermission.GROUP_READ, PosixFilePermission.GROUP_WRITE, PosixFilePermission.GROUP_EXECUTE,
PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE, PosixFilePermission.OTHERS_EXECUTE])
tmpDir
}
} }

View File

@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Java command to use when running NiFi
java=java
# Username to use when running NiFi. This value will be ignored on Windows.
run.as=
# Configure where NiFi's lib and conf directories live
lib.dir=./lib
conf.dir=./conf_secure
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings
java.arg.2=-Xms1024m
java.arg.3=-Xmx1024m
# Enable Remote Debugging
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
java.arg.4=-Djava.net.preferIPv4Stack=true
# allowRestrictedHeaders is required for Cluster/Node communications to work properly
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
# The G1GC is still considered experimental but has proven to be very advantageous in providing great
# performance without significant "stop-the-world" delays.
java.arg.13=-XX:+UseG1GC
#Set headless mode by default
java.arg.14=-Djava.awt.headless=true
# Master key in hexadecimal format for encrypted sensitive configuration values
nifi.bootstrap.sensitive.key=
###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###
# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml
# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
#nifi.start.notification.services=email-notification
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
#nifi.stop.notification.services=email-notification
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
#nifi.dead.notification.services=email-notification

View File

@ -0,0 +1,107 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# core properties #
nifi.flow.configuration.file=./conf/flow.xml.gz
nifi.flow.configuration.archive.dir=./conf/archive/
nifi.task.configuration.file=./conf/reporting-tasks.xml
nifi.service.configuration.file=./conf/controller-services.xml
nifi.database.directory=./database_repository
nifi.flowfile.repository.directory=./flowfile_repository
nifi.flowfile.repository.partitions=4096
nifi.flowfile.repository.checkpoint.millis=120000
nifi.content.repository.directory.default=./content_repository
nifi.provenance.repository.capacity=25000
nifi.templates.directory=./conf/templates
nifi.version=1.2.0-SNAPSHOT
nifi.ui.banner.text=DEFAULT BANNER
nifi.ui.autorefresh.interval.seconds=30
nifi.flowcontroller.autoStartProcessors=true
nifi.flowcontroller.schedulestrategy=delay
nifi.flowcontroller.minimum.nanoseconds=1000000
nifi.flowcontroller.graceful.shutdown.seconds=10
nifi.nar.library.directory=./lib
nifi.nar.working.directory=./work/nar/
nifi.flowservice.writedelay.seconds=2
nifi.sensitive.props.key=REPLACE_ME
nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
nifi.sensitive.props.provider=BC
nifi.h2.repository.maxmemoryrows=100000
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
nifi.h2.max.connections=20
nifi.h2.login.timeout=500
#For testing purposes. Default value should actually be empty!
nifi.remote.input.socket.port=5000
nifi.remote.input.secure=true
# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=5050
nifi.web.jetty.working.directory=./work/jetty
# security properties #
nifi.security.keystore=target/tmp/keys/localhost/keystore.jks
nifi.security.keystoreType=JKS
nifi.security.keystorePasswd=badKeyPass
nifi.security.keyPasswd=badKeyPass
nifi.security.truststore=target/tmp/keys/localhost/truststore.jks
nifi.security.truststoreType=JKS
nifi.security.truststorePasswd=badTrustPass
nifi.security.needClientAuth=true
nifi.security.user.authorizer=
# cluster common properties (cluster manager and nodes must have same values) #
nifi.cluster.protocol.heartbeat.tick.seconds=10
nifi.cluster.protocol.is.secure=true
nifi.cluster.protocol.socket.timeout.ms=30000
nifi.cluster.protocol.connection.handshake.timeout.seconds=45
# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
nifi.cluster.protocol.use.multicast=false
nifi.cluster.protocol.multicast.address=
nifi.cluster.protocol.multicast.port=
nifi.cluster.protocol.multicast.service.broadcast.delay.ms=500
nifi.cluster.protocol.multicast.service.locator.attempts=3
nifi.cluster.protocol.multicast.service.locator.attempts.delay.seconds=1
#For testing purposes. Default value should actually be empty!
nifi.cluster.remote.input.socket.port=5000
nifi.cluster.remote.input.secure=true
# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=
nifi.cluster.node.protocol.port=
nifi.cluster.node.protocol.threads=2
# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx #
nifi.cluster.node.unicast.manager.address=
nifi.cluster.node.unicast.manager.protocol.port=
nifi.cluster.node.unicast.manager.authority.provider.port=
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=true
nifi.cluster.manager.address=localhost
nifi.cluster.manager.protocol.port=3030
nifi.cluster.manager.authority.provider.port=4040
nifi.cluster.manager.authority.provider.threads=10
nifi.cluster.manager.node.firewall.file=
nifi.cluster.manager.node.event.history.size=10
nifi.cluster.manager.node.api.connection.timeout.ms=30000
nifi.cluster.manager.node.api.read.timeout.ms=30000
nifi.cluster.manager.node.api.request.threads=10
nifi.cluster.manager.flow.retrieval.delay.seconds=5
nifi.cluster.manager.protocol.threads=10
nifi.cluster.manager.safemode.seconds=0