mirror of https://github.com/apache/nifi.git
NIFI-3133: - Ensuring that Remote Group Ports are always authorized according to their Remote Process Group.
NIFI-3133: - Using getSourceAuthorizable() when accessing flow files and content. NIFI-3133: - Decouple local and remote connectable's to avoid ambiguity with self referencing RPGs. NIFI-3133: - Addressing comments from the PR. NIFI-3133: - Fixed check verifying source/destination when creating a connection. NIFI-3133: - Only showing the go to link when the source component is not a remote port. NIFI-3133: - Removing unnecessary checking of remote group port authorization since it's handled by the parent RPG. NIFI-3133: - Fixing issue showing the connection details dialog when the source component is a RPG. NIFI-3133: - Ensuring the local connectable was found. This closes #1297 Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
parent
fc0d336f36
commit
2d6bba080f
|
@ -24,6 +24,9 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public interface ProvenanceEventRecord {
|
public interface ProvenanceEventRecord {
|
||||||
|
|
||||||
|
String REMOTE_INPUT_PORT_TYPE = "Remote Input Port";
|
||||||
|
String REMOTE_OUTPUT_PORT_TYPE = "Remote Output Port";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a unique ID for this Provenance Event. Depending on the
|
* @return a unique ID for this Provenance Event. Depending on the
|
||||||
* implementation, the Event ID may be set to -1 until the event has been
|
* implementation, the Event ID may be set to -1 until the event has been
|
||||||
|
@ -100,6 +103,14 @@ public interface ProvenanceEventRecord {
|
||||||
*/
|
*/
|
||||||
String getComponentType();
|
String getComponentType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether this event originated from a remote group port
|
||||||
|
*/
|
||||||
|
default boolean isRemotePortType() {
|
||||||
|
final String componentType = getComponentType();
|
||||||
|
return REMOTE_INPUT_PORT_TYPE.equals(componentType) || REMOTE_OUTPUT_PORT_TYPE.equals(componentType);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a URI that provides information about the System and Protocol
|
* @return a URI that provides information about the System and Protocol
|
||||||
* information over which the transfer occurred. The intent of this field is
|
* information over which the transfer occurred. The intent of this field is
|
||||||
|
|
|
@ -31,6 +31,15 @@ public interface ProvenanceAuthorizableFactory {
|
||||||
* @return the Authorizable that can be use to authorize access to provenance events
|
* @return the Authorizable that can be use to authorize access to provenance events
|
||||||
* @throws ResourceNotFoundException if no component can be found with the given ID
|
* @throws ResourceNotFoundException if no component can be found with the given ID
|
||||||
*/
|
*/
|
||||||
Authorizable createDataAuthorizable(String componentId);
|
Authorizable createLocalDataAuthorizable(String componentId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates an Authorizable object for the Data of the remote group port with the given ID.
|
||||||
|
*
|
||||||
|
* @param remoteGroupPortId the ID of the remote group port to which the data belongs
|
||||||
|
* @return the Authorizable that can be used to authorize access to provenance events
|
||||||
|
* @throws ResourceNotFoundException if no component can be found with the given ID
|
||||||
|
*/
|
||||||
|
Authorizable createRemoteDataAuthorizable(String remoteGroupPortId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.connectable;
|
package org.apache.nifi.connectable;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
import org.apache.nifi.controller.queue.FlowFileQueue;
|
import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||||
import org.apache.nifi.controller.repository.FlowFileRecord;
|
import org.apache.nifi.controller.repository.FlowFileRecord;
|
||||||
|
@ -27,6 +23,10 @@ import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.processor.FlowFileFilter;
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public interface Connection extends Authorizable {
|
public interface Connection extends Authorizable {
|
||||||
|
|
||||||
void enqueue(FlowFileRecord flowFile);
|
void enqueue(FlowFileRecord flowFile);
|
||||||
|
@ -35,6 +35,8 @@ public interface Connection extends Authorizable {
|
||||||
|
|
||||||
Connectable getDestination();
|
Connectable getDestination();
|
||||||
|
|
||||||
|
Authorizable getDestinationAuthorizable();
|
||||||
|
|
||||||
Collection<Relationship> getRelationships();
|
Collection<Relationship> getRelationships();
|
||||||
|
|
||||||
FlowFileQueue getFlowFileQueue();
|
FlowFileQueue getFlowFileQueue();
|
||||||
|
@ -59,6 +61,8 @@ public interface Connection extends Authorizable {
|
||||||
|
|
||||||
Connectable getSource();
|
Connectable getSource();
|
||||||
|
|
||||||
|
Authorizable getSourceAuthorizable();
|
||||||
|
|
||||||
void setRelationships(Collection<Relationship> newRelationships);
|
void setRelationships(Collection<Relationship> newRelationships);
|
||||||
|
|
||||||
void setDestination(final Connectable newDestination);
|
void setDestination(final Connectable newDestination);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.nifi.controller.label.Label;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.Processor;
|
import org.apache.nifi.processor.Processor;
|
||||||
|
import org.apache.nifi.remote.RemoteGroupPort;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -731,9 +732,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
|
||||||
* @param identifier of connectable
|
* @param identifier of connectable
|
||||||
* @return the Connectable with the given ID, if it exists; otherwise
|
* @return the Connectable with the given ID, if it exists; otherwise
|
||||||
* returns null. This performs a recursive search of all ProcessGroups'
|
* returns null. This performs a recursive search of all ProcessGroups'
|
||||||
* input ports, output ports, funnels, processors, and remote process groups
|
* input ports, output ports, funnels, processors
|
||||||
*/
|
*/
|
||||||
Connectable findConnectable(String identifier);
|
Connectable findLocalConnectable(String identifier);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param identifier of remote group port
|
||||||
|
* @return the RemoteGroupPort with the given ID, if it exists; otherwise
|
||||||
|
* returns null.
|
||||||
|
*/
|
||||||
|
RemoteGroupPort findRemoteGroupPort(String identifier);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a Set of all {@link org.apache.nifi.connectable.Positionable}s contained within this
|
* @return a Set of all {@link org.apache.nifi.connectable.Positionable}s contained within this
|
||||||
|
|
|
@ -16,17 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.connectable;
|
package org.apache.nifi.connectable;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||||
|
@ -51,6 +40,19 @@ import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.processor.FlowFileFilter;
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.provenance.ProvenanceEventRepository;
|
import org.apache.nifi.provenance.ProvenanceEventRepository;
|
||||||
|
import org.apache.nifi.remote.RemoteGroupPort;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Models a connection between connectable components. A connection may contain
|
* Models a connection between connectable components. A connection may contain
|
||||||
|
@ -137,6 +139,36 @@ public final class StandardConnection implements Connection {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Authorizable getSourceAuthorizable() {
|
||||||
|
final Connectable sourceConnectable = getSource();
|
||||||
|
final Authorizable sourceAuthorizable;
|
||||||
|
|
||||||
|
// if the source is a remote group port, authorize according to the RPG
|
||||||
|
if (sourceConnectable instanceof RemoteGroupPort) {
|
||||||
|
sourceAuthorizable = ((RemoteGroupPort) sourceConnectable).getRemoteProcessGroup();
|
||||||
|
} else {
|
||||||
|
sourceAuthorizable = sourceConnectable;
|
||||||
|
}
|
||||||
|
|
||||||
|
return sourceAuthorizable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Authorizable getDestinationAuthorizable() {
|
||||||
|
final Connectable destinationConnectable = getDestination();
|
||||||
|
final Authorizable destinationAuthorizable;
|
||||||
|
|
||||||
|
// if the destination is a remote group port, authorize according to the RPG
|
||||||
|
if (destinationConnectable instanceof RemoteGroupPort) {
|
||||||
|
destinationAuthorizable = ((RemoteGroupPort) destinationConnectable).getRemoteProcessGroup();
|
||||||
|
} else {
|
||||||
|
destinationAuthorizable = destinationConnectable;
|
||||||
|
}
|
||||||
|
|
||||||
|
return destinationAuthorizable;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) {
|
public AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) {
|
||||||
if (user == null) {
|
if (user == null) {
|
||||||
|
@ -144,13 +176,13 @@ public final class StandardConnection implements Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check the source
|
// check the source
|
||||||
final AuthorizationResult sourceResult = getSource().checkAuthorization(authorizer, action, user, resourceContext);
|
final AuthorizationResult sourceResult = getSourceAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
|
||||||
if (Result.Denied.equals(sourceResult.getResult())) {
|
if (Result.Denied.equals(sourceResult.getResult())) {
|
||||||
return sourceResult;
|
return sourceResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check the destination
|
// check the destination
|
||||||
return getDestination().checkAuthorization(authorizer, action, user, resourceContext);
|
return getDestinationAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,8 +191,8 @@ public final class StandardConnection implements Connection {
|
||||||
throw new AccessDeniedException("Unknown user");
|
throw new AccessDeniedException("Unknown user");
|
||||||
}
|
}
|
||||||
|
|
||||||
getSource().authorize(authorizer, action, user, resourceContext);
|
getSourceAuthorizable().authorize(authorizer, action, user, resourceContext);
|
||||||
getDestination().authorize(authorizer, action, user, resourceContext);
|
getDestinationAuthorizable().authorize(authorizer, action, user, resourceContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -4010,7 +4010,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Authorizable createDataAuthorizable(final String componentId) {
|
public Authorizable createLocalDataAuthorizable(final String componentId) {
|
||||||
final String rootGroupId = getRootGroupId();
|
final String rootGroupId = getRootGroupId();
|
||||||
|
|
||||||
// Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
|
// Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
|
||||||
|
@ -4022,7 +4022,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
authorizable = new DataAuthorizable(getRootGroup());
|
authorizable = new DataAuthorizable(getRootGroup());
|
||||||
} else {
|
} else {
|
||||||
// check if the component is a connectable, this should be the case most often
|
// check if the component is a connectable, this should be the case most often
|
||||||
final Connectable connectable = getRootGroup().findConnectable(componentId);
|
final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
|
||||||
if (connectable == null) {
|
if (connectable == null) {
|
||||||
// if the component id is not a connectable then consider a connection
|
// if the component id is not a connectable then consider a connection
|
||||||
final Connection connection = getRootGroup().findConnection(componentId);
|
final Connection connection = getRootGroup().findConnection(componentId);
|
||||||
|
@ -4041,6 +4041,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
return authorizable;
|
return authorizable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Authorizable createRemoteDataAuthorizable(String remoteGroupPortId) {
|
||||||
|
final DataAuthorizable authorizable;
|
||||||
|
|
||||||
|
final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(remoteGroupPortId);
|
||||||
|
if (remoteGroupPort == null) {
|
||||||
|
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
|
||||||
|
} else {
|
||||||
|
// authorizable for remote group ports should be the remote process group
|
||||||
|
authorizable = new DataAuthorizable(remoteGroupPort.getRemoteProcessGroup());
|
||||||
|
}
|
||||||
|
|
||||||
|
return authorizable;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Action> getFlowChanges(final int firstActionId, final int maxActions) {
|
public List<Action> getFlowChanges(final int firstActionId, final int maxActions) {
|
||||||
final History history = auditService.getActions(firstActionId, maxActions);
|
final History history = auditService.getActions(firstActionId, maxActions);
|
||||||
|
|
|
@ -16,11 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.controller.reporting;
|
package org.apache.nifi.controller.reporting;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.nifi.attribute.expression.language.PreparedQuery;
|
import org.apache.nifi.attribute.expression.language.PreparedQuery;
|
||||||
import org.apache.nifi.attribute.expression.language.Query;
|
import org.apache.nifi.attribute.expression.language.Query;
|
||||||
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
|
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
|
||||||
|
@ -43,6 +38,11 @@ import org.apache.nifi.reporting.ReportingContext;
|
||||||
import org.apache.nifi.reporting.ReportingTask;
|
import org.apache.nifi.reporting.ReportingTask;
|
||||||
import org.apache.nifi.reporting.Severity;
|
import org.apache.nifi.reporting.Severity;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class StandardReportingContext implements ReportingContext, ControllerServiceLookup {
|
public class StandardReportingContext implements ReportingContext, ControllerServiceLookup {
|
||||||
|
|
||||||
private final FlowController flowController;
|
private final FlowController flowController;
|
||||||
|
@ -95,7 +95,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
|
||||||
@Override
|
@Override
|
||||||
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
|
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
|
||||||
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
|
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
|
||||||
final Connectable connectable = rootGroup.findConnectable(componentId);
|
final Connectable connectable = rootGroup.findLocalConnectable(componentId);
|
||||||
if (connectable == null) {
|
if (connectable == null) {
|
||||||
throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
|
throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,31 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.controller.repository;
|
package org.apache.nifi.controller.repository;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.EOFException;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.NoSuchElementException;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.nifi.connectable.Connectable;
|
import org.apache.nifi.connectable.Connectable;
|
||||||
import org.apache.nifi.connectable.Connection;
|
import org.apache.nifi.connectable.Connection;
|
||||||
|
@ -79,6 +54,31 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Provides a ProcessSession that ensures all accesses, changes and transfers
|
* Provides a ProcessSession that ensures all accesses, changes and transfers
|
||||||
|
@ -164,10 +164,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
componentType = "Output Port";
|
componentType = "Output Port";
|
||||||
break;
|
break;
|
||||||
case REMOTE_INPUT_PORT:
|
case REMOTE_INPUT_PORT:
|
||||||
componentType = "Remote Input Port";
|
componentType = ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
|
||||||
break;
|
break;
|
||||||
case REMOTE_OUTPUT_PORT:
|
case REMOTE_OUTPUT_PORT:
|
||||||
componentType = "Remote Output Port";
|
componentType = ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
|
||||||
break;
|
break;
|
||||||
case FUNNEL:
|
case FUNNEL:
|
||||||
componentType = "Funnel";
|
componentType = "Funnel";
|
||||||
|
|
|
@ -1482,11 +1482,11 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connectable findConnectable(final String identifier) {
|
public Connectable findLocalConnectable(final String identifier) {
|
||||||
return findConnectable(identifier, this);
|
return findLocalConnectable(identifier, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Connectable findConnectable(final String identifier, final ProcessGroup group) {
|
private static Connectable findLocalConnectable(final String identifier, final ProcessGroup group) {
|
||||||
final ProcessorNode procNode = group.getProcessor(identifier);
|
final ProcessorNode procNode = group.getProcessor(identifier);
|
||||||
if (procNode != null) {
|
if (procNode != null) {
|
||||||
return procNode;
|
return procNode;
|
||||||
|
@ -1507,6 +1507,21 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
return funnel;
|
return funnel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
||||||
|
final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup);
|
||||||
|
if (childGroupConnectable != null) {
|
||||||
|
return childGroupConnectable;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteGroupPort findRemoteGroupPort(final String identifier) {
|
||||||
|
return findRemoteGroupPort(identifier, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RemoteGroupPort findRemoteGroupPort(final String identifier, final ProcessGroup group) {
|
||||||
for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
|
for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
|
||||||
final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
|
final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
|
||||||
if (remoteInPort != null) {
|
if (remoteInPort != null) {
|
||||||
|
@ -1520,9 +1535,9 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
||||||
final Connectable childGroupConnectable = findConnectable(identifier, childGroup);
|
final RemoteGroupPort childGroupRemoteGroupPort = findRemoteGroupPort(identifier, childGroup);
|
||||||
if (childGroupConnectable != null) {
|
if (childGroupRemoteGroupPort != null) {
|
||||||
return childGroupConnectable;
|
return childGroupRemoteGroupPort;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.groups.ProcessGroupCounts;
|
import org.apache.nifi.groups.ProcessGroupCounts;
|
||||||
import org.apache.nifi.groups.RemoteProcessGroup;
|
import org.apache.nifi.groups.RemoteProcessGroup;
|
||||||
|
import org.apache.nifi.remote.RemoteGroupPort;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -508,7 +509,12 @@ public class MockProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connectable findConnectable(final String identifier) {
|
public Connectable findLocalConnectable(final String identifier) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteGroupPort findRemoteGroupPort(String identifier) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,24 +115,6 @@ public interface AuthorizableLookup {
|
||||||
*/
|
*/
|
||||||
Authorizable getRemoteProcessGroup(String id);
|
Authorizable getRemoteProcessGroup(String id);
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the authorizable RemoteProcessGroup input port.
|
|
||||||
*
|
|
||||||
* @param remoteProcessGroupId remote process group id
|
|
||||||
* @param id input port id
|
|
||||||
* @return authorizable
|
|
||||||
*/
|
|
||||||
Authorizable getRemoteProcessGroupInputPort(String remoteProcessGroupId, String id);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the authorizable RemoteProcessGroup output port.
|
|
||||||
*
|
|
||||||
* @param remoteProcessGroupId remote process group id
|
|
||||||
* @param id output port id
|
|
||||||
* @return authorizable
|
|
||||||
*/
|
|
||||||
Authorizable getRemoteProcessGroupOutputPort(String remoteProcessGroupId, String id);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the authorizable Label.
|
* Get the authorizable Label.
|
||||||
*
|
*
|
||||||
|
@ -203,12 +185,12 @@ public interface AuthorizableLookup {
|
||||||
TemplateAuthorizable getTemplate(String id);
|
TemplateAuthorizable getTemplate(String id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the authorizable connectable.
|
* Get the authorizable connectable. Note this does not include RemoteGroupPorts.
|
||||||
*
|
*
|
||||||
* @param id connectable id
|
* @param id connectable id
|
||||||
* @return authorizable
|
* @return authorizable
|
||||||
*/
|
*/
|
||||||
Authorizable getConnectable(String id);
|
Authorizable getLocalConnectable(String id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the snippet of authorizable's.
|
* Get the snippet of authorizable's.
|
||||||
|
@ -224,13 +206,6 @@ public interface AuthorizableLookup {
|
||||||
*/
|
*/
|
||||||
Authorizable getTenant();
|
Authorizable getTenant();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the authorizable for data of a specified component.
|
|
||||||
*
|
|
||||||
* @return authorizable
|
|
||||||
*/
|
|
||||||
Authorizable getData(String id);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the authorizable for access all policies.
|
* Get the authorizable for access all policies.
|
||||||
*
|
*
|
||||||
|
|
|
@ -38,6 +38,13 @@ public interface ConnectionAuthorizable {
|
||||||
*/
|
*/
|
||||||
Connectable getSource();
|
Connectable getSource();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the data authorizable for the source of the connection.
|
||||||
|
*
|
||||||
|
* @return source data authorizable
|
||||||
|
*/
|
||||||
|
Authorizable getSourceData();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the destination.
|
* Returns the destination.
|
||||||
*
|
*
|
||||||
|
@ -45,6 +52,13 @@ public interface ConnectionAuthorizable {
|
||||||
*/
|
*/
|
||||||
Connectable getDestination();
|
Connectable getDestination();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the data authorizable for the destination of the connection.
|
||||||
|
*
|
||||||
|
* @return destination data authorizable
|
||||||
|
*/
|
||||||
|
Authorizable getDestinationData();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the parent process group.
|
* Returns the parent process group.
|
||||||
*
|
*
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.nifi.controller.Template;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceReference;
|
import org.apache.nifi.controller.service.ControllerServiceReference;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.groups.RemoteProcessGroup;
|
|
||||||
import org.apache.nifi.remote.PortAuthorizationResult;
|
import org.apache.nifi.remote.PortAuthorizationResult;
|
||||||
import org.apache.nifi.remote.RootGroupPort;
|
import org.apache.nifi.remote.RootGroupPort;
|
||||||
import org.apache.nifi.web.ResourceNotFoundException;
|
import org.apache.nifi.web.ResourceNotFoundException;
|
||||||
|
@ -239,18 +238,6 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
|
||||||
return remoteProcessGroupDAO.getRemoteProcessGroup(id);
|
return remoteProcessGroupDAO.getRemoteProcessGroup(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Authorizable getRemoteProcessGroupInputPort(final String remoteProcessGroupId, final String id) {
|
|
||||||
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
|
|
||||||
return remoteProcessGroup.getInputPort(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Authorizable getRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final String id) {
|
|
||||||
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
|
|
||||||
return remoteProcessGroup.getOutputPort(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Authorizable getLabel(final String id) {
|
public Authorizable getLabel(final String id) {
|
||||||
return labelDAO.getLabel(id);
|
return labelDAO.getLabel(id);
|
||||||
|
@ -413,11 +400,6 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
|
||||||
return TENANT_AUTHORIZABLE;
|
return TENANT_AUTHORIZABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Authorizable getData(final String id) {
|
|
||||||
return controllerFacade.getDataAuthorizable(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Authorizable getPolicies() {
|
public Authorizable getPolicies() {
|
||||||
return POLICIES_AUTHORIZABLE;
|
return POLICIES_AUTHORIZABLE;
|
||||||
|
@ -525,9 +507,6 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
|
||||||
case Template:
|
case Template:
|
||||||
authorizable = getTemplate(componentId).getAuthorizable();
|
authorizable = getTemplate(componentId).getAuthorizable();
|
||||||
break;
|
break;
|
||||||
case Data:
|
|
||||||
authorizable = controllerFacade.getDataAuthorizable(componentId);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (authorizable == null) {
|
if (authorizable == null) {
|
||||||
|
@ -681,9 +660,15 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Authorizable getConnectable(String id) {
|
public Authorizable getLocalConnectable(String id) {
|
||||||
final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
|
final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
|
||||||
return group.findConnectable(id);
|
final Connectable connectable = group.findLocalConnectable(id);
|
||||||
|
|
||||||
|
if (connectable == null) {
|
||||||
|
throw new ResourceNotFoundException("Unable to find component with id " + id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return connectable;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -888,11 +873,21 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
|
||||||
return connection.getSource();
|
return connection.getSource();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Authorizable getSourceData() {
|
||||||
|
return new DataAuthorizable(connection.getSourceAuthorizable());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connectable getDestination() {
|
public Connectable getDestination() {
|
||||||
return connection.getDestination();
|
return connection.getDestination();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Authorizable getDestinationData() {
|
||||||
|
return new DataAuthorizable(connection.getDestinationAuthorizable());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProcessGroup getParentGroup() {
|
public ProcessGroup getParentGroup() {
|
||||||
return connection.getProcessGroup();
|
return connection.getProcessGroup();
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.authorization.RequestAction;
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
import org.apache.nifi.connectable.Connectable;
|
import org.apache.nifi.connectable.Connectable;
|
||||||
|
import org.apache.nifi.connectable.ConnectableType;
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
import org.apache.nifi.web.api.dto.ConnectionDTO;
|
import org.apache.nifi.web.api.dto.ConnectionDTO;
|
||||||
|
@ -203,10 +204,16 @@ public class ConnectionResource extends ApplicationResource {
|
||||||
+ "requested resource (%s).", requestConnection.getId(), id));
|
+ "requested resource (%s).", requestConnection.getId(), id));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requestConnection.getDestination() != null && requestConnection.getDestination().getId() == null) {
|
if (requestConnection.getDestination() != null) {
|
||||||
|
if (requestConnection.getDestination().getId() == null) {
|
||||||
throw new IllegalArgumentException("When specifying a destination component, the destination id is required.");
|
throw new IllegalArgumentException("When specifying a destination component, the destination id is required.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (requestConnection.getDestination().getType() == null) {
|
||||||
|
throw new IllegalArgumentException("When specifying a destination component, the type of the destination is required.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (isReplicateRequest()) {
|
if (isReplicateRequest()) {
|
||||||
return replicate(HttpMethod.PUT, requestConnectionEntity);
|
return replicate(HttpMethod.PUT, requestConnectionEntity);
|
||||||
}
|
}
|
||||||
|
@ -224,9 +231,23 @@ public class ConnectionResource extends ApplicationResource {
|
||||||
// if a destination has been specified and is different
|
// if a destination has been specified and is different
|
||||||
final Connectable currentDestination = connAuth.getDestination();
|
final Connectable currentDestination = connAuth.getDestination();
|
||||||
if (requestConnection.getDestination() != null && !currentDestination.getIdentifier().equals(requestConnection.getDestination().getId())) {
|
if (requestConnection.getDestination() != null && !currentDestination.getIdentifier().equals(requestConnection.getDestination().getId())) {
|
||||||
|
try {
|
||||||
|
final ConnectableType destinationConnectableType = ConnectableType.valueOf(requestConnection.getDestination().getType());
|
||||||
|
|
||||||
|
// explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing
|
||||||
|
final Authorizable newDestinationAuthorizable;
|
||||||
|
if (ConnectableType.REMOTE_INPUT_PORT.equals(destinationConnectableType)) {
|
||||||
|
newDestinationAuthorizable = lookup.getRemoteProcessGroup(requestConnection.getDestination().getGroupId());
|
||||||
|
} else {
|
||||||
|
newDestinationAuthorizable = lookup.getLocalConnectable(requestConnection.getDestination().getId());
|
||||||
|
}
|
||||||
|
|
||||||
// verify access of the new destination (current destination was already authorized as part of the connection check)
|
// verify access of the new destination (current destination was already authorized as part of the connection check)
|
||||||
final Authorizable newDestinationAuthorizable = lookup.getConnectable(requestConnection.getDestination().getId());
|
|
||||||
newDestinationAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
newDestinationAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
|
} catch (final IllegalArgumentException e) {
|
||||||
|
throw new IllegalArgumentException(String.format("Unrecognized destination type %s. Excepted values are [%s]",
|
||||||
|
requestConnection.getDestination().getType(), StringUtils.join(ConnectableType.values(), ", ")));
|
||||||
|
}
|
||||||
|
|
||||||
// verify access of the parent group (this is the same check that is performed when creating the connection)
|
// verify access of the parent group (this is the same check that is performed when creating the connection)
|
||||||
connAuth.getParentGroup().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
connAuth.getParentGroup().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
|
|
|
@ -331,7 +331,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
requestConnectionEntity,
|
requestConnectionEntity,
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
() -> serviceFacade.verifyListQueue(id),
|
() -> serviceFacade.verifyListQueue(id),
|
||||||
|
@ -400,7 +400,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
// authorize access
|
// authorize access
|
||||||
serviceFacade.authorizeAccess(lookup -> {
|
serviceFacade.authorizeAccess(lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -465,7 +465,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
new ListingEntity(connectionId, listingRequestId),
|
new ListingEntity(connectionId, listingRequestId),
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
null,
|
null,
|
||||||
|
@ -554,7 +554,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
requestConnectionEntity,
|
requestConnectionEntity,
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
null,
|
null,
|
||||||
|
@ -623,7 +623,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
// authorize access
|
// authorize access
|
||||||
serviceFacade.authorizeAccess(lookup -> {
|
serviceFacade.authorizeAccess(lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -688,7 +688,7 @@ public class FlowFileQueueResource extends ApplicationResource {
|
||||||
new DropEntity(connectionId, dropRequestId),
|
new DropEntity(connectionId, dropRequestId),
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
|
||||||
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = connAuth.getSourceData();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
null,
|
null,
|
||||||
|
|
|
@ -651,7 +651,7 @@ public class FlowResource extends ApplicationResource {
|
||||||
|
|
||||||
// ensure access to every component being scheduled
|
// ensure access to every component being scheduled
|
||||||
requestComponentsToSchedule.keySet().forEach(componentId -> {
|
requestComponentsToSchedule.keySet().forEach(componentId -> {
|
||||||
final Authorizable connectable = lookup.getConnectable(componentId);
|
final Authorizable connectable = lookup.getLocalConnectable(componentId);
|
||||||
connectable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
connectable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.nifi.authorization.TemplateAuthorizable;
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
|
import org.apache.nifi.connectable.ConnectableType;
|
||||||
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
import org.apache.nifi.web.ResourceNotFoundException;
|
import org.apache.nifi.web.ResourceNotFoundException;
|
||||||
|
@ -1505,10 +1506,34 @@ public class ProcessGroupResource extends ApplicationResource {
|
||||||
throw new IllegalArgumentException("The source of the connection must be specified.");
|
throw new IllegalArgumentException("The source of the connection must be specified.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (requestConnection.getSource().getType() == null) {
|
||||||
|
throw new IllegalArgumentException("The type of the source of the connection must be specified.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final ConnectableType sourceConnectableType;
|
||||||
|
try {
|
||||||
|
sourceConnectableType = ConnectableType.valueOf(requestConnection.getSource().getType());
|
||||||
|
} catch (final IllegalArgumentException e) {
|
||||||
|
throw new IllegalArgumentException(String.format("Unrecognized source type %s. Expected values are [%s]",
|
||||||
|
requestConnection.getSource().getType(), StringUtils.join(ConnectableType.values(), ", ")));
|
||||||
|
}
|
||||||
|
|
||||||
if (requestConnection.getDestination() == null || requestConnection.getDestination().getId() == null) {
|
if (requestConnection.getDestination() == null || requestConnection.getDestination().getId() == null) {
|
||||||
throw new IllegalArgumentException("The destination of the connection must be specified.");
|
throw new IllegalArgumentException("The destination of the connection must be specified.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (requestConnection.getDestination().getType() == null) {
|
||||||
|
throw new IllegalArgumentException("The type of the destination of the connection must be specified.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final ConnectableType destinationConnectableType;
|
||||||
|
try {
|
||||||
|
destinationConnectableType = ConnectableType.valueOf(requestConnection.getDestination().getType());
|
||||||
|
} catch (final IllegalArgumentException e) {
|
||||||
|
throw new IllegalArgumentException(String.format("Unrecognized destination type %s. Expected values are [%s]",
|
||||||
|
requestConnection.getDestination().getType(), StringUtils.join(ConnectableType.values(), ", ")));
|
||||||
|
}
|
||||||
|
|
||||||
if (isReplicateRequest()) {
|
if (isReplicateRequest()) {
|
||||||
return replicate(HttpMethod.POST, requestConnectionEntity);
|
return replicate(HttpMethod.POST, requestConnectionEntity);
|
||||||
}
|
}
|
||||||
|
@ -1521,15 +1546,29 @@ public class ProcessGroupResource extends ApplicationResource {
|
||||||
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
|
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
|
||||||
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
|
|
||||||
|
// explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing
|
||||||
|
final Authorizable source;
|
||||||
|
if (ConnectableType.REMOTE_OUTPUT_PORT.equals(sourceConnectableType)) {
|
||||||
|
source = lookup.getRemoteProcessGroup(requestConnection.getSource().getGroupId());
|
||||||
|
} else {
|
||||||
|
source = lookup.getLocalConnectable(requestConnection.getSource().getId());
|
||||||
|
}
|
||||||
|
|
||||||
// ensure write access to the source
|
// ensure write access to the source
|
||||||
final Authorizable source = lookup.getConnectable(requestConnection.getSource().getId());
|
|
||||||
if (source == null) {
|
if (source == null) {
|
||||||
throw new ResourceNotFoundException("Cannot find source component with ID [" + requestConnection.getSource().getId() + "]");
|
throw new ResourceNotFoundException("Cannot find source component with ID [" + requestConnection.getSource().getId() + "]");
|
||||||
}
|
}
|
||||||
source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
|
|
||||||
|
// explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing
|
||||||
|
final Authorizable destination;
|
||||||
|
if (ConnectableType.REMOTE_INPUT_PORT.equals(destinationConnectableType)) {
|
||||||
|
destination = lookup.getRemoteProcessGroup(requestConnection.getDestination().getGroupId());
|
||||||
|
} else {
|
||||||
|
destination = lookup.getLocalConnectable(requestConnection.getDestination().getId());
|
||||||
|
}
|
||||||
|
|
||||||
// ensure write access to the destination
|
// ensure write access to the destination
|
||||||
final Authorizable destination = lookup.getConnectable(requestConnection.getDestination().getId());
|
|
||||||
if (destination == null) {
|
if (destination == null) {
|
||||||
throw new ResourceNotFoundException("Cannot find destination component with ID [" + requestConnection.getDestination().getId() + "]");
|
throw new ResourceNotFoundException("Cannot find destination component with ID [" + requestConnection.getDestination().getId() + "]");
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,8 +291,8 @@ public class RemoteProcessGroupResource extends ApplicationResource {
|
||||||
requestRemoteProcessGroupPortEntity,
|
requestRemoteProcessGroupPortEntity,
|
||||||
requestRevision,
|
requestRevision,
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final Authorizable remoteProcessGroupInputPort = lookup.getRemoteProcessGroupInputPort(id, portId);
|
final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id);
|
||||||
remoteProcessGroupInputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
remoteProcessGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
() -> serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort),
|
() -> serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort),
|
||||||
(revision, remoteProcessGroupPortEntity) -> {
|
(revision, remoteProcessGroupPortEntity) -> {
|
||||||
|
@ -393,8 +393,8 @@ public class RemoteProcessGroupResource extends ApplicationResource {
|
||||||
requestRemoteProcessGroupPortEntity,
|
requestRemoteProcessGroupPortEntity,
|
||||||
requestRevision,
|
requestRevision,
|
||||||
lookup -> {
|
lookup -> {
|
||||||
final Authorizable remoteProcessGroupOutputPort = lookup.getRemoteProcessGroupOutputPort(id, portId);
|
final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id);
|
||||||
remoteProcessGroupOutputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
remoteProcessGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||||
},
|
},
|
||||||
() -> serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort),
|
() -> serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort),
|
||||||
(revision, remoteProcessGroupPortEntity) -> {
|
(revision, remoteProcessGroupPortEntity) -> {
|
||||||
|
|
|
@ -79,6 +79,7 @@ import org.apache.nifi.provenance.search.SearchTerm;
|
||||||
import org.apache.nifi.provenance.search.SearchTerms;
|
import org.apache.nifi.provenance.search.SearchTerms;
|
||||||
import org.apache.nifi.provenance.search.SearchableField;
|
import org.apache.nifi.provenance.search.SearchableField;
|
||||||
import org.apache.nifi.registry.VariableRegistry;
|
import org.apache.nifi.registry.VariableRegistry;
|
||||||
|
import org.apache.nifi.remote.RemoteGroupPort;
|
||||||
import org.apache.nifi.remote.RootGroupPort;
|
import org.apache.nifi.remote.RootGroupPort;
|
||||||
import org.apache.nifi.reporting.ReportingTask;
|
import org.apache.nifi.reporting.ReportingTask;
|
||||||
import org.apache.nifi.scheduling.ExecutionNode;
|
import org.apache.nifi.scheduling.ExecutionNode;
|
||||||
|
@ -1172,7 +1173,12 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// authorize the event
|
// authorize the event
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(event.getComponentId());
|
final Authorizable dataAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
|
||||||
|
|
||||||
// get the filename and fall back to the identifier (should never happen)
|
// get the filename and fall back to the identifier (should never happen)
|
||||||
|
@ -1215,7 +1221,7 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// authorize the replay
|
// authorize the replay
|
||||||
authorizeReplay(originalEvent.getComponentId(), originalEvent.getAttributes(), originalEvent.getSourceQueueIdentifier());
|
authorizeReplay(originalEvent);
|
||||||
|
|
||||||
// replay the flow file
|
// replay the flow file
|
||||||
final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user);
|
final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user);
|
||||||
|
@ -1230,18 +1236,23 @@ public class ControllerFacade implements Authorizable {
|
||||||
/**
|
/**
|
||||||
* Authorizes access to replay a specified provenance event.
|
* Authorizes access to replay a specified provenance event.
|
||||||
*
|
*
|
||||||
* @param componentId component id
|
* @param event event
|
||||||
* @param eventAttributes event attributes
|
|
||||||
* @param connectionId connection id
|
|
||||||
*/
|
*/
|
||||||
private AuthorizationResult checkAuthorizationForReplay(final String componentId, final Map<String, String> eventAttributes, final String connectionId) {
|
private AuthorizationResult checkAuthorizationForReplay(final ProvenanceEventRecord event) {
|
||||||
// if the connection id isn't specified, then the replay wouldn't be available anyways and we have nothing to authorize against so deny it`
|
// if the connection id isn't specified, then the replay wouldn't be available anyways and we have nothing to authorize against so deny it`
|
||||||
if (connectionId == null) {
|
if (event.getSourceQueueIdentifier() == null) {
|
||||||
return AuthorizationResult.denied();
|
return AuthorizationResult.denied();
|
||||||
}
|
}
|
||||||
|
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(componentId);
|
final Authorizable dataAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, String> eventAttributes = event.getAttributes();
|
||||||
|
|
||||||
// ensure we can read the data
|
// ensure we can read the data
|
||||||
final AuthorizationResult result = dataAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user, eventAttributes);
|
final AuthorizationResult result = dataAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user, eventAttributes);
|
||||||
|
@ -1256,20 +1267,24 @@ public class ControllerFacade implements Authorizable {
|
||||||
/**
|
/**
|
||||||
* Authorizes access to replay a specified provenance event.
|
* Authorizes access to replay a specified provenance event.
|
||||||
*
|
*
|
||||||
* @param componentId component id
|
* @param event event
|
||||||
* @param eventAttributes event attributes
|
|
||||||
* @param connectionId connection id
|
|
||||||
*/
|
*/
|
||||||
private void authorizeReplay(final String componentId, final Map<String, String> eventAttributes, final String connectionId) {
|
private void authorizeReplay(final ProvenanceEventRecord event) {
|
||||||
// if the connection id isn't specified, then the replay wouldn't be available anyways and we have nothing to authorize against so deny it`
|
// if the connection id isn't specified, then the replay wouldn't be available anyways and we have nothing to authorize against so deny it`
|
||||||
if (connectionId == null) {
|
if (event.getSourceQueueIdentifier() == null) {
|
||||||
throw new AccessDeniedException("The connection id is unknown.");
|
throw new AccessDeniedException("The connection id is unknown.");
|
||||||
}
|
}
|
||||||
|
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(componentId);
|
final Authorizable dataAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
|
|
||||||
// ensure we can read and write the data
|
// ensure we can read and write the data
|
||||||
|
final Map<String, String> eventAttributes = event.getAttributes();
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, eventAttributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, eventAttributes);
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, user, eventAttributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, user, eventAttributes);
|
||||||
}
|
}
|
||||||
|
@ -1289,7 +1304,12 @@ public class ControllerFacade implements Authorizable {
|
||||||
|
|
||||||
// get the flowfile attributes and authorize the event
|
// get the flowfile attributes and authorize the event
|
||||||
final Map<String, String> attributes = event.getAttributes();
|
final Map<String, String> attributes = event.getAttributes();
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(event.getComponentId());
|
final Authorizable dataAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
|
||||||
|
|
||||||
// convert the event
|
// convert the event
|
||||||
|
@ -1299,16 +1319,6 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets an authorizable for proveance events for a given component id.
|
|
||||||
*
|
|
||||||
* @param componentId component id
|
|
||||||
* @return authorizable
|
|
||||||
*/
|
|
||||||
public Authorizable getDataAuthorizable(final String componentId) {
|
|
||||||
return flowController.createDataAuthorizable(componentId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a ProvenanceEventDTO for the specified ProvenanceEventRecord.
|
* Creates a ProvenanceEventDTO for the specified ProvenanceEventRecord.
|
||||||
*
|
*
|
||||||
|
@ -1394,7 +1404,7 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// determine if authorized for event replay
|
// determine if authorized for event replay
|
||||||
final AuthorizationResult replayAuthorized = checkAuthorizationForReplay(event.getComponentId(), event.getAttributes(), event.getSourceQueueIdentifier());
|
final AuthorizationResult replayAuthorized = checkAuthorizationForReplay(event);
|
||||||
|
|
||||||
// replay
|
// replay
|
||||||
dto.setReplayAvailable(contentAvailability.isReplayable() && Result.Approved.equals(replayAuthorized.getResult()));
|
dto.setReplayAvailable(contentAvailability.isReplayable() && Result.Approved.equals(replayAuthorized.getResult()));
|
||||||
|
@ -1432,13 +1442,20 @@ public class ControllerFacade implements Authorizable {
|
||||||
private void setComponentDetails(final ProvenanceEventDTO dto) {
|
private void setComponentDetails(final ProvenanceEventDTO dto) {
|
||||||
final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
|
final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
|
||||||
|
|
||||||
final Connectable connectable = root.findConnectable(dto.getComponentId());
|
final Connectable connectable = root.findLocalConnectable(dto.getComponentId());
|
||||||
if (connectable != null) {
|
if (connectable != null) {
|
||||||
dto.setGroupId(connectable.getProcessGroup().getIdentifier());
|
dto.setGroupId(connectable.getProcessGroup().getIdentifier());
|
||||||
dto.setComponentName(connectable.getName());
|
dto.setComponentName(connectable.getName());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final RemoteGroupPort remoteGroupPort = root.findRemoteGroupPort(dto.getComponentId());
|
||||||
|
if (remoteGroupPort != null) {
|
||||||
|
dto.setGroupId(remoteGroupPort.getProcessGroupIdentifier());
|
||||||
|
dto.setComponentName(remoteGroupPort.getName());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final Connection connection = root.findConnection(dto.getComponentId());
|
final Connection connection = root.findConnection(dto.getComponentId());
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
dto.setGroupId(connection.getProcessGroup().getIdentifier());
|
dto.setGroupId(connection.getProcessGroup().getIdentifier());
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.web.dao.impl;
|
||||||
import org.apache.nifi.authorization.Authorizer;
|
import org.apache.nifi.authorization.Authorizer;
|
||||||
import org.apache.nifi.authorization.RequestAction;
|
import org.apache.nifi.authorization.RequestAction;
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
|
import org.apache.nifi.authorization.resource.DataAuthorizable;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
import org.apache.nifi.connectable.Connectable;
|
import org.apache.nifi.connectable.Connectable;
|
||||||
|
@ -134,7 +135,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
||||||
|
|
||||||
// get the attributes and ensure appropriate access
|
// get the attributes and ensure appropriate access
|
||||||
final Map<String, String> attributes = flowFile.getAttributes();
|
final Map<String, String> attributes = flowFile.getAttributes();
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(connection.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
|
||||||
|
|
||||||
return flowFile;
|
return flowFile;
|
||||||
|
@ -397,15 +398,30 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
||||||
}
|
}
|
||||||
|
|
||||||
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
|
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
|
||||||
final Connectable sourceConnectable = rootGroup.findConnectable(sourceDto.getId());
|
|
||||||
|
if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) {
|
||||||
|
final Connectable sourceConnectable = rootGroup.findRemoteGroupPort(sourceDto.getId());
|
||||||
if (sourceConnectable == null) {
|
if (sourceConnectable == null) {
|
||||||
throw new IllegalArgumentException("The specified source for the connection does not exist");
|
throw new IllegalArgumentException("The specified source for the connection does not exist");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
final Connectable sourceConnectable = rootGroup.findLocalConnectable(sourceDto.getId());
|
||||||
|
if (sourceConnectable == null) {
|
||||||
|
throw new IllegalArgumentException("The specified source for the connection does not exist");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final Connectable destinationConnectable = rootGroup.findConnectable(destinationDto.getId());
|
if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) {
|
||||||
|
final Connectable destinationConnectable = rootGroup.findRemoteGroupPort(destinationDto.getId());
|
||||||
if (destinationConnectable == null) {
|
if (destinationConnectable == null) {
|
||||||
throw new IllegalArgumentException("The specified destination for the connection does not exist");
|
throw new IllegalArgumentException("The specified destination for the connection does not exist");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
final Connectable destinationConnectable = rootGroup.findLocalConnectable(destinationDto.getId());
|
||||||
|
if (destinationConnectable == null) {
|
||||||
|
throw new IllegalArgumentException("The specified destination for the connection does not exist");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyList(final FlowFileQueue queue) {
|
private void verifyList(final FlowFileQueue queue) {
|
||||||
|
@ -625,7 +641,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
||||||
|
|
||||||
// get the attributes and ensure appropriate access
|
// get the attributes and ensure appropriate access
|
||||||
final Map<String, String> attributes = flowFile.getAttributes();
|
final Map<String, String> attributes = flowFile.getAttributes();
|
||||||
final Authorizable dataAuthorizable = flowController.createDataAuthorizable(connection.getSource().getIdentifier());
|
final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
|
||||||
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
|
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
|
||||||
|
|
||||||
// get the filename and fall back to the identifier (should never happen)
|
// get the filename and fall back to the identifier (should never happen)
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
||||||
|
|
||||||
final Set<Connectable> connectables = new HashSet<>(componentIds.size());
|
final Set<Connectable> connectables = new HashSet<>(componentIds.size());
|
||||||
for (final String componentId : componentIds) {
|
for (final String componentId : componentIds) {
|
||||||
final Connectable connectable = group.findConnectable(componentId);
|
final Connectable connectable = group.findLocalConnectable(componentId);
|
||||||
if (connectable == null) {
|
if (connectable == null) {
|
||||||
throw new ResourceNotFoundException("Unable to find component with id " + componentId);
|
throw new ResourceNotFoundException("Unable to find component with id " + componentId);
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
||||||
final ProcessGroup group = locateProcessGroup(flowController, groupId);
|
final ProcessGroup group = locateProcessGroup(flowController, groupId);
|
||||||
|
|
||||||
for (final String componentId : componentIds) {
|
for (final String componentId : componentIds) {
|
||||||
final Connectable connectable = group.findConnectable(componentId);
|
final Connectable connectable = group.findLocalConnectable(componentId);
|
||||||
if (ScheduledState.RUNNING.equals(state)) {
|
if (ScheduledState.RUNNING.equals(state)) {
|
||||||
if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
|
if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
|
||||||
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
|
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
|
||||||
|
|
|
@ -116,6 +116,8 @@ nf.ConnectionDetails = (function () {
|
||||||
$('#read-only-connection-source-label').text('From output');
|
$('#read-only-connection-source-label').text('From output');
|
||||||
$('#read-only-connection-source').text(source.name);
|
$('#read-only-connection-source').text(source.name);
|
||||||
$('#read-only-connection-source-group-name').text(remoteProcessGroup.name);
|
$('#read-only-connection-source-group-name').text(remoteProcessGroup.name);
|
||||||
|
|
||||||
|
deferred.resolve();
|
||||||
}).fail(function (xhr, status, error) {
|
}).fail(function (xhr, status, error) {
|
||||||
if (xhr.status === 403) {
|
if (xhr.status === 403) {
|
||||||
// populate source processor details
|
// populate source processor details
|
||||||
|
|
|
@ -614,7 +614,8 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// conditionally support going to the component
|
// conditionally support going to the component
|
||||||
if (isInShell && nf.Common.isDefinedAndNotNull(dataContext.groupId)) {
|
var isRemotePort = dataContext.componentType === 'Remote Input Port' || dataContext.componentType === 'Remote Output Port';
|
||||||
|
if (isInShell && nf.Common.isDefinedAndNotNull(dataContext.groupId) && isRemotePort === false) {
|
||||||
markup += ' <div class="pointer go-to fa fa-long-arrow-right" title="Go To"></div>';
|
markup += ' <div class="pointer go-to fa fa-long-arrow-right" title="Go To"></div>';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,51 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.provenance;
|
package org.apache.nifi.provenance;
|
||||||
|
|
||||||
import java.io.EOFException;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileFilter;
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.FilenameFilter;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.SortedMap;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.lucene.document.Document;
|
import org.apache.lucene.document.Document;
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
import org.apache.lucene.index.IndexNotFoundException;
|
import org.apache.lucene.index.IndexNotFoundException;
|
||||||
|
@ -119,6 +74,51 @@ import org.apache.nifi.web.ResourceNotFoundException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.EOFException;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileFilter;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.FilenameFilter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class PersistentProvenanceRepository implements ProvenanceRepository {
|
public class PersistentProvenanceRepository implements ProvenanceRepository {
|
||||||
|
|
||||||
public static final String EVENT_CATEGORY = "Provenance Repository";
|
public static final String EVENT_CATEGORY = "Provenance Repository";
|
||||||
|
@ -447,7 +447,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
|
||||||
|
|
||||||
final Authorizable eventAuthorizable;
|
final Authorizable eventAuthorizable;
|
||||||
try {
|
try {
|
||||||
eventAuthorizable = resourceFactory.createDataAuthorizable(event.getComponentId());
|
if (event.isRemotePortType()) {
|
||||||
|
eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
} catch (final ResourceNotFoundException rnfe) {
|
} catch (final ResourceNotFoundException rnfe) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -461,7 +465,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Authorizable eventAuthorizable = resourceFactory.createDataAuthorizable(event.getComponentId());
|
final Authorizable eventAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
eventAuthorizable.authorize(authorizer, RequestAction.READ, user, event.getAttributes());
|
eventAuthorizable.authorize(authorizer, RequestAction.READ, user, event.getAttributes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -256,7 +256,11 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
|
||||||
|
|
||||||
final Authorizable eventAuthorizable;
|
final Authorizable eventAuthorizable;
|
||||||
try {
|
try {
|
||||||
eventAuthorizable = resourceFactory.createDataAuthorizable(event.getComponentId());
|
if (event.isRemotePortType()) {
|
||||||
|
eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
} catch (final ResourceNotFoundException rnfe) {
|
} catch (final ResourceNotFoundException rnfe) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -270,7 +274,12 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Authorizable eventAuthorizable = resourceFactory.createDataAuthorizable(event.getComponentId());
|
final Authorizable eventAuthorizable;
|
||||||
|
if (event.isRemotePortType()) {
|
||||||
|
eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
|
||||||
|
} else {
|
||||||
|
eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
|
||||||
|
}
|
||||||
eventAuthorizable.authorize(authorizer, RequestAction.READ, user, event.getAttributes());
|
eventAuthorizable.authorize(authorizer, RequestAction.READ, user, event.getAttributes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue