diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java index f0f56f733d..3970d63803 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java @@ -17,8 +17,11 @@ package org.apache.nifi.web.api.dto.flow; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.Date; /** * The NiFi flow starting at a given Process Group. @@ -31,6 +34,7 @@ public class ProcessGroupFlowDTO { private String parentGroupId; private FlowBreadcrumbDTO breadcrumb; private FlowDTO flow; + private Date lastRefreshed; /** * @return contents of this process group. This field will be populated if the request is marked verbose @@ -108,5 +112,15 @@ public class ProcessGroupFlowDTO { this.uri = uri; } + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty( + value = "The time the flow for the process group was last refreshed." + ) + public Date getLastRefreshed() { + return lastRefreshed; + } + public void setLastRefreshed(Date lastRefreshed) { + this.lastRefreshed = lastRefreshed; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionEntity.java index ed4a38fcc3..1b48ca404f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionEntity.java @@ -19,6 +19,7 @@ package org.apache.nifi.web.api.entity; import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import javax.xml.bind.annotation.XmlRootElement; import java.util.List; @@ -30,6 +31,7 @@ import java.util.List; public class ConnectionEntity extends ComponentEntity { private ConnectionDTO component; + private ConnectionStatusDTO status; private List bends; private Integer labelIndex; private String sourceId; @@ -48,6 +50,20 @@ public class ConnectionEntity extends ComponentEntity { this.component = component; } + /** + * @return the connection status + */ + @ApiModelProperty( + value = "The status of the connection." + ) + public ConnectionStatusDTO getStatus() { + return status; + } + + public void setStatus(ConnectionStatusDTO status) { + this.status = status; + } + /** * @return position of the bend points on this connection */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortEntity.java index 0dc4fdc83f..1594664b90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortEntity.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.web.api.entity; +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import javax.xml.bind.annotation.XmlRootElement; @@ -27,6 +29,7 @@ import javax.xml.bind.annotation.XmlRootElement; public class PortEntity extends ComponentEntity { private PortDTO component; + private PortStatusDTO status; private String portType; /** @@ -40,6 +43,20 @@ public class PortEntity extends ComponentEntity { this.component = component; } + /** + * @return the port status + */ + @ApiModelProperty( + value = "The status of the port." + ) + public PortStatusDTO getStatus() { + return status; + } + + public void setStatus(PortStatusDTO status) { + this.status = status; + } + public String getPortType() { return portType; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java index 06e78e5366..6f649bf35d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java @@ -17,7 +17,10 @@ package org.apache.nifi.web.api.entity; import javax.xml.bind.annotation.XmlRootElement; + +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; /** * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO. @@ -26,6 +29,17 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; public class ProcessGroupEntity extends ComponentEntity { private ProcessGroupDTO component; + private ProcessGroupStatusDTO status; + + private Integer runningCount; + private Integer stoppedCount; + private Integer invalidCount; + private Integer disabledCount; + private Integer activeRemotePortCount; + private Integer inactiveRemotePortCount; + + private Integer inputPortCount; + private Integer outputPortCount; /** * The ProcessGroupDTO that is being serialized. @@ -40,4 +54,129 @@ public class ProcessGroupEntity extends ComponentEntity { this.component = component; } + /** + * @return the process group status + */ + @ApiModelProperty( + value = "The status of the process group." + ) + public ProcessGroupStatusDTO getStatus() { + return status; + } + + public void setStatus(ProcessGroupStatusDTO status) { + this.status = status; + } + + /** + * @return number of input ports contained in this process group + */ + @ApiModelProperty( + value = "The number of input ports in the process group." + ) + public Integer getInputPortCount() { + return inputPortCount; + } + + public void setInputPortCount(Integer inputPortCount) { + this.inputPortCount = inputPortCount; + } + + /** + * @return number of invalid components in this process group + */ + @ApiModelProperty( + value = "The number of invalid components in the process group." + ) + public Integer getInvalidCount() { + return invalidCount; + } + + public void setInvalidCount(Integer invalidCount) { + this.invalidCount = invalidCount; + } + + /** + * @return number of output ports in this process group + */ + @ApiModelProperty( + value = "The number of output ports in the process group." + ) + public Integer getOutputPortCount() { + return outputPortCount; + } + + public void setOutputPortCount(Integer outputPortCount) { + this.outputPortCount = outputPortCount; + } + + /** + * @return number of running component in this process group + */ + @ApiModelProperty( + value = "The number of running componetns in this process group." + ) + public Integer getRunningCount() { + return runningCount; + } + + public void setRunningCount(Integer runningCount) { + this.runningCount = runningCount; + } + + /** + * @return number of stopped components in this process group + */ + @ApiModelProperty( + value = "The number of stopped components in the process group." + ) + public Integer getStoppedCount() { + return stoppedCount; + } + + public void setStoppedCount(Integer stoppedCount) { + this.stoppedCount = stoppedCount; + } + + /** + * @return number of disabled components in this process group + */ + @ApiModelProperty( + value = "The number of disabled components in the process group." + ) + public Integer getDisabledCount() { + return disabledCount; + } + + public void setDisabledCount(Integer disabledCount) { + this.disabledCount = disabledCount; + } + + /** + * @return number of active remote ports in this process group + */ + @ApiModelProperty( + value = "The number of active remote ports in the process group." + ) + public Integer getActiveRemotePortCount() { + return activeRemotePortCount; + } + + public void setActiveRemotePortCount(Integer activeRemotePortCount) { + this.activeRemotePortCount = activeRemotePortCount; + } + + /** + * @return number of inactive remote ports in this process group + */ + @ApiModelProperty( + value = "The number of inactive remote ports in the process group." + ) + public Integer getInactiveRemotePortCount() { + return inactiveRemotePortCount; + } + + public void setInactiveRemotePortCount(Integer inactiveRemotePortCount) { + this.inactiveRemotePortCount = inactiveRemotePortCount; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupEntity.java index 3183cde979..84f13f22f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupEntity.java @@ -17,7 +17,10 @@ package org.apache.nifi.web.api.entity; import javax.xml.bind.annotation.XmlRootElement; + +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; /** * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a RemoteProcessGroupDTO. @@ -26,6 +29,10 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; public class RemoteProcessGroupEntity extends ComponentEntity { private RemoteProcessGroupDTO component; + private RemoteProcessGroupStatusDTO status; + + private Integer inputPortCount; + private Integer outputPortCount; /** * The RemoteProcessGroupDTO that is being serialized. @@ -40,4 +47,45 @@ public class RemoteProcessGroupEntity extends ComponentEntity { this.component = component; } + /** + * @return the remote process group status + */ + @ApiModelProperty( + value = "The status of the remote process group." + ) + public RemoteProcessGroupStatusDTO getStatus() { + return status; + } + + public void setStatus(RemoteProcessGroupStatusDTO status) { + this.status = status; + } + + /** + * @return number of Remote Input Ports currently available in the remote NiFi instance + */ + @ApiModelProperty( + value = "The number of remote input ports currently available on the target." + ) + public Integer getInputPortCount() { + return inputPortCount; + } + + public void setInputPortCount(Integer inputPortCount) { + this.inputPortCount = inputPortCount; + } + + /** + * @return number of Remote Output Ports currently available in the remote NiFi instance + */ + @ApiModelProperty( + value = "The number of remote output ports currently available on the target." + ) + public Integer getOutputPortCount() { + return outputPortCount; + } + + public void setOutputPortCount(Integer outputPortCount) { + this.outputPortCount = outputPortCount; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 44f69bf631..47d0594559 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.web; -import java.util.Date; -import java.util.List; -import java.util.Set; - import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; @@ -49,7 +45,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.ResourceDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.TemplateDTO; @@ -83,6 +78,10 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.SnippetEntity; +import java.util.Date; +import java.util.List; +import java.util.Set; + /** * Defines the NiFiServiceFacade interface. */ @@ -214,10 +213,9 @@ public interface NiFiServiceFacade { /** * Creates a new archive of the flow configuration. * - * @param revision Revision to compare with current base revision * @return snapshot */ - ProcessGroupEntity createArchive(Revision revision); + ProcessGroupEntity createArchive(); /** * Sets the annotation data for a processor. @@ -297,13 +295,6 @@ public interface NiFiServiceFacade { */ Set getWorkQueuePrioritizerTypes(); - /** - * Returns the current revision. - * - * @return revision - */ - RevisionDTO getRevision(); - // ---------------------------------------- // Template methods // ---------------------------------------- @@ -328,14 +319,13 @@ public interface NiFiServiceFacade { /** * Instantiate the corresponding template. * - * @param revision revision * @param groupId group id * @param templateId template id * @param originX x * @param originY y * @return snapshot */ - FlowEntity createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId); + FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId); /** * Gets the template with the specified id. @@ -373,12 +363,11 @@ public interface NiFiServiceFacade { /** * Creates a new Processor. * - * @param revision Revision to compare with current base revision * @param groupId Group id * @param processorDTO The processor DTO * @return The new processor DTO */ - ProcessorEntity createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO); + ProcessorEntity createProcessor(String groupId, ProcessorDTO processorDTO); /** * Gets the Processor transfer object for the specified id. @@ -492,12 +481,11 @@ public interface NiFiServiceFacade { /** * Creates a new Relationship target. * - * @param revision Revision to compare with current base revision * @param groupId group * @param connectionDTO The Connection DTO * @return The Connection DTO */ - ConnectionEntity createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO); + ConnectionEntity createConnection(String groupId, ConnectionDTO connectionDTO); /** * Determines if this connection can be listed. @@ -618,12 +606,11 @@ public interface NiFiServiceFacade { /** * Creates a new input port. * - * @param revision Revision to compare with current base revision * @param groupId The id of the group this port should be create in * @param inputPortDTO The input PortDTO * @return snapshot */ - PortEntity createInputPort(Revision revision, String groupId, PortDTO inputPortDTO); + PortEntity createInputPort(String groupId, PortDTO inputPortDTO); /** * Gets an input port. @@ -687,12 +674,11 @@ public interface NiFiServiceFacade { /** * Creates a new output port. * - * @param revision Revision to compare with current base revision * @param groupId The id of the group this port should be create in * @param outputPortDTO The output PortDTO * @return snapshot */ - PortEntity createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO); + PortEntity createOutputPort( String groupId, PortDTO outputPortDTO); /** * Gets an output port. @@ -769,11 +755,10 @@ public interface NiFiServiceFacade { * Creates a new process group. * * @param parentGroupId The id of the parent group - * @param revision Revision to compare with current base revision * @param processGroupDTO The ProcessGroupDTO * @return snapshot */ - ProcessGroupEntity createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO); + ProcessGroupEntity createProcessGroup(String parentGroupId, ProcessGroupDTO processGroupDTO); /** * Returns the process group. @@ -829,12 +814,11 @@ public interface NiFiServiceFacade { /** * Creates a new remote process group. * - * @param revision Revision to compare with current base revision * @param groupId The id of the parent group * @param remoteProcessGroupDTO The RemoteProcessGroupDTO * @return snapshot */ - RemoteProcessGroupEntity createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO); + RemoteProcessGroupEntity createRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO); /** * Gets a remote process group. @@ -942,12 +926,11 @@ public interface NiFiServiceFacade { /** * Creates a funnel. * - * @param revision Revision to compare with current base revision * @param groupId group * @param funnelDTO funnel * @return The funnel DTO */ - FunnelEntity createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO); + FunnelEntity createFunnel(String groupId, FunnelDTO funnelDTO); /** * Gets the specified funnel. @@ -1072,12 +1055,11 @@ public interface NiFiServiceFacade { /** * Creates a label. * - * @param revision Revision to compare with current base revision * @param groupId group * @param labelDTO The label DTO * @return The label DTO */ - LabelEntity createLabel(Revision revision, String groupId, LabelDTO labelDTO); + LabelEntity createLabel(String groupId, LabelDTO labelDTO); /** * Gets the specified label. @@ -1320,23 +1302,21 @@ public interface NiFiServiceFacade { /** * Creates a new snippet based off the existing snippet. * - * @param revision revision * @param groupId group id * @param snippetId snippet id * @param originX x * @param originY y * @return snapshot */ - FlowEntity copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY); + FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY); /** * Creates a new snippet. * - * @param revision revision * @param snippet snippet * @return snapshot */ - ConfigurationSnapshot createSnippet(Revision revision, SnippetDTO snippet); + SnippetEntity createSnippet(SnippetDTO snippet); /** * Gets the specified snippet. @@ -1344,7 +1324,7 @@ public interface NiFiServiceFacade { * @param snippetId id * @return snippet */ - SnippetDTO getSnippet(String snippetId); + SnippetEntity getSnippet(String snippetId); /** * Determines if this snippet can be updated. @@ -1360,7 +1340,7 @@ public interface NiFiServiceFacade { * @param snippetDto snippet * @return snapshot */ - ConfigurationSnapshot updateSnippet(Revision revision, SnippetDTO snippetDto); + UpdateResult updateSnippet(Revision revision, SnippetDTO snippetDto); /** * Determines if this snippet can be removed. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 5c73e8a35d..4ba8b33f12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,30 +16,6 @@ */ package org.apache.nifi.web; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import javax.ws.rs.WebApplicationException; - import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.nifi.action.Action; @@ -82,6 +58,7 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; @@ -190,6 +167,29 @@ import org.apache.nifi.web.revision.StandardRevisionUpdate; import org.apache.nifi.web.revision.UpdateRevisionTask; import org.apache.nifi.web.util.SnippetUtils; +import javax.ws.rs.WebApplicationException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -397,7 +397,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public UpdateResult updateConnection(final Revision revision, final ConnectionDTO connectionDTO) { // if connection does not exist, then create new connection if (connectionDAO.hasConnection(connectionDTO.getId()) == false) { - return new UpdateResult<>(createConnection(revision, connectionDTO.getParentGroupId(), connectionDTO), true); + return new UpdateResult<>(createConnection(connectionDTO.getParentGroupId(), connectionDTO), true); } final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); @@ -409,14 +409,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { connection -> dtoFactory.createConnectionDto(connection)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connectionNode); - return new UpdateResult<>(entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); } @Override public UpdateResult updateProcessor(final Revision revision, final ProcessorDTO processorDTO) { // if processor does not exist, then create new processor if (processorDAO.hasProcessor(processorDTO.getId()) == false) { - return new UpdateResult<>(createProcessor(revision, processorDTO.getParentGroupId(), processorDTO), true); + return new UpdateResult<>(createProcessor(processorDTO.getParentGroupId(), processorDTO), true); } // get the component, ensure we have access to it, and perform the update request @@ -427,14 +428,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { proc -> dtoFactory.createProcessorDto(proc)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processorNode); - return new UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); } @Override public UpdateResult updateLabel(final Revision revision, final LabelDTO labelDTO) { // if label does not exist, then create new label if (labelDAO.hasLabel(labelDTO.getId()) == false) { - return new UpdateResult<>(createLabel(revision, labelDTO.getParentGroupId(), labelDTO), false); + return new UpdateResult<>(createLabel(labelDTO.getParentGroupId(), labelDTO), false); } final Label labelNode = labelDAO.getLabel(labelDTO.getId()); @@ -451,7 +453,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public UpdateResult updateFunnel(final Revision revision, final FunnelDTO funnelDTO) { // if label does not exist, then create new label if (funnelDAO.hasFunnel(funnelDTO.getId()) == false) { - return new UpdateResult<>(createFunnel(revision, funnelDTO.getParentGroupId(), funnelDTO), true); + return new UpdateResult<>(createFunnel(funnelDTO.getParentGroupId(), funnelDTO), true); } final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); @@ -584,14 +586,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot updateSnippet(final Revision revision, final SnippetDTO snippetDto) { + public UpdateResult updateSnippet(final Revision revision, final SnippetDTO snippetDto) { // if label does not exist, then create new label if (snippetDAO.hasSnippet(snippetDto.getId()) == false) { - return createSnippet(revision, snippetDto); + return new UpdateResult<>(createSnippet(snippetDto), true); } final Set requiredRevisions = getRevisionsForSnippet(snippetDto); - final ProcessGroup processGroup = getGroup(snippetDto.getParentGroupId()); + + // if the parent group is specified in the request, ensure write access to it as it could be moving the components in the snippet + final String requestProcessGroupIdentifier = snippetDto.getParentGroupId(); + if (requestProcessGroupIdentifier != null) { + final ProcessGroup requestProcessGroup = processGroupDAO.getProcessGroup(requestProcessGroupIdentifier); + requestProcessGroup.authorize(authorizer, RequestAction.WRITE); + } final String modifier = NiFiUserUtils.getNiFiUserName(); final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions); @@ -601,15 +609,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { versionedSnippet = revisionManager.updateRevision(revisionClaim, modifier, new UpdateRevisionTask() { @Override public RevisionUpdate update() { - // ensure write access to the flow - processGroup.authorize(authorizer, RequestAction.WRITE); - - // also ensure read access to the flow as the component must be read in order to generate a response - processGroup.authorize(authorizer, RequestAction.READ); - // get the updated component final Snippet snippet = snippetDAO.updateSnippet(snippetDto); + // ensure write access to the flow + final ProcessGroup processGroup = getGroup(snippet.getParentGroupId()); + processGroup.authorize(authorizer, RequestAction.WRITE); + // save updated controller controllerFacade.save(); @@ -634,14 +640,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Revision updatedSnippetRevision = incrementRevision(revision); final FlowModification lastModification = new FlowModification(updatedSnippetRevision, modifier); - return new StandardRevisionUpdate<>(snippetDto, lastModification); + return new StandardRevisionUpdate<>(updatedSnippet, lastModification); } }); } catch (ExpiredRevisionClaimException e) { throw new InvalidRevisionException("Failed to update Snippet", e); } - return new ConfigurationSnapshot(versionedSnippet.getLastModification().getRevision().getVersion(), versionedSnippet.getComponent()); + final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(versionedSnippet.getComponent().getParentGroupId()); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(parentGroup); + return new UpdateResult<>(entityFactory.createSnippetEntity(versionedSnippet.getComponent(), dtoFactory.createRevisionDTO(versionedSnippet.getLastModification()), accessPolicy), false); } private Map updateRevisions(final Map originalDtos, final String modifier) { @@ -664,7 +672,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public UpdateResult updateInputPort(final Revision revision, final PortDTO inputPortDTO) { // if input port does not exist, then create new input port if (inputPortDAO.hasPort(inputPortDTO.getId()) == false) { - return new UpdateResult<>(createInputPort(revision, inputPortDTO.getParentGroupId(), inputPortDTO), true); + return new UpdateResult<>(createInputPort(inputPortDTO.getParentGroupId(), inputPortDTO), true); } final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); @@ -674,14 +682,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { port -> dtoFactory.createPortDto(port)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(inputPortNode); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); } @Override public UpdateResult updateOutputPort(final Revision revision, final PortDTO outputPortDTO) { // if output port does not exist, then create new output port if (outputPortDAO.hasPort(outputPortDTO.getId()) == false) { - return new UpdateResult<>(createOutputPort(revision, outputPortDTO.getParentGroupId(), outputPortDTO), true); + return new UpdateResult<>(createOutputPort(outputPortDTO.getParentGroupId(), outputPortDTO), true); } final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); @@ -691,14 +700,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { port -> dtoFactory.createPortDto(port)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(outputPortNode); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); } @Override public UpdateResult updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) { // if controller reference does not exist, then create new controller reference if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId()) == false) { - return new UpdateResult<>(createRemoteProcessGroup(revision, remoteProcessGroupDTO.getParentGroupId(), remoteProcessGroupDTO), true); + return new UpdateResult<>(createRemoteProcessGroup(remoteProcessGroupDTO.getParentGroupId(), remoteProcessGroupDTO), true); } final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); @@ -710,7 +720,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroupNode); final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, accessPolicy), false); + final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, accessPolicy, status), false); } @Override @@ -752,7 +763,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (processGroupDTO.getParentGroupId() == null) { throw new IllegalArgumentException("Unable to create the specified process group since the parent group was not specified."); } else { - return new UpdateResult<>(createProcessGroup(processGroupDTO.getParentGroupId(), revision, processGroupDTO), true); + return new UpdateResult<>(createProcessGroup(processGroupDTO.getParentGroupId(), processGroupDTO), true); } } @@ -764,7 +775,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroupNode); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, accessPolicy), false); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, accessPolicy, status), false); } @Override @@ -889,7 +901,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> connectionDAO.deleteConnection(connectionId), dtoFactory.createConnectionDto(connection)); - return entityFactory.createConnectionEntity(snapshot, null, null); + return entityFactory.createConnectionEntity(snapshot, null, null, null); } @Override @@ -927,7 +939,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> processorDAO.deleteProcessor(processorId), dtoFactory.createProcessorDto(processor)); - return entityFactory.createProcessorEntity(snapshot, null, null); + return entityFactory.createProcessorEntity(snapshot, null, null, null); } @Override @@ -989,6 +1001,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Snippet snippet = snippetDAO.getSnippet(snippetId); final ProcessGroup processGroup = getGroup(snippet.getParentGroupId()); + // ensure access to process group + processGroup.authorize(authorizer, RequestAction.WRITE); + final SnippetDTO snapshot = deleteComponent(revision, processGroup, () -> snippetDAO.deleteSnippet(snippetId), @@ -1006,7 +1021,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> inputPortDAO.deletePort(inputPortId), dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(snapshot, null, null); + return entityFactory.createPortEntity(snapshot, null, null, null); } @Override @@ -1018,7 +1033,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> outputPortDAO.deletePort(outputPortId), dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(snapshot, null, null); + return entityFactory.createPortEntity(snapshot, null, null, null); } @Override @@ -1030,7 +1045,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> processGroupDAO.deleteProcessGroup(groupId), dtoFactory.createProcessGroupDto(processGroup)); - return entityFactory.createProcessGroupEntity(snapshot, null, null); + return entityFactory.createProcessGroupEntity(snapshot, null, null, null); } @Override @@ -1042,7 +1057,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - return entityFactory.createRemoteProcessGroupEntity(snapshot, null, null); + return entityFactory.createRemoteProcessGroupEntity(snapshot, null, null, null); } @Override @@ -1052,16 +1067,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { + public ConnectionEntity createConnection(final String groupId, final ConnectionDTO connectionDTO) { final RevisionUpdate snapshot = createComponent( - revision, connectionDTO, () -> connectionDAO.createConnection(groupId, connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); - return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId())); + return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override @@ -1091,22 +1106,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { + public ProcessorEntity createProcessor(final String groupId, final ProcessorDTO processorDTO) { final RevisionUpdate snapshot = createComponent( - revision, processorDTO, () -> processorDAO.createProcessor(groupId, processorDTO), processor -> dtoFactory.createProcessorDto(processor)); final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); + return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override - public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { + public LabelEntity createLabel(final String groupId, final LabelDTO labelDTO) { final RevisionUpdate snapshot = createComponent( - revision, labelDTO, () -> labelDAO.createLabel(groupId, labelDTO), label -> dtoFactory.createLabelDto(label)); @@ -1119,7 +1133,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { /** * Creates a component using the optimistic locking manager. * - * @param revision the current revision * @param componentDto the DTO that will be used to create the component * @param daoCreation A Supplier that will create the NiFi Component to use * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO @@ -1129,7 +1142,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * * @return a RevisionUpdate that represents the updated configuration */ - private RevisionUpdate createComponent(final Revision revision, final ComponentDTO componentDto, + private RevisionUpdate createComponent(final ComponentDTO componentDto, final Supplier daoCreation, final Function dtoCreation) { final String modifier = NiFiUserUtils.getNiFiUserName(); @@ -1163,9 +1176,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override - public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { + public FunnelEntity createFunnel(final String groupId, final FunnelDTO funnelDTO) { final RevisionUpdate snapshot = createComponent( - revision, funnelDTO, () -> funnelDAO.createFunnel(groupId, funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); @@ -1233,14 +1245,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowEntity copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) { - final FlowDTO flowDto = revisionManager.get(groupId, new ReadOnlyRevisionCallback() { - @Override - public FlowDTO withRevision(final Revision revision) { - String id = snippetId; + public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY) { + final FlowDTO flowDto = revisionManager.get(groupId, + rev -> { + // ensure access to process group + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); // create the new snippet - final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, id, originX, originY); + final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY); + + // TODO - READ access to all components in snippet // validate the new snippet validateSnippetContents(snippet); @@ -1248,10 +1263,41 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - return dtoFactory.createFlowDto(group, snippet); - } - }); + // identify all components added + final Set identifiers = new HashSet<>(); + snippet.getProcessors().stream() + .map(proc -> proc.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getConnections().stream() + .map(conn -> conn.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getInputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getOutputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getProcessGroups().stream() + .map(group -> group.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .map(remoteGroup -> remoteGroup.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) + .map(remoteInputPort -> remoteInputPort.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) + .map(remoteOutputPort -> remoteOutputPort.getId()) + .forEach(id -> identifiers.add(id)); + + return revisionManager.get(identifiers, + () -> { + final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); + return dtoFactory.createFlowDto(processGroup, groupStatus, snippet, revisionManager); + }); + }); final FlowEntity flowEntity = new FlowEntity(); flowEntity.setFlow(flowDto); @@ -1263,79 +1309,88 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot createSnippet(final Revision revision, final SnippetDTO snippetDTO) { - final RevisionClaim claim = new StandardRevisionClaim(revision); + public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { final String modifier = NiFiUserUtils.getNiFiUserName(); - try { - final RevisionUpdate update = revisionManager.updateRevision(claim, modifier, new UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // ensure id is set - if (StringUtils.isBlank(snippetDTO.getId())) { - snippetDTO.setId(UUID.randomUUID().toString()); - } - // add the snippet - final Snippet snippet = snippetDAO.createSnippet(snippetDTO); - final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet); - return new StandardRevisionUpdate<>(responseSnippetDTO, new FlowModification(incrementRevision(revision), modifier)); - } - }); - - return createConfigSnapshot(update); - } catch (ExpiredRevisionClaimException e) { - throw new InvalidRevisionException("Could not create Snippet", e); + // ensure id is set + if (StringUtils.isBlank(snippetDTO.getId())) { + snippetDTO.setId(UUID.randomUUID().toString()); } + + final String groupId = snippetDTO.getParentGroupId(); + final RevisionUpdate snapshot = revisionManager.get(groupId, rev -> { + // ensure access to process group + final ProcessGroup parent = processGroupDAO.getProcessGroup(groupId); + parent.authorize(authorizer, RequestAction.WRITE); + + // TODO - READ access to all components in snippet + + // add the component + final Snippet snippet = snippetDAO.createSnippet(snippetDTO); + + // save the flow + controllerFacade.save(); + + final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); + final FlowModification lastMod = new FlowModification(new Revision(0L, rev.getClientId(), snippetDTO.getId()), modifier); + return new StandardRevisionUpdate(dto, lastMod); + }); + + final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(snapshot.getComponent().getParentGroupId()); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(parentGroup); + return entityFactory.createSnippetEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override - public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { + public PortEntity createInputPort(final String groupId, final PortDTO inputPortDTO) { final RevisionUpdate snapshot = createComponent( - revision, inputPortDTO, () -> inputPortDAO.createPort(groupId, inputPortDTO), port -> dtoFactory.createPortDto(port)); final Port port = inputPortDAO.getPort(inputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override - public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { + public PortEntity createOutputPort(final String groupId, final PortDTO outputPortDTO) { final RevisionUpdate snapshot = createComponent( - revision, outputPortDTO, () -> outputPortDAO.createPort(groupId, outputPortDTO), port -> dtoFactory.createPortDto(port)); final Port port = outputPortDAO.getPort(outputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override - public ProcessGroupEntity createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) { - final RevisionUpdate snapshot = createComponent(revision, processGroupDTO, + public ProcessGroupEntity createProcessGroup(final String parentGroupId, final ProcessGroupDTO processGroupDTO) { + final RevisionUpdate snapshot = createComponent( + processGroupDTO, () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroup); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override - public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { + public RemoteProcessGroupEntity createRemoteProcessGroup(final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { final RevisionUpdate snapshot = createComponent( - revision, remoteProcessGroupDTO, () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroup); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); + return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); } @Override @@ -1381,23 +1436,59 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowEntity createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) { - final FlowDTO flowDto = revisionManager.get(groupId, new ReadOnlyRevisionCallback() { - @Override - public FlowDTO withRevision(final Revision revision) { - // instantiate the template - there is no need to make another copy of the flow snippet since the actual template - // was copied and this dto is only used to instantiate it's components (which as already completed) - final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId); + public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId) { + final FlowDTO flowDto = revisionManager.get(groupId, rev -> { + // ensure access to process group + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); - // validate the new snippet - validateSnippetContents(snippet); + // instantiate the template - there is no need to make another copy of the flow snippet since the actual template + // was copied and this dto is only used to instantiate it's components (which as already completed) + final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId); - // save the flow - controllerFacade.save(); + // TODO - READ access to all components in snippet - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - return dtoFactory.createFlowDto(group, snippet); - } + // validate the new snippet + validateSnippetContents(snippet); + + // save the flow + controllerFacade.save(); + + // identify all components added + final Set identifiers = new HashSet<>(); + snippet.getProcessors().stream() + .map(proc -> proc.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getConnections().stream() + .map(conn -> conn.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getInputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getOutputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getProcessGroups().stream() + .map(group -> group.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .map(remoteGroup -> remoteGroup.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) + .map(remoteInputPort -> remoteInputPort.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) + .map(remoteOutputPort -> remoteOutputPort.getId()) + .forEach(id -> identifiers.add(id)); + + return revisionManager.get(identifiers, + () -> { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); + return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager); + }); }); final FlowEntity flowEntity = new FlowEntity(); @@ -1406,7 +1497,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessGroupEntity createArchive(final Revision revision) { + public ProcessGroupEntity createArchive() { try { controllerFacade.createArchive(); } catch (IOException e) { @@ -1442,7 +1533,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); final FlowModification lastMod = new FlowModification(incrementRevision(revision), modifier); - final ProcessorEntity entity = entityFactory.createProcessorEntity(updatedProcDto, dtoFactory.createRevisionDTO(lastMod), accessPolicy); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); + final ProcessorEntity entity = entityFactory.createProcessorEntity(updatedProcDto, dtoFactory.createRevisionDTO(lastMod), accessPolicy, status); return new StandardRevisionUpdate<>(entity, lastMod); } @@ -1760,12 +1852,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- // Read Operations // ----------------------------------------- - @Override - @Deprecated - public RevisionDTO getRevision() { - final Revision revision = revisionManager.getRevision(processGroupDAO.getProcessGroup("root").getIdentifier()); - return dtoFactory.createRevisionDTO(revision.getVersion(), revision.getClientId()); - } @Override public SearchResultsDTO searchController(String query) { @@ -1804,7 +1890,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) { - return controllerFacade.getProcessGroupStatus(groupId); + return dtoFactory.createProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(groupId)); } @Override @@ -1874,23 +1960,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public Set getConnections(String groupId) { - return revisionManager.get(groupId, new ReadOnlyRevisionCallback>() { - @Override - public Set withRevision(Revision revision) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - group.authorize(authorizer, RequestAction.READ); + return revisionManager.get(groupId, rev -> { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.authorize(authorizer, RequestAction.READ); - final Set connections = connectionDAO.getConnections(groupId); - final Set connectionIds = connections.stream().map(connection -> connection.getIdentifier()).collect(Collectors.toSet()); - return revisionManager.get(connectionIds, () -> { - final Set connectionEntities = new LinkedHashSet<>(); - for (Connection connection : connectionDAO.getConnections(groupId)) { - connectionEntities.add(entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), null, dtoFactory.createAccessPolicyDto(connection))); - } - return connectionEntities; - }); - - } + final Set connections = connectionDAO.getConnections(groupId); + final Set connectionIds = connections.stream().map(connection -> connection.getIdentifier()).collect(Collectors.toSet()); + return revisionManager.get(connectionIds, () -> { + return connections.stream() + .map(connection -> { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); + return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, accessPolicy, status); + }) + .collect(Collectors.toSet()); + }); }); } @@ -1899,7 +1984,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return revisionManager.get(connectionId, rev -> { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.READ); - return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connectionDAO.getConnection(connectionId)), null, dtoFactory.createAccessPolicyDto(connection)); + + final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); + return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connectionDAO.getConnection(connectionId)), revision, accessPolicy, status); }); } @@ -1937,7 +2026,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConnectionStatusDTO getConnectionStatus(String connectionId) { - return revisionManager.get(connectionId, rev -> controllerFacade.getConnectionStatus(connectionId)); + return revisionManager.get(connectionId, rev -> dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId))); } @Override @@ -1953,11 +2042,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set processors = processorDAO.getProcessors(groupId); final Set ids = processors.stream().map(proc -> proc.getIdentifier()).collect(Collectors.toSet()); return revisionManager.get(ids, () -> { - Set processorEntities = new LinkedHashSet<>(); - for (ProcessorNode processor : processorDAO.getProcessors(groupId)) { - processorEntities.add(entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), null, dtoFactory.createAccessPolicyDto(processor))); - } - return processorEntities; + return processors.stream() + .map(processor -> { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, accessPolicy, status); + }) + .collect(Collectors.toSet()); }); } @@ -2010,7 +2102,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return revisionManager.get(id, rev -> { final ProcessorNode processor = processorDAO.getProcessor(id); processor.authorize(authorizer, RequestAction.READ); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), null, dtoFactory.createAccessPolicyDto(processor)); + + final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, dtoFactory.createAccessPolicyDto(processor), status); }); } @@ -2029,7 +2124,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorStatusDTO getProcessorStatus(String id) { - return revisionManager.get(id, rev -> controllerFacade.getProcessorStatus(id)); + return revisionManager.get(id, rev -> dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id))); } @Override @@ -2221,11 +2316,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set