From 329192a7d3cee0689d8d438e8a452d52f93dd933 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 30 Jun 2016 10:48:46 -0400 Subject: [PATCH] NIFI-1413: Ensure that if a node's templates don't match the clusters that we take the following actions: -Local templates remain but aren't shown in the cluster's templates. -Any templates from the cluster that don't exist on the node are added to the node. -Any conflicting template definitions are replaced by those in the cluster This closes #596 --- .../http/StandardHttpResponseMerger.java | 2 + .../endpoints/TemplatesEndpointMerger.java | 86 +++++++++++++++++++ .../controller/StandardFlowSynchronizer.java | 49 ++++++++++- .../StandardXMLFlowConfigurationDAO.java | 5 +- 4 files changed, 136 insertions(+), 6 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java index 8381192d23..8ff321d533 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -51,6 +51,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpoint import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.stream.io.NullOutputStream; @@ -104,6 +105,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new SystemDiagnosticsEndpointMerger()); endpointMergers.add(new CountersEndpointMerger()); endpointMergers.add(new FlowMerger()); + endpointMergers.add(new TemplatesEndpointMerger()); } public StandardHttpResponseMerger() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java new file mode 100644 index 0000000000..a07289de88 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.entity.TemplatesEntity; + +public class TemplatesEndpointMerger implements EndpointResponseMerger { + public static final Pattern TEMPLATES_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/templates"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && TEMPLATES_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + protected Class getEntityClass() { + return TemplatesEntity.class; + } + + protected Set getDtos(final TemplatesEntity entity) { + return entity.getTemplates(); + } + + protected String getComponentId(final TemplateDTO dto) { + return dto.getId(); + } + + @Override + public final NodeResponse merge(final URI uri, final String method, final Set successfulResponses, final Set problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final TemplatesEntity responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass()); + + // Find the templates that all nodes know about. We do this by mapping Template ID to Template and + // then for each node, removing any template whose ID is not known to that node. After iterating over + // all of the nodes, we are left with a Map whose contents are those Templates known by all nodes. + Map templatesById = null; + for (final NodeResponse nodeResponse : successfulResponses) { + final TemplatesEntity entity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(TemplatesEntity.class); + final Set templateDtos = entity.getTemplates(); + final Map nodeTemplatesById = templateDtos.stream().collect(Collectors.toMap(dto -> dto.getId(), dto -> dto)); + + if (templatesById == null) { + // Create new HashMap so that the map that we have is modifiable. + templatesById = new HashMap<>(nodeTemplatesById); + } else { + // Only keep templates that are known by this node. + templatesById.keySet().retainAll(nodeTemplatesById.keySet()); + } + } + + // Set the templates to the set of templates that all nodes know about + responseEntity.setTemplates(new HashSet<>(templatesById.values())); + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 7fedac0ad6..8a6be16328 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -142,7 +142,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { @Override public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { - // TODO - Include templates // handle corner cases involving no proposed flow if (proposedFlow == null) { @@ -285,6 +284,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); } + // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need + // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them. + final Document existingFlowConfiguration = parseFlowBytes(existingFlow); + if (existingFlowConfiguration != null) { + final Element existingRootElement = (Element) existingFlowConfiguration.getElementsByTagName("flowController").item(0); + if (existingRootElement != null) { + final Element existingRootGroupElement = (Element) existingRootElement.getElementsByTagName("rootGroup").item(0); + if (existingRootElement != null) { + final FlowEncodingVersion existingEncodingVersion = FlowEncodingVersion.parse(existingFlowConfiguration.getDocumentElement()); + addLocalTemplates(existingRootGroupElement, rootGroup, existingEncodingVersion); + } + } + } + final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); if (controllerServicesElement != null) { final List serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); @@ -346,6 +359,29 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) { + // Replace the templates with those from the proposed flow + final List templateNodeList = getChildrenByTagName(processGroupElement, "template"); + if (templateNodeList != null) { + for (final Element templateElement : templateNodeList) { + final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement); + final Template template = new Template(templateDto); + + // If the Process Group does not have the template, add it. + if (processGroup.getTemplate(template.getIdentifier()) == null) { + processGroup.addTemplate(template); + } + } + } + + final List childGroupElements = getChildrenByTagName(processGroupElement, "processGroup"); + for (final Element childGroupElement : childGroupElements) { + final String childGroupId = getString(childGroupElement, "id"); + final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); + addLocalTemplates(childGroupElement, childGroup, encodingVersion); + } + } + void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) { if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) { // Calculate new Positions if the encoding version of the flow is older than 1.0. @@ -735,12 +771,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // Replace the templates with those from the proposed flow final List templateNodeList = getChildrenByTagName(processGroupElement, "template"); - for (final Template template : processGroup.getTemplates()) { - processGroup.removeTemplate(template); - } for (final Element templateElement : templateNodeList) { final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement); final Template template = new Template(templateDto); + + // If the Process Group already has the template, remove it and add it again. We do this + // to ensure that all of the nodes have the same view of the template. Templates are immutable, + // so any two nodes that have a template with the same ID should have the exact same template. + // This just makes sure that they do. + if (processGroup.getTemplate(template.getIdentifier()) != null) { + processGroup.removeTemplate(template); + } processGroup.addTemplate(template); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 707efa2d0a..8b0d18f064 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.persistence; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -80,7 +79,9 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor); controller.synchronize(flowSynchronizer, dataFlow); - save(new ByteArrayInputStream(dataFlow.getFlow())); + + // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates. + save(controller); } @Override