mirror of https://github.com/apache/nifi.git
NIFI-1950 Updating FileAuthorizer to convert access controls from input and output ports during legacy conversion. This closes #702.
This commit is contained in:
parent
b082858595
commit
3e9867d5da
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.authorization;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.annotation.AuthorizerContext;
|
||||
import org.apache.nifi.authorization.exception.AuthorizationAccessException;
|
||||
|
@ -26,14 +25,14 @@ import org.apache.nifi.authorization.file.generated.Groups;
|
|||
import org.apache.nifi.authorization.file.generated.Policies;
|
||||
import org.apache.nifi.authorization.file.generated.Policy;
|
||||
import org.apache.nifi.authorization.file.generated.Users;
|
||||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||
import org.apache.nifi.authorization.resource.ResourceType;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.apache.nifi.web.api.dto.PortDTO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import javax.xml.XMLConstants;
|
||||
|
@ -42,20 +41,12 @@ import javax.xml.bind.JAXBElement;
|
|||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Marshaller;
|
||||
import javax.xml.bind.Unmarshaller;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import javax.xml.validation.Schema;
|
||||
import javax.xml.validation.SchemaFactory;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
@ -68,7 +59,6 @@ import java.util.UUID;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/**
|
||||
* Provides authorizes requests to resources using policies persisted in a file.
|
||||
|
@ -83,8 +73,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
private static final String USERS_XSD = "/users.xsd";
|
||||
private static final String JAXB_USERS_PATH = "org.apache.nifi.user.generated";
|
||||
|
||||
private static final String FLOW_XSD = "/FlowConfiguration.xsd";
|
||||
|
||||
private static final JAXBContext JAXB_AUTHORIZATIONS_CONTEXT = initializeJaxbContext(JAXB_AUTHORIZATIONS_PATH);
|
||||
private static final JAXBContext JAXB_USERS_CONTEXT = initializeJaxbContext(JAXB_USERS_PATH);
|
||||
|
||||
|
@ -107,7 +95,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
static final String PROP_LEGACY_AUTHORIZED_USERS_FILE = "Legacy Authorized Users File";
|
||||
static final Pattern NODE_IDENTITY_PATTERN = Pattern.compile("Node Identity \\S+");
|
||||
|
||||
private Schema flowSchema;
|
||||
private Schema usersSchema;
|
||||
private Schema authorizationsSchema;
|
||||
private SchemaFactory schemaFactory;
|
||||
|
@ -118,6 +105,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
private String initialAdminIdentity;
|
||||
private String legacyAuthorizedUsersFile;
|
||||
private Set<String> nodeIdentities;
|
||||
private List<PortDTO> ports = new ArrayList<>();
|
||||
|
||||
private final AtomicReference<AuthorizationsHolder> authorizationsHolder = new AtomicReference<>();
|
||||
|
||||
|
@ -127,7 +115,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
|
||||
authorizationsSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(AUTHORIZATIONS_XSD));
|
||||
usersSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(USERS_XSD));
|
||||
flowSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(FLOW_XSD));
|
||||
} catch (Exception e) {
|
||||
throw new AuthorizerCreationException(e);
|
||||
}
|
||||
|
@ -200,7 +187,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
|
||||
logger.info(String.format("Authorizations file loaded at %s", new Date().toString()));
|
||||
|
||||
} catch (IOException | AuthorizerCreationException | JAXBException | IllegalStateException e) {
|
||||
} catch (IOException | AuthorizerCreationException | JAXBException | IllegalStateException | SAXException e) {
|
||||
throw new AuthorizerCreationException(e);
|
||||
}
|
||||
}
|
||||
|
@ -212,7 +199,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
* @throws IOException Unable to sync file with restore
|
||||
* @throws IllegalStateException Unable to sync file with restore
|
||||
*/
|
||||
private synchronized void load() throws JAXBException, IOException, IllegalStateException {
|
||||
private synchronized void load() throws JAXBException, IOException, IllegalStateException, SAXException {
|
||||
// attempt to unmarshal
|
||||
final Unmarshaller unmarshaller = JAXB_AUTHORIZATIONS_CONTEXT.createUnmarshaller();
|
||||
unmarshaller.setSchema(authorizationsSchema);
|
||||
|
@ -237,8 +224,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
|
||||
// if we are starting fresh then we might need to populate an initial admin or convert legacy users
|
||||
if (emptyAuthorizations) {
|
||||
// try to extract the root group id from the flow configuration file specified in nifi.properties
|
||||
rootGroupId = getRootGroupId();
|
||||
parseFlow();
|
||||
|
||||
if (hasInitialAdminIdentity && hasLegacyAuthorizedUsers) {
|
||||
throw new AuthorizerCreationException("Cannot provide an Initial Admin Identity and a Legacy Authorized Users File");
|
||||
|
@ -260,68 +246,17 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
}
|
||||
|
||||
/**
|
||||
* Extracts the root group id from the flow configuration file provided in nifi.properties.
|
||||
* Try to parse the flow configuration file to extract the root group id and port information.
|
||||
*
|
||||
* @return the root group id, or null if the files doesn't exist, was empty, or could not be parsed
|
||||
* @throws SAXException if an error occurs creating the schema
|
||||
*/
|
||||
private String getRootGroupId() {
|
||||
final File flowFile = properties.getFlowConfigurationFile();
|
||||
if (flowFile == null) {
|
||||
logger.debug("Flow Configuration file was null");
|
||||
return null;
|
||||
}
|
||||
private void parseFlow() throws SAXException {
|
||||
final FlowParser flowParser = new FlowParser();
|
||||
final FlowInfo flowInfo = flowParser.parse(properties.getFlowConfigurationFile());
|
||||
|
||||
// if the flow doesn't exist or is 0 bytes, then return null
|
||||
final Path flowPath = flowFile.toPath();
|
||||
try {
|
||||
if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
|
||||
logger.debug("Flow Configuration does not exist or was empty");
|
||||
return null;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.debug("An error occurred determining the size of the Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
// otherwise create the appropriate input streams to read the file
|
||||
try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
|
||||
final InputStream gzipIn = new GZIPInputStream(in)) {
|
||||
|
||||
final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
|
||||
if (flowBytes == null || flowBytes.length == 0) {
|
||||
logger.debug("Could not extract root group id because Flow Configuration File was empty");
|
||||
return null;
|
||||
}
|
||||
|
||||
// create validating document builder
|
||||
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
|
||||
docFactory.setNamespaceAware(true);
|
||||
docFactory.setSchema(flowSchema);
|
||||
|
||||
// parse the flow
|
||||
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
|
||||
final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
|
||||
|
||||
// extract the root group id
|
||||
final Element rootElement = document.getDocumentElement();
|
||||
|
||||
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
|
||||
if (rootGroupElement == null) {
|
||||
logger.debug("rootGroup element not found in Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
final Element rootGroupIdElement = (Element) rootGroupElement.getElementsByTagName("id").item(0);
|
||||
if (rootGroupIdElement == null) {
|
||||
logger.debug("id element not found under rootGroup in Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
return rootGroupIdElement.getTextContent();
|
||||
|
||||
} catch (final SAXException | ParserConfigurationException | IOException ex) {
|
||||
logger.error("Unable to find root group id in {} due to {}", new Object[] { flowPath.toAbsolutePath(), ex });
|
||||
return null;
|
||||
if (flowInfo != null) {
|
||||
rootGroupId = flowInfo.getRootGroupId();
|
||||
ports = flowInfo.getPorts() == null ? new ArrayList<>() : flowInfo.getPorts();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,6 +287,10 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
// grant the user read/write access to the /policies resource
|
||||
addAccessPolicy(authorizations, ResourceType.Policy.getValue(), adminUser.getIdentifier(), READ_CODE);
|
||||
addAccessPolicy(authorizations, ResourceType.Policy.getValue(), adminUser.getIdentifier(), WRITE_CODE);
|
||||
|
||||
// grant the user read/write access to the /controller resource
|
||||
addAccessPolicy(authorizations, ResourceType.Controller.getValue(), adminUser.getIdentifier(), READ_CODE);
|
||||
addAccessPolicy(authorizations, ResourceType.Controller.getValue(), adminUser.getIdentifier(), WRITE_CODE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -428,8 +367,8 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
|
||||
for (org.apache.nifi.user.generated.User legacyUser : users.getUser()) {
|
||||
// create the identifier of the new user based on the DN
|
||||
String legacyUserDn = legacyUser.getDn();
|
||||
String userIdentifier = UUID.nameUUIDFromBytes(legacyUserDn.getBytes(StandardCharsets.UTF_8)).toString();
|
||||
final String legacyUserDn = legacyUser.getDn();
|
||||
final String userIdentifier = UUID.nameUUIDFromBytes(legacyUserDn.getBytes(StandardCharsets.UTF_8)).toString();
|
||||
|
||||
// create the new User and add it to the list of users
|
||||
org.apache.nifi.authorization.file.generated.User user = new org.apache.nifi.authorization.file.generated.User();
|
||||
|
@ -459,29 +398,128 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
|
|||
roleAccessPolicy.getResource(),
|
||||
roleAccessPolicy.getAction());
|
||||
|
||||
// determine if the user already exists in the policy
|
||||
boolean userExists = false;
|
||||
for (Policy.User policyUser : policy.getUser()) {
|
||||
if (policyUser.getIdentifier().equals(userIdentifier)) {
|
||||
userExists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// add the user to the policy if doesn't already exist
|
||||
if (!userExists) {
|
||||
Policy.User policyUser = new Policy.User();
|
||||
policyUser.setIdentifier(userIdentifier);
|
||||
policy.getUser().add(policyUser);
|
||||
}
|
||||
// add the user to the policy if it doesn't exist
|
||||
addUserToPolicy(userIdentifier, policy);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// convert any access controls on ports to the appropriate policies
|
||||
for (PortDTO portDTO : ports) {
|
||||
final boolean isInputPort = portDTO.getType() != null && portDTO.getType().equals("inputPort");
|
||||
final Resource resource = ResourceFactory.getDataTransferResource(isInputPort, portDTO.getId(), portDTO.getName());
|
||||
|
||||
if (portDTO.getUserAccessControl() != null) {
|
||||
for (String userAccessControl : portDTO.getUserAccessControl()) {
|
||||
// find a user where the identity is the userAccessControl
|
||||
org.apache.nifi.authorization.file.generated.User foundUser = null;
|
||||
for (org.apache.nifi.authorization.file.generated.User jaxbUser : authorizations.getUsers().getUser()) {
|
||||
if (jaxbUser.getIdentity().equals(userAccessControl)) {
|
||||
foundUser = jaxbUser;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// couldn't find the user matching the access control so log a warning and skip
|
||||
if (foundUser == null) {
|
||||
logger.warn("Found port with user access control for {} but no user exists with this identity, skipping...",
|
||||
new Object[] {userAccessControl});
|
||||
continue;
|
||||
}
|
||||
|
||||
// we found the user so create the appropriate policy and add the user to it
|
||||
Policy policy = getOrCreatePolicy(
|
||||
allPolicies,
|
||||
seedIdentity,
|
||||
resource.getIdentifier(),
|
||||
WRITE_CODE);
|
||||
|
||||
addUserToPolicy(foundUser.getIdentifier(), policy);
|
||||
}
|
||||
}
|
||||
|
||||
if (portDTO.getGroupAccessControl() != null) {
|
||||
for (String groupAccessControl : portDTO.getGroupAccessControl()) {
|
||||
// find a group where the name is the groupAccessControl
|
||||
org.apache.nifi.authorization.file.generated.Group foundGroup = null;
|
||||
for (org.apache.nifi.authorization.file.generated.Group jaxbGroup : authorizations.getGroups().getGroup()) {
|
||||
if (jaxbGroup.getName().equals(groupAccessControl)) {
|
||||
foundGroup = jaxbGroup;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// couldn't find the group matching the access control so log a warning and skip
|
||||
if (foundGroup == null) {
|
||||
logger.warn("Found port with group access control for {} but no group exists with this name, skipping...",
|
||||
new Object[] {groupAccessControl});
|
||||
continue;
|
||||
}
|
||||
|
||||
// we found the group so create the appropriate policy and add all the users to it
|
||||
Policy policy = getOrCreatePolicy(
|
||||
allPolicies,
|
||||
seedIdentity,
|
||||
resource.getIdentifier(),
|
||||
WRITE_CODE);
|
||||
|
||||
addGroupToPolicy(foundGroup.getIdentifier(), policy);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
authorizations.getPolicies().getPolicy().addAll(allPolicies);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given user identifier to the policy if it doesn't already exist.
|
||||
*
|
||||
* @param userIdentifier a user identifier
|
||||
* @param policy a policy to add the user to
|
||||
*/
|
||||
private void addUserToPolicy(final String userIdentifier, final Policy policy) {
|
||||
// determine if the user already exists in the policy
|
||||
boolean userExists = false;
|
||||
for (Policy.User policyUser : policy.getUser()) {
|
||||
if (policyUser.getIdentifier().equals(userIdentifier)) {
|
||||
userExists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// add the user to the policy if doesn't already exist
|
||||
if (!userExists) {
|
||||
Policy.User policyUser = new Policy.User();
|
||||
policyUser.setIdentifier(userIdentifier);
|
||||
policy.getUser().add(policyUser);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given group identifier to the policy if it doesn't already exist.
|
||||
*
|
||||
* @param groupIdentifier a group identifier
|
||||
* @param policy a policy to add the user to
|
||||
*/
|
||||
private void addGroupToPolicy(final String groupIdentifier, final Policy policy) {
|
||||
// determine if the group already exists in the policy
|
||||
boolean groupExists = false;
|
||||
for (Policy.Group policyGroup : policy.getGroup()) {
|
||||
if (policyGroup.getIdentifier().equals(groupIdentifier)) {
|
||||
groupExists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// add the group to the policy if doesn't already exist
|
||||
if (!groupExists) {
|
||||
Policy.Group policyGroup = new Policy.Group();
|
||||
policyGroup.setIdentifier(groupIdentifier);
|
||||
policy.getGroup().add(policyGroup);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the Group with the given name, or creates a new one and adds it to Authorizations.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.authorization;
|
||||
|
||||
import org.apache.nifi.web.api.dto.PortDTO;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class FlowInfo {
|
||||
|
||||
private final String rootGroupId;
|
||||
|
||||
private final List<PortDTO> ports;
|
||||
|
||||
public FlowInfo(final String rootGroupId, final List<PortDTO> ports) {
|
||||
this.rootGroupId = rootGroupId;
|
||||
this.ports = (ports == null ? Collections.unmodifiableList(Collections.EMPTY_LIST) :
|
||||
Collections.unmodifiableList(new ArrayList<>(ports)) );
|
||||
}
|
||||
|
||||
public String getRootGroupId() {
|
||||
return rootGroupId;
|
||||
}
|
||||
|
||||
public List<PortDTO> getPorts() {
|
||||
return ports;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.authorization;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
|
||||
import org.apache.nifi.web.api.dto.PortDTO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import javax.xml.XMLConstants;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import javax.xml.validation.Schema;
|
||||
import javax.xml.validation.SchemaFactory;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/**
|
||||
* Parses a flow and returns the root group id and root group ports.
|
||||
*/
|
||||
public class FlowParser {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FlowParser.class);
|
||||
|
||||
private static final String FLOW_XSD = "/FlowConfiguration.xsd";
|
||||
|
||||
private Schema flowSchema;
|
||||
private SchemaFactory schemaFactory;
|
||||
|
||||
public FlowParser() throws SAXException {
|
||||
schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
|
||||
flowSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(FLOW_XSD));
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the root group id from the flow configuration file provided in nifi.properties, and extracts
|
||||
* the root group input ports and output ports, and their access controls.
|
||||
*
|
||||
*/
|
||||
public FlowInfo parse(final File flowConfigurationFile) {
|
||||
if (flowConfigurationFile == null) {
|
||||
logger.debug("Flow Configuration file was null");
|
||||
return null;
|
||||
}
|
||||
|
||||
// if the flow doesn't exist or is 0 bytes, then return null
|
||||
final Path flowPath = flowConfigurationFile.toPath();
|
||||
try {
|
||||
if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
|
||||
logger.warn("Flow Configuration does not exist or was empty");
|
||||
return null;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("An error occurred determining the size of the Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
// otherwise create the appropriate input streams to read the file
|
||||
try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
|
||||
final InputStream gzipIn = new GZIPInputStream(in)) {
|
||||
|
||||
final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
|
||||
if (flowBytes == null || flowBytes.length == 0) {
|
||||
logger.warn("Could not extract root group id because Flow Configuration File was empty");
|
||||
return null;
|
||||
}
|
||||
|
||||
// create validating document builder
|
||||
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
|
||||
docFactory.setNamespaceAware(true);
|
||||
docFactory.setSchema(flowSchema);
|
||||
|
||||
// parse the flow
|
||||
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
|
||||
final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
|
||||
|
||||
// extract the root group id
|
||||
final Element rootElement = document.getDocumentElement();
|
||||
|
||||
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
|
||||
if (rootGroupElement == null) {
|
||||
logger.warn("rootGroup element not found in Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
final Element rootGroupIdElement = (Element) rootGroupElement.getElementsByTagName("id").item(0);
|
||||
if (rootGroupIdElement == null) {
|
||||
logger.warn("id element not found under rootGroup in Flow Configuration file");
|
||||
return null;
|
||||
}
|
||||
|
||||
final String rootGroupId = rootGroupIdElement.getTextContent();
|
||||
|
||||
final List<PortDTO> ports = new ArrayList<>();
|
||||
ports.addAll(getPorts(rootGroupElement, "inputPort"));
|
||||
ports.addAll(getPorts(rootGroupElement, "outputPort"));
|
||||
|
||||
return new FlowInfo(rootGroupId, ports);
|
||||
|
||||
} catch (final SAXException | ParserConfigurationException | IOException ex) {
|
||||
logger.error("Unable to parse flow {} due to {}", new Object[] { flowPath.toAbsolutePath(), ex });
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the ports that are direct children of the given element.
|
||||
*
|
||||
* @param element the element containing ports
|
||||
* @param type the type of port to find (inputPort or outputPort)
|
||||
* @return a list of PortDTOs representing the found ports
|
||||
*/
|
||||
private List<PortDTO> getPorts(final Element element, final String type) {
|
||||
final List<PortDTO> ports = new ArrayList<>();
|
||||
|
||||
// add input ports
|
||||
final List<Element> portNodeList = getChildrenByTagName(element, type);
|
||||
for (final Element portElement : portNodeList) {
|
||||
final PortDTO portDTO = FlowFromDOMFactory.getPort(portElement);
|
||||
portDTO.setType(type);
|
||||
ports.add(portDTO);
|
||||
}
|
||||
|
||||
return ports;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds child elements with the given tagName.
|
||||
*
|
||||
* @param element the parent element
|
||||
* @param tagName the child element name to find
|
||||
* @return a list of matching child elements
|
||||
*/
|
||||
private static List<Element> getChildrenByTagName(final Element element, final String tagName) {
|
||||
final List<Element> matches = new ArrayList<>();
|
||||
final NodeList nodeList = element.getChildNodes();
|
||||
for (int i = 0; i < nodeList.getLength(); i++) {
|
||||
final Node node = nodeList.item(i);
|
||||
if (!(node instanceof Element)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final Element child = (Element) nodeList.item(i);
|
||||
if (child.getNodeName().equals(tagName)) {
|
||||
matches.add(child);
|
||||
}
|
||||
}
|
||||
|
||||
return matches;
|
||||
}
|
||||
|
||||
}
|
|
@ -111,6 +111,7 @@ public class FileAuthorizerTest {
|
|||
private File primary;
|
||||
private File restore;
|
||||
private File flow;
|
||||
private File flowNoPorts;
|
||||
|
||||
private AuthorizerConfigurationContext configurationContext;
|
||||
|
||||
|
@ -127,6 +128,9 @@ public class FileAuthorizerTest {
|
|||
flow = new File("src/test/resources/flow.xml.gz");
|
||||
FileUtils.ensureDirectoryExistAndCanAccess(flow.getParentFile());
|
||||
|
||||
flowNoPorts = new File("src/test/resources/flow-no-ports.xml.gz");
|
||||
FileUtils.ensureDirectoryExistAndCanAccess(flowNoPorts.getParentFile());
|
||||
|
||||
properties = mock(NiFiProperties.class);
|
||||
when(properties.getRestoreDirectory()).thenReturn(restore.getParentFile());
|
||||
when(properties.getFlowConfigurationFile()).thenReturn(flow);
|
||||
|
@ -165,6 +169,29 @@ public class FileAuthorizerTest {
|
|||
assertNotNull(usersAndAccessPolicies.getAccessPolicy(ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID, RequestAction.WRITE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnConfiguredWhenLegacyUsersFileProvidedAndFlowHasNoPorts() throws Exception {
|
||||
properties = mock(NiFiProperties.class);
|
||||
when(properties.getRestoreDirectory()).thenReturn(restore.getParentFile());
|
||||
when(properties.getFlowConfigurationFile()).thenReturn(flowNoPorts);
|
||||
|
||||
when(configurationContext.getProperty(Mockito.eq(FileAuthorizer.PROP_LEGACY_AUTHORIZED_USERS_FILE)))
|
||||
.thenReturn(new StandardPropertyValue("src/test/resources/authorized-users.xml", null));
|
||||
|
||||
writeAuthorizationsFile(primary, EMPTY_AUTHORIZATIONS_CONCISE);
|
||||
authorizer.onConfigured(configurationContext);
|
||||
|
||||
boolean foundDataTransferPolicy = false;
|
||||
for (AccessPolicy policy : authorizer.getAccessPolicies()) {
|
||||
if (policy.getResource().contains(ResourceType.DataTransfer.name())) {
|
||||
foundDataTransferPolicy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertFalse(foundDataTransferPolicy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnConfiguredWhenLegacyUsersFileProvided() throws Exception {
|
||||
when(configurationContext.getProperty(Mockito.eq(FileAuthorizer.PROP_LEGACY_AUTHORIZED_USERS_FILE)))
|
||||
|
@ -198,7 +225,8 @@ public class FileAuthorizerTest {
|
|||
// verify one group got created
|
||||
final Set<Group> groups = authorizer.getGroups();
|
||||
assertEquals(1, groups.size());
|
||||
assertEquals("group1", groups.iterator().next().getName());
|
||||
final Group group1 = groups.iterator().next();
|
||||
assertEquals("group1", group1.getName());
|
||||
|
||||
// verify more than one policy got created
|
||||
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
|
||||
|
@ -238,7 +266,7 @@ public class FileAuthorizerTest {
|
|||
|
||||
// verify user4's policies
|
||||
final Map<String,Set<RequestAction>> user4Policies = getResourceActions(policies, user4);
|
||||
assertEquals(5, user4Policies.size());
|
||||
assertEquals(6, user4Policies.size());
|
||||
|
||||
assertTrue(user4Policies.containsKey(ResourceType.Flow.getValue()));
|
||||
assertEquals(1, user4Policies.get(ResourceType.Flow.getValue()).size());
|
||||
|
@ -266,11 +294,25 @@ public class FileAuthorizerTest {
|
|||
|
||||
// verify user6's policies
|
||||
final Map<String,Set<RequestAction>> user6Policies = getResourceActions(policies, user6);
|
||||
assertEquals(2, user6Policies.size());
|
||||
assertEquals(3, user6Policies.size());
|
||||
|
||||
assertTrue(user6Policies.containsKey(ResourceType.SiteToSite.getValue()));
|
||||
assertEquals(2, user6Policies.get(ResourceType.SiteToSite.getValue()).size());
|
||||
assertTrue(user6Policies.get(ResourceType.SiteToSite.getValue()).contains(RequestAction.WRITE));
|
||||
|
||||
final Resource inputPortResource = ResourceFactory.getDataTransferResource(true, "2f7d1606-b090-4be7-a592-a5b70fb55531", "TCP Input");
|
||||
final AccessPolicy inputPortPolicy = authorizer.getUsersAndAccessPolicies().getAccessPolicy(inputPortResource.getIdentifier(), RequestAction.WRITE);
|
||||
assertNotNull(inputPortPolicy);
|
||||
assertEquals(1, inputPortPolicy.getUsers().size());
|
||||
assertTrue(inputPortPolicy.getUsers().contains(user6.getIdentifier()));
|
||||
assertEquals(1, inputPortPolicy.getGroups().size());
|
||||
assertTrue(inputPortPolicy.getGroups().contains(group1.getIdentifier()));
|
||||
|
||||
final Resource outputPortResource = ResourceFactory.getDataTransferResource(false, "2f7d1606-b090-4be7-a592-a5b70fb55532", "TCP Output");
|
||||
final AccessPolicy outputPortPolicy = authorizer.getUsersAndAccessPolicies().getAccessPolicy(outputPortResource.getIdentifier(), RequestAction.WRITE);
|
||||
assertNotNull(outputPortPolicy);
|
||||
assertEquals(1, outputPortPolicy.getUsers().size());
|
||||
assertTrue(outputPortPolicy.getUsers().contains(user4.getIdentifier()));
|
||||
}
|
||||
|
||||
private Map<String,Set<RequestAction>> getResourceActions(final Set<AccessPolicy> policies, final User user) {
|
||||
|
@ -342,7 +384,7 @@ public class FileAuthorizerTest {
|
|||
assertEquals(adminIdentity, adminUser.getIdentity());
|
||||
|
||||
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
|
||||
assertEquals(7, policies.size());
|
||||
assertEquals(9, policies.size());
|
||||
|
||||
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
|
||||
|
||||
|
@ -379,7 +421,7 @@ public class FileAuthorizerTest {
|
|||
assertEquals(adminIdentity, adminUser.getIdentity());
|
||||
|
||||
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
|
||||
assertEquals(5, policies.size());
|
||||
assertEquals(7, policies.size());
|
||||
|
||||
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
|
||||
|
||||
|
@ -416,7 +458,7 @@ public class FileAuthorizerTest {
|
|||
assertEquals(adminIdentity, adminUser.getIdentity());
|
||||
|
||||
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
|
||||
assertEquals(5, policies.size());
|
||||
assertEquals(7, policies.size());
|
||||
|
||||
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
|
||||
|
||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -37,6 +37,8 @@
|
|||
|
||||
- Node Identity [unique key] - The identity of a NiFi cluster node. When clustered, a property for each node
|
||||
should be defined, so that every node knows about every other node. If not clustered these properties can be ignored.
|
||||
The name of each property must be unique, for example for a three node cluster:
|
||||
"Node Identity A", "Node Identity B", "Node Identity C" or "Node Identity 1", "Node Identity 2", "Node Identity 3"
|
||||
-->
|
||||
<authorizer>
|
||||
<identifier>file-provider</identifier>
|
||||
|
@ -44,7 +46,8 @@
|
|||
<property name="Authorizations File">./conf/authorizations.xml</property>
|
||||
<property name="Initial Admin Identity"></property>
|
||||
<property name="Legacy Authorized Users File"></property>
|
||||
<!--
|
||||
|
||||
<!-- Provide the identity (typically a DN) of each node when clustered, see above description of Node Identity.
|
||||
<property name="Node Identity 1"></property>
|
||||
<property name="Node Identity 2"></property>
|
||||
-->
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.nifi.authorization.AuthorizationResult.Result;
|
|||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.authorization.RequestAction;
|
||||
import org.apache.nifi.authorization.Resource;
|
||||
import org.apache.nifi.authorization.UserContextKeys;
|
||||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||
import org.apache.nifi.authorization.resource.ResourceType;
|
||||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
|
@ -77,6 +78,8 @@ import javax.ws.rs.core.UriInfo;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isEmpty;
|
||||
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
|
||||
|
@ -125,6 +128,14 @@ public class DataTransferResource extends ApplicationResource {
|
|||
throw new IllegalArgumentException("The resource must be an Input or Output Port.");
|
||||
}
|
||||
|
||||
final Map<String,String> userContext;
|
||||
if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
|
||||
userContext = new HashMap<>();
|
||||
userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
|
||||
} else {
|
||||
userContext = null;
|
||||
}
|
||||
|
||||
// TODO - use DataTransferAuthorizable after looking up underlying component for consistentency
|
||||
final Resource resource = ResourceFactory.getComponentResource(resourceType, identifier, identifier);
|
||||
final AuthorizationRequest request = new AuthorizationRequest.Builder()
|
||||
|
@ -133,6 +144,7 @@ public class DataTransferResource extends ApplicationResource {
|
|||
.anonymous(user.isAnonymous())
|
||||
.accessAttempt(true)
|
||||
.action(RequestAction.WRITE)
|
||||
.userContext(userContext)
|
||||
.build();
|
||||
|
||||
final AuthorizationResult result = authorizer.authorize(request);
|
||||
|
|
Loading…
Reference in New Issue