diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java index 590c9089e2..63a2895b59 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -50,7 +50,6 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpoint import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.stream.io.NullOutputStream; @@ -103,7 +102,6 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new SystemDiagnosticsEndpointMerger()); endpointMergers.add(new CountersEndpointMerger()); endpointMergers.add(new FlowMerger()); - endpointMergers.add(new TemplatesEndpointMerger()); } public StandardHttpResponseMerger() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java deleted file mode 100644 index a07289de88..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.cluster.coordination.http.endpoints; - -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.nifi.web.api.entity.TemplatesEntity; - -public class TemplatesEndpointMerger implements EndpointResponseMerger { - public static final Pattern TEMPLATES_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/templates"); - - @Override - public boolean canHandle(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && TEMPLATES_URI_PATTERN.matcher(uri.getPath()).matches(); - } - - protected Class getEntityClass() { - return TemplatesEntity.class; - } - - protected Set getDtos(final TemplatesEntity entity) { - return entity.getTemplates(); - } - - protected String getComponentId(final TemplateDTO dto) { - return dto.getId(); - } - - @Override - public final NodeResponse merge(final URI uri, final String method, final Set successfulResponses, final Set problematicResponses, final NodeResponse clientResponse) { - if (!canHandle(uri, method)) { - throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); - } - - final TemplatesEntity responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass()); - - // Find the templates that all nodes know about. We do this by mapping Template ID to Template and - // then for each node, removing any template whose ID is not known to that node. After iterating over - // all of the nodes, we are left with a Map whose contents are those Templates known by all nodes. - Map templatesById = null; - for (final NodeResponse nodeResponse : successfulResponses) { - final TemplatesEntity entity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(TemplatesEntity.class); - final Set templateDtos = entity.getTemplates(); - final Map nodeTemplatesById = templateDtos.stream().collect(Collectors.toMap(dto -> dto.getId(), dto -> dto)); - - if (templatesById == null) { - // Create new HashMap so that the map that we have is modifiable. - templatesById = new HashMap<>(nodeTemplatesById); - } else { - // Only keep templates that are known by this node. - templatesById.keySet().retainAll(nodeTemplatesById.keySet()); - } - } - - // Set the templates to the set of templates that all nodes know about - responseEntity.setTemplates(new HashSet<>(templatesById.values())); - - // create a new client response - return new NodeResponse(clientResponse, responseEntity); - } - -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 8a6be16328..7fedac0ad6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -142,6 +142,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { @Override public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { + // TODO - Include templates // handle corner cases involving no proposed flow if (proposedFlow == null) { @@ -284,20 +285,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); } - // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need - // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them. - final Document existingFlowConfiguration = parseFlowBytes(existingFlow); - if (existingFlowConfiguration != null) { - final Element existingRootElement = (Element) existingFlowConfiguration.getElementsByTagName("flowController").item(0); - if (existingRootElement != null) { - final Element existingRootGroupElement = (Element) existingRootElement.getElementsByTagName("rootGroup").item(0); - if (existingRootElement != null) { - final FlowEncodingVersion existingEncodingVersion = FlowEncodingVersion.parse(existingFlowConfiguration.getDocumentElement()); - addLocalTemplates(existingRootGroupElement, rootGroup, existingEncodingVersion); - } - } - } - final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); if (controllerServicesElement != null) { final List serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); @@ -359,29 +346,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) { - // Replace the templates with those from the proposed flow - final List templateNodeList = getChildrenByTagName(processGroupElement, "template"); - if (templateNodeList != null) { - for (final Element templateElement : templateNodeList) { - final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement); - final Template template = new Template(templateDto); - - // If the Process Group does not have the template, add it. - if (processGroup.getTemplate(template.getIdentifier()) == null) { - processGroup.addTemplate(template); - } - } - } - - final List childGroupElements = getChildrenByTagName(processGroupElement, "processGroup"); - for (final Element childGroupElement : childGroupElements) { - final String childGroupId = getString(childGroupElement, "id"); - final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); - addLocalTemplates(childGroupElement, childGroup, encodingVersion); - } - } - void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) { if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) { // Calculate new Positions if the encoding version of the flow is older than 1.0. @@ -771,17 +735,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // Replace the templates with those from the proposed flow final List templateNodeList = getChildrenByTagName(processGroupElement, "template"); + for (final Template template : processGroup.getTemplates()) { + processGroup.removeTemplate(template); + } for (final Element templateElement : templateNodeList) { final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement); final Template template = new Template(templateDto); - - // If the Process Group already has the template, remove it and add it again. We do this - // to ensure that all of the nodes have the same view of the template. Templates are immutable, - // so any two nodes that have a template with the same ID should have the exact same template. - // This just makes sure that they do. - if (processGroup.getTemplate(template.getIdentifier()) != null) { - processGroup.removeTemplate(template); - } processGroup.addTemplate(template); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 5047683332..ffe212d4de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.persistence; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -79,9 +80,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor); controller.synchronize(flowSynchronizer, dataFlow); - - // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates. - save(controller); + save(new ByteArrayInputStream(dataFlow.getFlow())); } @Override