NIFI-2061 Added methods to AbstractPolicyBasedAuthorizer to generate and inherit a fingerprint

- Updated StandardFlowSynchronizer to compare authorization fingerprints
- This closes #566
This commit is contained in:
Bryan Bende 2016-06-20 16:58:42 -04:00 committed by Matt Gilman
parent baed85fa3d
commit 4f2643f668
20 changed files with 914 additions and 67 deletions

View File

@ -17,7 +17,26 @@
package org.apache.nifi.authorization;
import org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
/**
@ -25,6 +44,23 @@ import java.util.Set;
*/
public abstract class AbstractPolicyBasedAuthorizer implements Authorizer {
static final DocumentBuilderFactory DOCUMENT_BUILDER_FACTORY = DocumentBuilderFactory.newInstance();
static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newInstance();
static final String USER_ELEMENT = "user";
static final String USER_GROUP_ELEMENT = "userGroup";
static final String GROUP_ELEMENT = "group";
static final String POLICY_ELEMENT = "policy";
static final String POLICY_USER_ELEMENT = "policyUser";
static final String POLICY_GROUP_ELEMENT = "policyGroup";
static final String IDENTIFIER_ATTR = "identifier";
static final String IDENTITY_ATTR = "identity";
static final String NAME_ATTR = "name";
static final String RESOURCE_ATTR = "resource";
static final String ACTIONS_ATTR = "actions";
public static final String EMPTY_FINGERPRINT = "EMPTY";
@Override
public final AuthorizationResult authorize(AuthorizationRequest request) throws AuthorizationAccessException {
final UsersAndAccessPolicies usersAndAccessPolicies = getUsersAndAccessPolicies();
@ -218,4 +254,261 @@ public abstract class AbstractPolicyBasedAuthorizer implements Authorizer {
*/
public abstract UsersAndAccessPolicies getUsersAndAccessPolicies() throws AuthorizationAccessException;
/**
* Parses the fingerprint and adds any users, groups, and policies to the current Authorizer.
*
* @param fingerprint the fingerprint that was obtained from calling getFingerprint() on another Authorizer.
*/
public final void inheritFingerprint(final String fingerprint) throws AuthorizationAccessException {
if (fingerprint == null || fingerprint.trim().isEmpty()) {
return;
}
final byte[] fingerprintBytes = fingerprint.getBytes(StandardCharsets.UTF_8);
try (final ByteArrayInputStream in = new ByteArrayInputStream(fingerprintBytes)) {
final DocumentBuilder docBuilder = DOCUMENT_BUILDER_FACTORY.newDocumentBuilder();
final Document document = docBuilder.parse(in);
final Element rootElement = document.getDocumentElement();
// parse all the users and add them to the current authorizer
NodeList userNodes = rootElement.getElementsByTagName(USER_ELEMENT);
for (int i=0; i < userNodes.getLength(); i++) {
Node userNode = userNodes.item(i);
User user = parseUser((Element) userNode);
addUser(user);
}
// parse all the groups and add them to the current authorizer
NodeList groupNodes = rootElement.getElementsByTagName(GROUP_ELEMENT);
for (int i=0; i < groupNodes.getLength(); i++) {
Node groupNode = groupNodes.item(i);
Group group = parseGroup((Element) groupNode);
addGroup(group);
}
// parse all the policies and add them to the current authorizer
NodeList policyNodes = rootElement.getElementsByTagName(POLICY_ELEMENT);
for (int i=0; i < policyNodes.getLength(); i++) {
Node policyNode = policyNodes.item(i);
AccessPolicy policy = parsePolicy((Element) policyNode);
addAccessPolicy(policy);
}
} catch (SAXException | ParserConfigurationException | IOException e) {
throw new AuthorizationAccessException("Unable to parse fingerprint", e);
}
}
private User parseUser(final Element element) {
final User.Builder builder = new User.Builder()
.identifier(element.getAttribute(IDENTIFIER_ATTR))
.identity(element.getAttribute(IDENTITY_ATTR));
NodeList userGroups = element.getElementsByTagName(USER_GROUP_ELEMENT);
for (int i=0; i < userGroups.getLength(); i++) {
Element userGroupNode = (Element) userGroups.item(i);
builder.addGroup(userGroupNode.getAttribute(IDENTIFIER_ATTR));
}
return builder.build();
}
private Group parseGroup(final Element element) {
return new Group.Builder()
.identifier(element.getAttribute(IDENTIFIER_ATTR))
.name(element.getAttribute(NAME_ATTR))
.build();
}
private AccessPolicy parsePolicy(final Element element) {
final AccessPolicy.Builder builder = new AccessPolicy.Builder()
.identifier(element.getAttribute(IDENTIFIER_ATTR))
.resource(element.getAttribute(RESOURCE_ATTR));
final String actions = element.getAttribute(ACTIONS_ATTR);
if (actions.contains(RequestAction.READ.name())) {
builder.addAction(RequestAction.READ);
}
if (actions.contains(RequestAction.WRITE.name())) {
builder.addAction(RequestAction.WRITE);
}
NodeList policyUsers = element.getElementsByTagName(POLICY_USER_ELEMENT);
for (int i=0; i < policyUsers.getLength(); i++) {
Element policyUserNode = (Element) policyUsers.item(i);
builder.addUser(policyUserNode.getAttribute(IDENTIFIER_ATTR));
}
NodeList policyGroups = element.getElementsByTagName(POLICY_GROUP_ELEMENT);
for (int i=0; i < policyGroups.getLength(); i++) {
Element policyGroupNode = (Element) policyGroups.item(i);
builder.addGroup(policyGroupNode.getAttribute(IDENTIFIER_ATTR));
}
return builder.build();
}
/**
* Returns a fingerprint representing the authorizations managed by this authorizer. The fingerprint will be
* used for comparison to determine if two policy-based authorizers represent a compatible set of users,
* groups, and policies.
*
* @return the fingerprint for this Authorizer
*/
public final String getFingerprint() throws AuthorizationAccessException {
final List<User> users = getSortedUsers();
final List<Group> groups = getSortedGroups();
final List<AccessPolicy> policies = getSortedAccessPolicies();
// when there are no users, groups, policies we want to always return a simple indicator so
// it can easily be determined when comparing fingerprints
if (users.isEmpty() && groups.isEmpty() && policies.isEmpty()) {
return EMPTY_FINGERPRINT;
}
XMLStreamWriter writer = null;
final StringWriter out = new StringWriter();
try {
writer = XML_OUTPUT_FACTORY.createXMLStreamWriter(out);
writer.writeStartDocument();
writer.writeStartElement("authorizations");
for (User user : users) {
writeUser(writer, user);
}
for (Group group : groups) {
writeGroup(writer, group);
}
for (AccessPolicy policy : policies) {
writePolicy(writer, policy);
}
writer.writeEndElement();
writer.writeEndDocument();
writer.flush();
} catch (XMLStreamException e) {
throw new AuthorizationAccessException("Unable to generate fingerprint", e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (XMLStreamException e) {
// nothing to do here
}
}
}
return out.toString();
}
private void writeUser(final XMLStreamWriter writer, final User user) throws XMLStreamException {
List<String> userGroups = new ArrayList<>(user.getGroups());
Collections.sort(userGroups);
writer.writeStartElement(USER_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, user.getIdentifier());
writer.writeAttribute(IDENTITY_ATTR, user.getIdentity());
for (String userGroup : userGroups) {
writer.writeStartElement(USER_GROUP_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, userGroup);
writer.writeEndElement();
}
writer.writeEndElement();
}
private void writeGroup(final XMLStreamWriter writer, final Group group) throws XMLStreamException {
writer.writeStartElement(GROUP_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, group.getIdentifier());
writer.writeAttribute(NAME_ATTR, group.getName());
writer.writeEndElement();
}
private void writePolicy(final XMLStreamWriter writer, final AccessPolicy policy) throws XMLStreamException {
// build the action string in a deterministic order
StringBuilder actionBuilder = new StringBuilder();
List<RequestAction> actions = getSortedActions(policy);
for (RequestAction action : actions) {
actionBuilder.append(action);
}
// sort the users for the policy
List<String> policyUsers = new ArrayList<>(policy.getUsers());
Collections.sort(policyUsers);
// sort the groups for this policy
List<String> policyGroups = new ArrayList<>(policy.getGroups());
Collections.sort(policyGroups);
writer.writeStartElement(POLICY_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, policy.getIdentifier());
writer.writeAttribute(RESOURCE_ATTR, policy.getResource());
writer.writeAttribute(ACTIONS_ATTR, actionBuilder.toString());
for (String policyUser : policyUsers) {
writer.writeStartElement(POLICY_USER_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, policyUser);
writer.writeEndElement();
}
for (String policyGroup : policyGroups) {
writer.writeStartElement(POLICY_GROUP_ELEMENT);
writer.writeAttribute(IDENTIFIER_ATTR, policyGroup);
writer.writeEndElement();
}
writer.writeEndElement();
}
private List<AccessPolicy> getSortedAccessPolicies() {
final List<AccessPolicy> policies = new ArrayList<>(getAccessPolicies());
Collections.sort(policies, new Comparator<AccessPolicy>() {
@Override
public int compare(AccessPolicy p1, AccessPolicy p2) {
return p1.getIdentifier().compareTo(p2.getIdentifier());
}
});
return policies;
}
private List<Group> getSortedGroups() {
final List<Group> groups = new ArrayList<>(getGroups());
Collections.sort(groups, new Comparator<Group>() {
@Override
public int compare(Group g1, Group g2) {
return g1.getIdentifier().compareTo(g2.getIdentifier());
}
});
return groups;
}
private List<User> getSortedUsers() {
final List<User> users = new ArrayList<>(getUsers());
Collections.sort(users, new Comparator<User>() {
@Override
public int compare(User u1, User u2) {
return u1.getIdentifier().compareTo(u2.getIdentifier());
}
});
return users;
}
private List<RequestAction> getSortedActions(final AccessPolicy policy) {
final List<RequestAction> actions = new ArrayList<>(policy.getActions());
Collections.sort(actions, new Comparator<RequestAction>() {
@Override
public int compare(RequestAction r1, RequestAction r2) {
return r1.name().compareTo(r2.name());
}
});
return actions;
}
}

View File

@ -16,10 +16,15 @@
*/
package org.apache.nifi.authorization;
import org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@ -171,4 +176,268 @@ public class TestAbstractPolicyBasedAuthorizer {
assertEquals(AuthorizationResult.resourceNotFound(), authorizer.authorize(request));
}
@Test
public void testGetFingerprint() {
// create the users, groups, and policies
Group group1 = new Group.Builder().identifier("group-id-1").name("group-1").build();
Group group2 = new Group.Builder().identifier("group-id-2").name("group-2").build();
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").addGroup(group1.getIdentifier()).build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").addGroup(group2.getIdentifier()).build();
AccessPolicy policy1 = new AccessPolicy.Builder()
.identifier("policy-id-1")
.resource("resource1")
.addAction(RequestAction.READ)
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
AccessPolicy policy2 = new AccessPolicy.Builder()
.identifier("policy-id-2")
.resource("resource2")
.addAction(RequestAction.READ)
.addAction(RequestAction.WRITE)
.addGroup(group1.getIdentifier())
.addGroup(group2.getIdentifier())
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
// create the first Authorizer
Set<Group> groups1 = new LinkedHashSet<>();
groups1.add(group1);
groups1.add(group2);
Set<User> users1 = new LinkedHashSet<>();
users1.add(user1);
users1.add(user2);
Set<AccessPolicy> policies1 = new LinkedHashSet<>();
policies1.add(policy1);
policies1.add(policy2);
AbstractPolicyBasedAuthorizer authorizer1 = Mockito.mock(AbstractPolicyBasedAuthorizer.class);
when(authorizer1.getGroups()).thenReturn(groups1);
when(authorizer1.getUsers()).thenReturn(users1);
when(authorizer1.getAccessPolicies()).thenReturn(policies1);
// create the second Authorizer
Set<Group> groups2 = new LinkedHashSet<>();
groups2.add(group2);
groups2.add(group1);
Set<User> users2 = new LinkedHashSet<>();
users2.add(user2);
users2.add(user1);
Set<AccessPolicy> policies2 = new LinkedHashSet<>();
policies2.add(policy2);
policies2.add(policy1);
AbstractPolicyBasedAuthorizer authorizer2 = Mockito.mock(AbstractPolicyBasedAuthorizer.class);
when(authorizer2.getGroups()).thenReturn(groups2);
when(authorizer2.getUsers()).thenReturn(users2);
when(authorizer2.getAccessPolicies()).thenReturn(policies2);
// compare the fingerprints
assertEquals(authorizer1.getFingerprint(), authorizer2.getFingerprint());
System.out.println(authorizer1.getFingerprint());
}
@Test
public void testInheritFingerprint() {
Group group1 = new Group.Builder().identifier("group-id-1").name("group-1").build();
Group group2 = new Group.Builder().identifier("group-id-2").name("group-2").build();
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").addGroup(group1.getIdentifier()).build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
AccessPolicy policy1 = new AccessPolicy.Builder()
.identifier("policy-id-1")
.resource("resource1")
.addAction(RequestAction.READ)
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
AccessPolicy policy2 = new AccessPolicy.Builder()
.identifier("policy-id-2")
.resource("resource2")
.addAction(RequestAction.READ)
.addAction(RequestAction.WRITE)
.addGroup(group1.getIdentifier())
.addGroup(group2.getIdentifier())
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
// create the first Authorizer
Set<Group> groups1 = new LinkedHashSet<>();
groups1.add(group1);
groups1.add(group2);
Set<User> users1 = new LinkedHashSet<>();
users1.add(user1);
users1.add(user2);
Set<AccessPolicy> policies1 = new LinkedHashSet<>();
policies1.add(policy1);
policies1.add(policy2);
AbstractPolicyBasedAuthorizer authorizer1 = Mockito.mock(AbstractPolicyBasedAuthorizer.class);
when(authorizer1.getGroups()).thenReturn(groups1);
when(authorizer1.getUsers()).thenReturn(users1);
when(authorizer1.getAccessPolicies()).thenReturn(policies1);
final String fingerprint1 = authorizer1.getFingerprint();
// make a second authorizer using the memory-backed implementation so we can inherit the fingerprint
// and then compute a new fingerprint to compare them
AbstractPolicyBasedAuthorizer authorizer2 = new MemoryPolicyBasedAuthorizer();
authorizer2.inheritFingerprint(fingerprint1);
// computer the fingerprint of the second authorizer and it should be the same as the first
final String fingerprint2 = authorizer2.getFingerprint();
assertEquals(fingerprint1, fingerprint2);
// all the sets should be equal now after inheriting
assertEquals(authorizer1.getUsers(), authorizer2.getUsers());
assertEquals(authorizer1.getGroups(), authorizer2.getGroups());
assertEquals(authorizer1.getAccessPolicies(), authorizer2.getAccessPolicies());
}
@Test
public void testEmptyAuthorizer() {
AbstractPolicyBasedAuthorizer authorizer = Mockito.mock(AbstractPolicyBasedAuthorizer.class);
when(authorizer.getGroups()).thenReturn(new HashSet<Group>());
when(authorizer.getUsers()).thenReturn(new HashSet<User>());
when(authorizer.getAccessPolicies()).thenReturn(new HashSet<AccessPolicy>());
final String fingerprint = authorizer.getFingerprint();
Assert.assertNotNull(fingerprint);
Assert.assertTrue(fingerprint.length() > 0);
}
/**
* An AbstractPolicyBasedAuthorizer that stores everything in memory.
*/
private static final class MemoryPolicyBasedAuthorizer extends AbstractPolicyBasedAuthorizer {
private Set<Group> groups = new HashSet<>();
private Set<User> users = new HashSet<>();
private Set<AccessPolicy> policies = new HashSet<>();
@Override
public Group addGroup(Group group) throws AuthorizationAccessException {
groups.add(group);
return group;
}
@Override
public Group getGroup(String identifier) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public Group updateGroup(Group group) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public Group deleteGroup(Group group) throws AuthorizationAccessException {
groups.remove(group);
return group;
}
@Override
public Set<Group> getGroups() throws AuthorizationAccessException {
return groups;
}
@Override
public User addUser(User user) throws AuthorizationAccessException {
users.add(user);
return user;
}
@Override
public User getUser(String identifier) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public User getUserByIdentity(String identity) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public User updateUser(User user) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public User deleteUser(User user) throws AuthorizationAccessException {
users.remove(user);
return user;
}
@Override
public Set<User> getUsers() throws AuthorizationAccessException {
return users;
}
@Override
public AccessPolicy addAccessPolicy(AccessPolicy accessPolicy) throws AuthorizationAccessException {
policies.add(accessPolicy);
return accessPolicy;
}
@Override
public AccessPolicy getAccessPolicy(String identifier) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public AccessPolicy updateAccessPolicy(AccessPolicy accessPolicy) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public AccessPolicy deleteAccessPolicy(AccessPolicy policy) throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public Set<AccessPolicy> getAccessPolicies() throws AuthorizationAccessException {
return policies;
}
@Override
public UsersAndAccessPolicies getUsersAndAccessPolicies() throws AuthorizationAccessException {
throw new UnsupportedOperationException();
}
@Override
public void initialize(AuthorizerInitializationContext initializationContext) throws AuthorizerCreationException {
}
@Override
public void onConfigured(AuthorizerConfigurationContext configurationContext) throws AuthorizerCreationException {
}
@Override
public void preDestruction() throws AuthorizerDestructionException {
}
}
}

View File

@ -33,26 +33,30 @@ public class StandardDataFlow implements Serializable, DataFlow {
private final byte[] flow;
private final byte[] snippetBytes;
private final byte[] authorizerFingerprint;
/**
* Constructs an instance.
*
* @param flow a valid flow as bytes, which cannot be null
* @param snippetBytes an XML representation of snippets. May be null.
* @param authorizerFingerprint the bytes of the Authorizer's fingerprint. May be null when using an external Authorizer.
*
* @throws NullPointerException if flow is null
*/
public StandardDataFlow(final byte[] flow, final byte[] snippetBytes) {
public StandardDataFlow(final byte[] flow, final byte[] snippetBytes, final byte[] authorizerFingerprint) {
if(flow == null){
throw new NullPointerException("Flow cannot be null");
}
this.flow = flow;
this.snippetBytes = snippetBytes;
this.authorizerFingerprint = authorizerFingerprint;
}
public StandardDataFlow(final DataFlow toCopy) {
this.flow = copy(toCopy.getFlow());
this.snippetBytes = copy(toCopy.getSnippets());
this.authorizerFingerprint = copy(toCopy.getAuthorizerFingerprint());
}
private static byte[] copy(final byte[] bytes) {
@ -69,4 +73,10 @@ public class StandardDataFlow implements Serializable, DataFlow {
public byte[] getSnippets() {
return snippetBytes;
}
@Override
public byte[] getAuthorizerFingerprint() {
return authorizerFingerprint;
}
}

View File

@ -22,6 +22,7 @@ public class AdaptedDataFlow {
private byte[] flow;
private byte[] snippets;
private byte[] authorizerFingerprint;
public byte[] getFlow() {
return flow;
@ -38,4 +39,13 @@ public class AdaptedDataFlow {
public void setSnippets(byte[] snippets) {
this.snippets = snippets;
}
public byte[] getAuthorizerFingerprint() {
return authorizerFingerprint;
}
public void setAuthorizerFingerprint(byte[] authorizerFingerprint) {
this.authorizerFingerprint = authorizerFingerprint;
}
}

View File

@ -32,6 +32,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, DataFlow> {
if (df != null) {
aDf.setFlow(df.getFlow());
aDf.setSnippets(df.getSnippets());
aDf.setAuthorizerFingerprint(df.getAuthorizerFingerprint());
}
return aDf;
@ -39,7 +40,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, DataFlow> {
@Override
public DataFlow unmarshal(final AdaptedDataFlow aDf) {
final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets());
final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets(), aDf.getAuthorizerFingerprint());
return dataFlow;
}

View File

@ -46,7 +46,7 @@ public class TestJaxbProtocolUtils {
final ConnectionResponseMessage msg = new ConnectionResponseMessage();
final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true);
final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0]);
final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
final List<ComponentRevision> componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1")));
msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 9990, 8080, false, "instance-1", nodeStatuses, componentRevisions));

View File

@ -80,7 +80,7 @@ public class TestNodeClusterCoordinator {
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50]);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);
}
@ -164,7 +164,7 @@ public class TestNodeClusterCoordinator {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50]);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);
@ -228,7 +228,7 @@ public class TestNodeClusterCoordinator {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50]);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);

View File

@ -28,4 +28,10 @@ public interface DataFlow {
*/
public byte[] getSnippets();
/**
* @return the raw byte array of the Authorizer's fingerprint,
* null when not using a sub-class of AbstractPolicyBasedAuthorizer
*/
public byte[] getAuthorizerFingerprint();
}

View File

@ -20,13 +20,13 @@ import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@ -271,7 +271,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final AtomicReference<CounterRepository> counterRepositoryRef;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
private final KeyService keyService;
private final Authorizer authorizer;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
private final ComponentStatusRepository componentStatusRepository;
@ -364,14 +364,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static FlowController createStandaloneInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final KeyService keyService,
final Authorizer authorizer,
final AuditService auditService,
final StringEncryptor encryptor,
final BulletinRepository bulletinRepo) {
return new FlowController(
flowFileEventRepo,
properties,
keyService,
authorizer,
auditService,
encryptor,
/* configuredForClustering */ false,
@ -383,7 +383,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static FlowController createClusteredInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final KeyService keyService,
final Authorizer authorizer,
final AuditService auditService,
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender,
@ -392,7 +392,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final FlowController flowController = new FlowController(
flowFileEventRepo,
properties,
keyService,
authorizer,
auditService,
encryptor,
/* configuredForClustering */ true,
@ -408,7 +408,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private FlowController(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final KeyService keyService,
final Authorizer authorizer,
final AuditService auditService,
final StringEncryptor encryptor,
final boolean configuredForClustering,
@ -470,7 +470,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startConnectablesAfterInitialization = new ArrayList<>();
startRemoteGroupPortsAfterInitialization = new ArrayList<>();
this.keyService = keyService;
this.authorizer = authorizer;
this.auditService = auditService;
final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
@ -1090,6 +1090,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return stateManagerProvider;
}
public Authorizer getAuthorizer() {
return authorizer;
}
/**
* Creates a Port to use as an Input Port for the root Process Group, which is used for Site-to-Site communications
*
@ -1104,7 +1108,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
keyService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**
@ -1121,7 +1125,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
keyService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@ -106,6 +108,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private final int gracefulShutdownSeconds;
private final boolean autoResumeState;
private final StringEncryptor encryptor;
private final Authorizer authorizer;
// Lock is used to protect the flow.xml file.
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -143,9 +146,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final FlowController controller,
final NiFiProperties properties,
final StringEncryptor encryptor,
final RevisionManager revisionManager) throws IOException {
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
return new StandardFlowService(controller, properties, null, encryptor, false, null, revisionManager);
return new StandardFlowService(controller, properties, null, encryptor, false, null, revisionManager, authorizer);
}
public static StandardFlowService createClusteredInstance(
@ -154,9 +158,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final NodeProtocolSenderListener senderListener,
final ClusterCoordinator coordinator,
final StringEncryptor encryptor,
final RevisionManager revisionManager) throws IOException {
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
return new StandardFlowService(controller, properties, senderListener, encryptor, true, coordinator, revisionManager);
return new StandardFlowService(controller, properties, senderListener, encryptor, true, coordinator, revisionManager, authorizer);
}
private StandardFlowService(
@ -166,7 +171,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final StringEncryptor encryptor,
final boolean configuredForClustering,
final ClusterCoordinator clusterCoordinator,
final RevisionManager revisionManager) throws IOException {
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
this.controller = controller;
this.encryptor = encryptor;
@ -181,6 +187,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
clusterCoordinator.setFlowService(this);
}
this.revisionManager = revisionManager;
this.authorizer = authorizer;
if (configuredForClustering) {
this.configuredForClustering = configuredForClustering;
@ -544,6 +551,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
private byte[] getAuthorizerFingerprint() {
final boolean isInternalAuthorizer = (authorizer instanceof AbstractPolicyBasedAuthorizer);
return isInternalAuthorizer ? ((AbstractPolicyBasedAuthorizer) authorizer).getFingerprint().getBytes(StandardCharsets.UTF_8) : null;
}
@Override
public StandardDataFlow createDataFlow() throws IOException {
// Load the flow from disk
@ -551,7 +563,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
dao.load(baos);
final byte[] bytes = baos.toByteArray();
final byte[] snippetBytes = controller.getSnippetManager().export();
final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes);
final byte[] authorizerFingerprint = getAuthorizerFingerprint();
final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint);
// Check if the flow from disk is empty. If not, use it.
if (!StandardFlowSynchronizer.isEmpty(fromDisk, encryptor)) {
@ -568,7 +581,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final byte[] flowBytes = baos.toByteArray();
baos.reset();
return new StandardDataFlow(flowBytes, snippetBytes);
return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint);
}
private NodeIdentifier getNodeId() {
@ -648,17 +661,20 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// resolve the given flow (null means load flow from disk)
final DataFlow actualProposedFlow;
final byte[] flowBytes;
final byte[] authorizerFingerprint;
if (proposedFlow == null) {
final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
copyCurrentFlow(flowOnDisk);
flowBytes = flowOnDisk.toByteArray();
authorizerFingerprint = getAuthorizerFingerprint();
logger.debug("Loaded Flow from bytes");
} else {
flowBytes = proposedFlow.getFlow();
authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
logger.debug("Loaded flow from proposed flow");
}
actualProposedFlow = new StandardDataFlow(flowBytes, null);
actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint);
if (firstControllerInitialization) {
// load the controller services

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.connectable.Connectable;
@ -94,10 +96,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -209,21 +213,42 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
logger.trace("Exporting snippets from controller");
final byte[] existingSnippets = controller.getSnippetManager().export();
final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets);
logger.trace("Getting Authorizer fingerprint from controller");
final byte[] existingAuthFingerprint;
final AbstractPolicyBasedAuthorizer policyBasedAuthorizer;
final Authorizer authorizer = controller.getAuthorizer();
if (authorizer instanceof AbstractPolicyBasedAuthorizer) {
policyBasedAuthorizer = (AbstractPolicyBasedAuthorizer) authorizer;
existingAuthFingerprint = policyBasedAuthorizer.getFingerprint().getBytes(StandardCharsets.UTF_8);
} else {
existingAuthFingerprint = null;
policyBasedAuthorizer = null;
}
final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets, existingAuthFingerprint);
// check that the proposed flow is inheritable by the controller
try {
if (!existingFlowEmpty) {
logger.trace("Checking flow inheritability");
final String problemInheriting = checkFlowInheritability(existingDataFlow, proposedFlow, controller);
if (problemInheriting != null) {
throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting);
final String problemInheritingFlow = checkFlowInheritability(existingDataFlow, proposedFlow, controller);
if (problemInheritingFlow != null) {
throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheritingFlow);
}
}
} catch (final FingerprintException fe) {
throw new FlowSerializationException("Failed to generate flow fingerprints", fe);
}
logger.trace("Checking authorizer inheritability");
final AuthorizerInheritability authInheritability = checkAuthorizerInheritability(existingDataFlow, proposedFlow);
if (!authInheritability.isInheritable() && authInheritability.getReason() != null) {
throw new UninheritableFlowException("Proposed Authorizer is not inheritable by the flow controller because of Authorizer differences: " + authInheritability.getReason());
}
// create document by parsing proposed flow bytes
logger.trace("Parsing proposed flow bytes as DOM document");
final Document configuration = parseFlowBytes(proposedFlow.getFlow());
@ -308,7 +333,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
logger.debug("Finished synching flows");
// if auths are inheritable and we have a policy based authorizer, then inherit
if (authInheritability.isInheritable() && policyBasedAuthorizer != null) {
logger.trace("Inheriting authorizations");
final String proposedAuthFingerprint = new String(proposedFlow.getAuthorizerFingerprint(), StandardCharsets.UTF_8);
policyBasedAuthorizer.inheritFingerprint(proposedAuthFingerprint);
}
logger.debug("Finished syncing flows");
} catch (final Exception ex) {
throw new FlowSynchronizationException(ex);
}
@ -1084,6 +1116,57 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return processGroup;
}
/**
* If both authorizers are external authorizers, or if the both are internal authorizers with equal fingerprints,
* then an uniheritable result with no reason is returned to indicate nothing to do.
*
* If both are internal authorizers and the current authorizer is empty, then an inheritable result is returned.
*
* All other cases return uninheritable with a reason which indicates to throw an exception.
*
* @param existingFlow the existing DataFlow
* @param proposedFlow the proposed DataFlow
* @return the AuthorizerInheritability result
*/
public AuthorizerInheritability checkAuthorizerInheritability(final DataFlow existingFlow, final DataFlow proposedFlow) {
final byte[] existing = existingFlow.getAuthorizerFingerprint();
final byte[] proposed = proposedFlow.getAuthorizerFingerprint();
// both are using external authorizers so nothing to inherit, but we don't want to throw an exception
if (existing == null && proposed == null) {
return AuthorizerInheritability.uninheritable(null);
}
// current is external, but proposed is internal
if (existing == null && proposed != null) {
return AuthorizerInheritability.uninheritable(
"Current Authorizer is an external Authorizer, but proposed Authorizer is an internal Authorizer");
}
// current is internal, but proposed is external
if (existing != null && proposed == null) {
return AuthorizerInheritability.uninheritable(
"Current Authorizer is an internal Authorizer, but proposed Authorizer is an external Authorizer");
}
// both are internal, but not the same
if (!Arrays.equals(existing, proposed)) {
final byte[] emptyAuthBytes = AbstractPolicyBasedAuthorizer.EMPTY_FINGERPRINT.getBytes(StandardCharsets.UTF_8);
// if current is empty then we can take all the proposed authorizations
// otherwise they are both internal authorizers and don't match so we can't proceed
if (Arrays.equals(emptyAuthBytes, existing)) {
return AuthorizerInheritability.inheritable();
} else {
return AuthorizerInheritability.uninheritable(
"Proposed Authorizations do not match current Authorizations");
}
}
// both are internal and equal
return AuthorizerInheritability.uninheritable(null);
}
/**
* Returns true if the given controller can inherit the proposed flow without orphaning flow files.
*
@ -1200,4 +1283,36 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private String formatFlowDiscrepancy(final String flowFingerprint, final int deltaIndex, final int deltaPad) {
return flowFingerprint.substring(Math.max(0, deltaIndex - deltaPad), Math.min(flowFingerprint.length(), deltaIndex + deltaPad));
}
/**
* Holder for the result of determining if a proposed Authorizer is inheritable.
*/
private static final class AuthorizerInheritability {
private final boolean inheritable;
private final String reason;
public AuthorizerInheritability(boolean inheritable, String reason) {
this.inheritable = inheritable;
this.reason = reason;
}
public boolean isInheritable() {
return inheritable;
}
public String getReason() {
return reason;
}
public static AuthorizerInheritability uninheritable(String reason) {
return new AuthorizerInheritability(false, reason);
}
public static AuthorizerInheritability inheritable() {
return new AuthorizerInheritability(true, null);
}
}
}

View File

@ -17,7 +17,7 @@
package org.apache.nifi.spring;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.controller.FlowController;
@ -39,7 +39,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
private ApplicationContext applicationContext;
private FlowController flowController;
private NiFiProperties properties;
private KeyService keyService;
private Authorizer authorizer;
private AuditService auditService;
private StringEncryptor encryptor;
private BulletinRepository bulletinRepository;
@ -55,7 +55,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
flowController = FlowController.createClusteredInstance(
flowFileEventRepository,
properties,
keyService,
authorizer,
auditService,
encryptor,
nodeProtocolSender,
@ -65,7 +65,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
properties,
keyService,
authorizer,
auditService,
encryptor,
bulletinRepository);
@ -95,8 +95,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
this.properties = properties;
}
public void setKeyService(final KeyService keyService) {
this.keyService = keyService;
public void setAuthorizer(final Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setEncryptor(final StringEncryptor encryptor) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.spring;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.controller.FlowController;
@ -39,6 +40,7 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
private FlowService flowService;
private NiFiProperties properties;
private StringEncryptor encryptor;
private Authorizer authorizer;
@Override
public Object getObject() throws Exception {
@ -55,13 +57,15 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
nodeProtocolSenderListener,
clusterCoordinator,
encryptor,
revisionManager);
revisionManager,
authorizer);
} else {
flowService = StandardFlowService.createStandaloneInstance(
flowController,
properties,
encryptor,
revisionManager);
revisionManager,
authorizer);
}
}
@ -90,4 +94,9 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
public void setEncryptor(StringEncryptor encryptor) {
this.encryptor = encryptor;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
}

View File

@ -36,7 +36,7 @@
<!-- flow controller -->
<bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="keyService" ref="keyService" />
<property name="authorizer" ref="authorizer" />
<property name="auditService" ref="auditService" />
<property name="encryptor" ref="stringEncryptor" />
<property name="bulletinRepository" ref="bulletinRepository" />
@ -46,6 +46,7 @@
<bean id="flowService" class="org.apache.nifi.spring.StandardFlowServiceFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="encryptor" ref="stringEncryptor" />
<property name="authorizer" ref="authorizer" />
</bean>
<!-- bulletin repository -->

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller
import groovy.xml.XmlUtil
import org.apache.nifi.authorization.Authorizer
import org.apache.nifi.cluster.protocol.DataFlow
import org.apache.nifi.connectable.*
import org.apache.nifi.controller.label.Label
@ -47,6 +48,7 @@ class StandardFlowSynchronizerSpec extends Specification {
def snippetManager = Mock SnippetManager
def bulletinRepository = Mock BulletinRepository
def flowFileQueue = Mock FlowFileQueue
def authorizer = Mock Authorizer
def flowFile = new File(StandardFlowSynchronizerSpec.getResource(filename).toURI())
def flowControllerXml = new XmlSlurper().parse(flowFile)
def Map<String, Position> originalPositionablePositionsById = flowControllerXml.rootGroup.'**'
@ -73,6 +75,7 @@ class StandardFlowSynchronizerSpec extends Specification {
_ * controller.getGroup(_) >> { String id -> positionableMocksById.get(id) }
_ * controller.snippetManager >> snippetManager
_ * controller.bulletinRepository >> bulletinRepository
_ * controller.authorizer >> authorizer
_ * controller./set.*/(*_)
_ * controller.createProcessGroup(_) >> { String pgId ->
def processGroup = Mock(ProcessGroup)
@ -191,6 +194,8 @@ class StandardFlowSynchronizerSpec extends Specification {
_ * proposedFlow.snippets >> {
[] as byte[]
}
_ * proposedFlow.authorizerFingerprint >> null
_ * flowFileQueue./set.*/(*_)
_ * _.hashCode() >> 1
0 * _ // no other mock calls allowed

View File

@ -25,7 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.serialization.FlowSerializationException;
@ -59,7 +59,7 @@ public class StandardFlowServiceTest {
private FlowController flowController;
private NiFiProperties properties;
private FlowFileEventRepository mockFlowFileEventRepository;
private KeyService mockKeyService;
private Authorizer authorizer;
private AuditService mockAuditService;
private StringEncryptor mockEncryptor;
private RevisionManager revisionManager;
@ -73,17 +73,17 @@ public class StandardFlowServiceTest {
public void setup() throws Exception {
properties = NiFiProperties.getInstance();
mockFlowFileEventRepository = mock(FlowFileEventRepository.class);
mockKeyService = mock(KeyService.class);
authorizer = mock(Authorizer.class);
mockAuditService = mock(AuditService.class);
revisionManager = mock(RevisionManager.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, mockKeyService, mockAuditService, mockEncryptor, new VolatileBulletinRepository());
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor, new VolatileBulletinRepository());
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer);
}
@Test
public void testLoadWithFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(flowBytes, null));
flowService.load(new StandardDataFlow(flowBytes, null, null));
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -98,16 +98,16 @@ public class StandardFlowServiceTest {
@Test(expected = FlowSerializationException.class)
public void testLoadWithCorruptFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
flowService.load(new StandardDataFlow(flowBytes, null));
flowService.load(new StandardDataFlow(flowBytes, null, null));
}
@Test
public void testLoadExistingFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(flowBytes, null));
flowService.load(new StandardDataFlow(flowBytes, null, null));
flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml"));
flowService.load(new StandardDataFlow(flowBytes, null));
flowService.load(new StandardDataFlow(flowBytes, null, null));
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -121,11 +121,11 @@ public class StandardFlowServiceTest {
@Test
public void testLoadExistingFlowWithUninheritableFlow() throws IOException {
byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(originalBytes, null));
flowService.load(new StandardDataFlow(originalBytes, null, null));
try {
byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-uninheritable.xml"));
flowService.load(new StandardDataFlow(updatedBytes, null));
flowService.load(new StandardDataFlow(updatedBytes, null, null));
fail("should have thrown " + UninheritableFlowException.class);
} catch (UninheritableFlowException ufe) {
@ -143,11 +143,11 @@ public class StandardFlowServiceTest {
@Test
public void testLoadExistingFlowWithCorruptFlow() throws IOException {
byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(originalBytes, null));
flowService.load(new StandardDataFlow(originalBytes, null, null));
try {
byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
flowService.load(new StandardDataFlow(updatedBytes, null));
flowService.load(new StandardDataFlow(updatedBytes, null, null));
fail("should have thrown " + FlowSerializationException.class);
} catch (FlowSerializationException ufe) {

View File

@ -17,13 +17,13 @@
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.Group;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.User;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@ -39,16 +39,31 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestFlowController {
private FlowController controller;
private AbstractPolicyBasedAuthorizer authorizer;
private StandardFlowSynchronizer standardFlowSynchronizer;
@Before
public void setup() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
final KeyService keyService = Mockito.mock(KeyService.class);
final AuditService auditService = Mockito.mock(AuditService.class);
final StringEncryptor encryptor = StringEncryptor.createEncryptor();
final NiFiProperties properties = NiFiProperties.getInstance();
@ -56,8 +71,52 @@ public class TestFlowController {
properties.setProperty("nifi.remote.input.socket.port", "");
properties.setProperty("nifi.remote.input.secure", "");
Group group1 = new Group.Builder().identifier("group-id-1").name("group-1").build();
Group group2 = new Group.Builder().identifier("group-id-2").name("group-2").build();
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").addGroup(group1.getIdentifier()).build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
AccessPolicy policy1 = new AccessPolicy.Builder()
.identifier("policy-id-1")
.resource("resource1")
.addAction(RequestAction.READ)
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
AccessPolicy policy2 = new AccessPolicy.Builder()
.identifier("policy-id-2")
.resource("resource2")
.addAction(RequestAction.READ)
.addAction(RequestAction.WRITE)
.addGroup(group1.getIdentifier())
.addGroup(group2.getIdentifier())
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
Set<Group> groups1 = new LinkedHashSet<>();
groups1.add(group1);
groups1.add(group2);
Set<User> users1 = new LinkedHashSet<>();
users1.add(user1);
users1.add(user2);
Set<AccessPolicy> policies1 = new LinkedHashSet<>();
policies1.add(policy1);
policies1.add(policy2);
authorizer = Mockito.mock(AbstractPolicyBasedAuthorizer.class);
when(authorizer.getGroups()).thenReturn(groups1);
when(authorizer.getUsers()).thenReturn(users1);
when(authorizer.getAccessPolicies()).thenReturn(policies1);
final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, keyService, auditService, encryptor, bulletinRepo);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo);
standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor());
}
@After
@ -65,6 +124,55 @@ public class TestFlowController {
controller.shutdown(true);
}
@Test
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
// had a problem verifying the call to inheritFingerprint didn't happen, so just verify none of the add methods got called
verify(authorizer, times(0)).addUser(any(User.class));
verify(authorizer, times(0)).addGroup(any(Group.class));
verify(authorizer, times(0)).addAccessPolicy(any(AccessPolicy.class));
}
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenAuthorizationsAreDifferent() {
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
}
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() {
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
}
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
// reset the authorizer so it returns empty fingerprint
when(authorizer.getUsers()).thenReturn(new HashSet<User>());
when(authorizer.getGroups()).thenReturn(new HashSet<Group>());
when(authorizer.getAccessPolicies()).thenReturn(new HashSet<AccessPolicy>());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
verify(authorizer, times(1)).inheritFingerprint(authFingerprint);
}
@Test
public void testCreateMissingProcessor() throws ProcessorInstantiationException {
final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor");

View File

@ -39,10 +39,10 @@ import java.util.concurrent.locks.LockSupport;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -661,7 +661,7 @@ public class TestProcessorLifecycle {
properties.setProperty("nifi.remote.input.secure", "");
return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties,
mock(KeyService.class), mock(AuditService.class), null, new VolatileBulletinRepository());
mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository());
}
/**

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.remote;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
@ -71,7 +71,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
private final ProcessScheduler processScheduler;
private final boolean secure;
private final KeyService keyService;
private final Authorizer authorizer;
@SuppressWarnings("unused")
private final BulletinRepository bulletinRepository;
private final EventReporter eventReporter;
@ -85,13 +85,13 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
private boolean shutdown = false; // guarded by requestLock
public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup,
final TransferDirection direction, final ConnectableType type, final KeyService keyService,
final TransferDirection direction, final ConnectableType type, final Authorizer authorizer,
final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure) {
super(id, name, processGroup, type, scheduler);
this.processScheduler = scheduler;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
this.keyService = keyService;
this.authorizer = authorizer;
this.secure = secure;
this.bulletinRepository = bulletinRepository;
this.scheduler = scheduler;

View File

@ -25,7 +25,7 @@ import java.lang.reflect.Modifier;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.util.CapturingLogger;
@ -136,6 +136,6 @@ public class MonitorMemoryTest {
properties.setProperty("nifi.remote.input.secure", "");
return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties,
mock(KeyService.class), mock(AuditService.class), null, null);
mock(Authorizer.class), mock(AuditService.class), null, null);
}
}