From 270944ec692e12c221cdff202bdab56309dfcfd7 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 May 2016 13:09:36 -0400 Subject: [PATCH] NIFI-1801: Scope Templates to Process Groups. This closes #446. --- .../java/org/apache/nifi/web/Revision.java | 5 - .../apache/nifi/web/api/dto/TemplateDTO.java | 15 +- .../cluster/protocol/StandardDataFlow.java | 11 +- .../jaxb/message/AdaptedDataFlow.java | 9 - .../jaxb/message/DataFlowAdapter.java | 3 +- .../impl/NodeProtocolSenderImplTest.java | 2 +- .../ClusterProtocolHeartbeatMonitor.java | 17 - .../endpoints/FlowSnippetEndpointMerger.java | 44 +- .../endpoints/ProcessorEndpointMerger.java | 14 + .../RemoteProcessGroupEndpointMerger.java | 13 + .../http/replication/ResponseUtils.java | 2 +- .../ThreadPoolRequestReplicator.java | 30 +- .../cluster/flow/impl/DataFlowDaoImpl.java | 4 +- .../manager/impl/WebClusterManager.java | 6 +- .../DataFlowManagementServiceImplTest.java | 14 +- .../nifi/cluster/protocol/DataFlow.java | 5 - .../org/apache/nifi/controller/Template.java | 191 +++++++ .../org/apache/nifi/groups/ProcessGroup.java | 42 ++ .../nifi/controller/FlowController.java | 75 +-- .../nifi/controller/StandardFlowService.java | 100 +++- .../controller/StandardFlowSynchronizer.java | 99 +--- .../org/apache/nifi/controller/Template.java | 37 -- .../nifi/controller/TemplateManager.java | 524 ------------------ .../apache/nifi/controller/TemplateUtils.java | 287 ++++++++++ .../serialization/StandardFlowSerializer.java | 26 + .../nifi/fingerprint/FingerprintFactory.java | 5 +- .../nifi/groups/StandardProcessGroup.java | 95 ++++ .../nifi/persistence/TemplateSerializer.java | 1 + .../controller/StandardFlowServiceTest.java | 16 +- .../service/mock/MockProcessGroup.java | 30 + .../apache/nifi/web/NiFiServiceFacade.java | 7 +- .../nifi/web/StandardNiFiServiceFacade.java | 300 +++++++--- .../nifi/web/api/ProcessGroupResource.java | 11 +- .../nifi/web/api/ProcessorResource.java | 23 +- .../IllegalArgumentExceptionMapper.java | 5 +- .../nifi/web/controller/ControllerFacade.java | 2 +- .../org/apache/nifi/web/dao/TemplateDAO.java | 6 +- .../web/dao/impl/StandardTemplateDAO.java | 45 +- .../web/revision/NaiveRevisionManager.java | 131 +++-- .../nifi/web/revision/RevisionManager.java | 8 + .../revision/TestNaiveRevisionManager.java | 65 ++- 41 files changed, 1325 insertions(+), 1000 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java diff --git a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java index 0533307e91..4c47dde792 100644 --- a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java +++ b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java @@ -81,11 +81,6 @@ public class Revision implements Serializable { return false; } - // TODO: THIS IS FOR TESTING PURPOSES! DO NOT LET THIS GET CHECKED IN THIS WAY!!!!!!!!!!!! - if (true) { - return true; - } - Revision thatRevision = (Revision) obj; // ensure that component ID's are the same (including null) if (thatRevision.getComponentId() == null && getComponentId() != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java index 6fa9dafc51..4f549231b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java @@ -31,6 +31,7 @@ public class TemplateDTO { private String uri; private String id; + private String groupId; private String name; private String description; private Date timestamp; @@ -40,9 +41,7 @@ public class TemplateDTO { /** * @return id for this template */ - @ApiModelProperty( - value = "The id of the template." - ) + @ApiModelProperty("The id of the template.") public String getId() { return id; } @@ -51,6 +50,16 @@ public class TemplateDTO { this.id = id; } + @ApiModelProperty("The id of the Process Group that the template belongs to.") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + /** * @return uri for this template */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java index 3b6d110913..fc3558a5d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java @@ -29,9 +29,9 @@ import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; */ @XmlJavaTypeAdapter(DataFlowAdapter.class) public class StandardDataFlow implements Serializable, DataFlow { + private static final long serialVersionUID = 1L; private final byte[] flow; - private final byte[] templateBytes; private final byte[] snippetBytes; private boolean autoStartProcessors; @@ -40,23 +40,20 @@ public class StandardDataFlow implements Serializable, DataFlow { * Constructs an instance. * * @param flow a valid flow as bytes, which cannot be null - * @param templateBytes an XML representation of templates. May be null. * @param snippetBytes an XML representation of snippets. May be null. * * @throws NullPointerException if flow is null */ - public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { + public StandardDataFlow(final byte[] flow, final byte[] snippetBytes) { if(flow == null){ throw new NullPointerException("Flow cannot be null"); } this.flow = flow; - this.templateBytes = templateBytes; this.snippetBytes = snippetBytes; } public StandardDataFlow(final DataFlow toCopy) { this.flow = copy(toCopy.getFlow()); - this.templateBytes = copy(toCopy.getTemplates()); this.snippetBytes = copy(toCopy.getSnippets()); this.autoStartProcessors = toCopy.isAutoStartProcessors(); } @@ -70,10 +67,6 @@ public class StandardDataFlow implements Serializable, DataFlow { return flow; } - @Override - public byte[] getTemplates() { - return templateBytes; - } @Override public byte[] getSnippets() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java index 683fdf5cc3..62796d7190 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java @@ -21,7 +21,6 @@ package org.apache.nifi.cluster.protocol.jaxb.message; public class AdaptedDataFlow { private byte[] flow; - private byte[] templates; private byte[] snippets; private boolean autoStartProcessors; @@ -37,14 +36,6 @@ public class AdaptedDataFlow { this.flow = flow; } - public byte[] getTemplates() { - return templates; - } - - public void setTemplates(byte[] templates) { - this.templates = templates; - } - public byte[] getSnippets() { return snippets; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java index 520b8eb9b2..bd3e69e0dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java @@ -31,7 +31,6 @@ public class DataFlowAdapter extends XmlAdapter getClusterNodeIds() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java index d7f69485a2..2063de4d49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java @@ -26,10 +26,10 @@ import java.util.regex.Pattern; import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.entity.FlowSnippetEntity; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; public class FlowSnippetEndpointMerger implements EndpointResponseMerger { public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); @@ -43,21 +43,21 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { @Override public NodeResponse merge(final URI uri, final String method, Set successfulResponses, final Set problematicResponses, final NodeResponse clientResponse) { - final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class); - final FlowSnippetDTO contents = responseEntity.getContents(); + final FlowEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowEntity.class); + final FlowDTO flowDto = responseEntity.getFlow(); - if (contents == null) { + if (flowDto == null) { return clientResponse; } else { - final Map> processorMap = new HashMap<>(); - final Map> remoteProcessGroupMap = new HashMap<>(); + final Map> processorMap = new HashMap<>(); + final Map> remoteProcessGroupMap = new HashMap<>(); for (final NodeResponse nodeResponse : successfulResponses) { - final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class); - final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents(); + final FlowEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowEntity.class); + final FlowDTO nodeContents = nodeResponseEntity.getFlow(); - for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) { - Map innerMap = processorMap.get(nodeProcessor.getId()); + for (final ProcessorEntity nodeProcessor : nodeContents.getProcessors()) { + Map innerMap = processorMap.get(nodeProcessor.getId()); if (innerMap == null) { innerMap = new HashMap<>(); processorMap.put(nodeProcessor.getId(), innerMap); @@ -66,8 +66,8 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { innerMap.put(nodeResponse.getNodeId(), nodeProcessor); } - for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) { - Map innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); + for (final RemoteProcessGroupEntity nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) { + Map innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); if (innerMap == null) { innerMap = new HashMap<>(); remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap); @@ -78,21 +78,19 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { } final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); - for (final ProcessorDTO processor : contents.getProcessors()) { + for (final ProcessorEntity processor : flowDto.getProcessors()) { final String procId = processor.getId(); - final Map mergeMap = processorMap.get(procId); + final Map mergeMap = processorMap.get(procId); procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses); } final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); - for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) { - if (remoteProcessGroup.getContents() != null) { - final String remoteProcessGroupId = remoteProcessGroup.getId(); - final Map mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); + for (final RemoteProcessGroupEntity remoteProcessGroup : flowDto.getRemoteProcessGroups()) { + final String remoteProcessGroupId = remoteProcessGroup.getId(); + final Map mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); - rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); - } + rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java index 6a040fa661..a892e8ad63 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java @@ -72,4 +72,18 @@ public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint entityMap, final Set successfulResponses, + final Set problematicResponses) { + + final ProcessorDTO clientDto = clientEntity.getComponent(); + final Map dtoMap = new HashMap<>(); + for (final Map.Entry entry : entityMap.entrySet()) { + final ProcessorEntity nodeProcEntity = entry.getValue(); + final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeProcDto); + } + + mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java index 94383ded2a..56636fbe62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.endpoints; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -113,4 +114,16 @@ public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoi clientDto.setAuthorizationIssues(mergedAuthorizationIssues); } } + + protected void mergeResponses(RemoteProcessGroupEntity clientEntity, Map entityMap, + Set successfulResponses, Set problematicResponses) { + + final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); + final Map dtoMap = new HashMap<>(); + for (final Map.Entry entry : entityMap.entrySet()) { + dtoMap.put(entry.getKey(), entry.getValue().getComponent()); + } + + mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java index 8435c60ed0..2fac89ee90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java @@ -39,7 +39,7 @@ public class ResponseUtils { public static Set findLongResponseTimes(final AsyncClusterResponse response, final double stdDeviationMultiple) { final Set slowResponses = new HashSet<>(); - if (response.isOlderThan(2, TimeUnit.SECONDS)) { + if (response.isOlderThan(1, TimeUnit.SECONDS)) { // If the response is older than 2 seconds, determines if any node took a long time to respond. final Set completedIds = response.getCompletedNodeIdentifiers(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 3b71654b3c..d218af2806 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -228,17 +228,24 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); } + logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); + + // Update headers to indicate the current revision so that we can + // prevent multiple users changing the flow at the same time + final Map updatedHeaders = new HashMap<>(headers); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> UUID.randomUUID().toString()); + if (performVerification) { verifyState(method, uri.getPath()); } final int numRequests = responseMap.size(); if (numRequests >= MAX_CONCURRENT_REQUESTS) { + logger.debug("Cannot replicate request because there are {} outstanding HTTP Requests already", numRequests); throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); } // create the request objects and replicate to all nodes - final String requestId = UUID.randomUUID().toString(); final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId); final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); @@ -249,10 +256,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { responseMap.put(requestId, response); } - // Update headers to indicate the current revision so that we can - // prevent multiple users changing the flow at the same time - final Map updatedHeaders = new HashMap<>(headers); - updatedHeaders.put(REQUEST_TRANSACTION_ID, UUID.randomUUID().toString()); + logger.debug("For Request ID {}, response object is {}", requestId, response); // setRevision(updatedHeaders); // if mutable request, we have to do a two-phase commit where we ask each node to verify @@ -262,6 +266,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // replicate the actual request. final boolean mutableRequest = isMutableRequest(method, uri.getPath()); if (mutableRequest && performVerification) { + logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); performVerification(nodeIds, method, uri, entity, updatedHeaders, response); return response; } @@ -309,13 +314,22 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { @Override public void onCompletion(final NodeResponse nodeResponse) { - // Add the node response to our collection. - nodeResponses.add(nodeResponse); + // Add the node response to our collection. We later need to know whether or + // not this is the last node response, so we add the response and then check + // the size within a synchronized block to ensure that those two things happen + // atomically. Otherwise, we could have multiple threads checking the sizes of + // the sets at the same time, which could result in multiple threads performing + // the 'all nodes are complete' logic. + final boolean allNodesResponded; + synchronized (nodeResponses) { + nodeResponses.add(nodeResponse); + allNodesResponded = nodeResponses.size() == numNodes; + } try { // If we have all of the node responses, then we can verify the responses // and if good replicate the original request to all of the nodes. - if (nodeResponses.size() == numNodes) { + if (allNodesResponded) { // Check if we have any requests that do not have a 150-Continue status code. final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index 335c0ef5a4..7c99c932d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -512,7 +512,7 @@ public class DataFlowDaoImpl implements DataFlowDao { clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes)); } - final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); + final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, snippetBytes); dataFlow.setAutoStartProcessors(autoStart); return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes); @@ -543,11 +543,9 @@ public class DataFlowDaoImpl implements DataFlowDao { final DataFlow dataFlow = clusterDataFlow.getDataFlow(); if (dataFlow == null) { writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]); writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]); } else { writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); } writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 7c2eabfb74..59d582b2af 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -1957,10 +1957,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // disconnect problematic nodes if (!problematicNodeResponses.isEmpty()) { if (problematicNodeResponses.size() < nodeResponses.size()) { - logger.warn(String.format("The following nodes failed to process URI '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses)); - disconnectNodes(problematicNodeResponses, "Failed to process URI " + uriPath); + logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses)); + disconnectNodes(problematicNodeResponses, "Failed to process URI " + method + " " + uriPath); } else { - logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uriPath); + logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", method, uriPath); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java index e526ea3301..34189acb17 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java @@ -145,7 +145,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowSingleNode() throws Exception { String flowStr = ""; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); @@ -165,7 +165,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithSameNodeIds() throws Exception { String flowStr = ""; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -193,7 +193,7 @@ public class DataFlowManagementServiceImplTest { String flowStr = ""; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -214,7 +214,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithConstantNodeIdChanging() throws Exception { String flowStr = ""; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -236,7 +236,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception { String flowStr = ""; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -259,7 +259,7 @@ public class DataFlowManagementServiceImplTest { public void testStopRequestedWhileRetrieving() throws Exception { String flowStr = ""; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); Set nodeIds = new HashSet<>(); for (int i = 0; i < 1000; i++) { nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false)); @@ -289,7 +289,7 @@ public class DataFlowManagementServiceImplTest { String flowStr = ""; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java index 57c1c30970..312e3b021a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java @@ -23,11 +23,6 @@ public interface DataFlow { */ public byte[] getFlow(); - /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates(); - /** * @return the raw byte array of the snippets */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java new file mode 100644 index 0000000000..b9fd0cbf19 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; + +public class Template implements Authorizable { + + private final TemplateDTO dto; + private volatile ProcessGroup processGroup; + + public Template(final TemplateDTO dto) { + this.dto = dto; + } + + public String getIdentifier() { + return dto.getId(); + } + + /** + * Returns a TemplateDTO object that describes the contents of this Template + * + * @return template dto + */ + public TemplateDTO getDetails() { + return dto; + } + + public void setProcessGroup(final ProcessGroup group) { + this.processGroup = group; + } + + public ProcessGroup getProcessGroup() { + return processGroup; + } + + + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return ResourceFactory.getComponentResource(ResourceType.Template, dto.getId(), dto.getName()); + } + + private Set getAuthorizableComponents() { + return getAuthorizableComponents(processGroup); + } + + private Set getAuthorizableComponents(final ProcessGroup processGroup) { + final Set authComponents = new HashSet<>(); + final FlowSnippetDTO snippet = dto.getSnippet(); + + authComponents.add(processGroup); + + // If there is any component in the DTO that still exists in the flow, check its authorizations + for (final ConnectionDTO connectionDto : snippet.getConnections()) { + final Connection connection = processGroup.getConnection(connectionDto.getId()); + if (connection != null) { + authComponents.add(connection); + } + } + + // TODO: Authorize Controller Services + for (final ControllerServiceDTO service : snippet.getControllerServices()) { + } + + for (final LabelDTO labelDto : snippet.getLabels()) { + final Label label = processGroup.getLabel(labelDto.getId()); + if (label != null) { + authComponents.add(label); + } + } + + for (final ProcessorDTO processorDto : snippet.getProcessors()) { + final ProcessorNode procNode = processGroup.getProcessor(processorDto.getId()); + if (procNode != null) { + authComponents.add(procNode); + } + } + + for (final RemoteProcessGroupDTO groupDto : snippet.getRemoteProcessGroups()) { + final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(groupDto.getId()); + if (rpg != null) { + authComponents.add(rpg); + } + } + + for (final ProcessGroupDTO groupDto : snippet.getProcessGroups()) { + final ProcessGroup group = processGroup.getProcessGroup(groupDto.getId()); + if (group != null) { + authComponents.addAll(getAuthorizableComponents(processGroup)); + } + } + + return authComponents; + } + + @Override + public void authorize(final Authorizer authorizer, final RequestAction action) throws AccessDeniedException { + final AuthorizationResult result = checkAuthorization(authorizer, action, true); + if (Result.Denied.equals(result)) { + final String explanation = result.getExplanation() == null ? "Access is denied" : result.getExplanation(); + throw new AccessDeniedException(explanation); + } + } + + @Override + public AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action) { + return checkAuthorization(authorizer, action, false); + } + + private AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action, final boolean accessAttempt) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // TODO - include user details context + + // build the request + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .identity(user.getIdentity()) + .anonymous(user.isAnonymous()) + .accessAttempt(accessAttempt) + .action(action) + .resource(getResource()) + .build(); + + // perform the authorization + final AuthorizationResult result = authorizer.authorize(request); + + // verify the results + if (Result.ResourceNotFound.equals(result.getResult())) { + for (final Authorizable child : getAuthorizableComponents()) { + final AuthorizationResult childResult = child.checkAuthorization(authorizer, action); + if (Result.Denied.equals(childResult)) { + return childResult; + } + } + + return AuthorizationResult.denied(); + } else { + return result; + } + } + + @Override + public String toString() { + return "Template[id=" + getIdentifier() + ", Name=" + dto.getName() + "]"; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index e1f66ee4c5..c3a4c8e6d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; +import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; @@ -768,4 +769,45 @@ public interface ProcessGroup extends Authorizable { * @throws IllegalStateException if the move is not valid at this time */ void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup); + + /** + * Adds the given template to this Process Group + * + * @param template the template to add + */ + void addTemplate(Template template); + + /** + * Removes the given template from the Process Group + * + * @param template the template to remove + */ + void removeTemplate(Template template); + + /** + * Returns the template with the given ID + * + * @param id the ID of the template + * @return the template with the given ID or null if no template + * exists in this Process Group with the given ID + */ + Template getTemplate(String id); + + /** + * @param id of the template + * @return the Template with the given ID, if it exists as a child or + * descendant of this ProcessGroup. This performs a recursive search of all + * descendant ProcessGroups + */ + Template findTemplate(String id); + + /** + * @return a Set of all Templates that belong to this Process Group + */ + Set