NIFI-1908 Added encoding-version attribute to TemplateDTO

added scaling of templates upon instantiation for placement on the canvas
added template-0.7.0.xml for live-testing the import of templates

Fixing issue with potentially uninitialized RemoteGroupPorts in copySnippet.

This closes #471
This commit is contained in:
Jeff Storck 2016-05-20 16:32:33 -04:00 committed by Matt Gilman
parent b075f238a5
commit 893daf567d
10 changed files with 6618 additions and 439 deletions

View File

@ -17,16 +17,19 @@
package org.apache.nifi.web.api.dto; package org.apache.nifi.web.api.dto;
import com.wordnik.swagger.annotations.ApiModelProperty; import com.wordnik.swagger.annotations.ApiModelProperty;
import java.util.Date; import org.apache.nifi.web.api.dto.util.DateTimeAdapter;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.web.api.dto.util.DateTimeAdapter; import java.util.Date;
/** /**
* Defines a template. * Defines a template.
*/ */
@XmlRootElement(name = "template") @XmlRootElement(name = "template")
public class TemplateDTO { public class TemplateDTO {
public static final String MAX_ENCODING_VERSION = "1.0";
private String uri; private String uri;
@ -35,6 +38,7 @@ public class TemplateDTO {
private String name; private String name;
private String description; private String description;
private Date timestamp; private Date timestamp;
private String encodingVersion;
private FlowSnippetDTO snippet; private FlowSnippetDTO snippet;
@ -117,6 +121,22 @@ public class TemplateDTO {
this.timestamp = timestamp; this.timestamp = timestamp;
} }
/**
* encodingVersion needs to be updated if the {@link TemplateDTO} changes.
* @return encoding version of this template.
*/
@XmlAttribute(name= "encoding-version")
@ApiModelProperty(
value = "The encoding version of this template."
)
public String getEncodingVersion() {
return encodingVersion;
}
public void setEncodingVersion(String encodingVersion) {
this.encodingVersion = encodingVersion;
}
/** /**
* @return snippet in this template * @return snippet in this template
*/ */

View File

@ -16,41 +16,7 @@
*/ */
package org.apache.nifi.controller; package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull; import com.sun.jersey.api.client.ClientHandlerException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action; import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.admin.service.AuditService;
@ -227,7 +193,39 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException; import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable { public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable {
@ -247,6 +245,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
public static final String PRIMARY_NODE_ROLE_NAME = "primary-node"; public static final String PRIMARY_NODE_ROLE_NAME = "primary-node";
// default properties for scaling the positions of components from pre-1.0 flow encoding versions.
public static final double DEFAULT_POSITION_SCALE_FACTOR_X = 1.5;
public static final double DEFAULT_POSITION_SCALE_FACTOR_Y = 1.34;
private final AtomicInteger maxTimerDrivenThreads; private final AtomicInteger maxTimerDrivenThreads;
private final AtomicInteger maxEventDrivenThreads; private final AtomicInteger maxEventDrivenThreads;
private final AtomicReference<FlowEngine> timerDrivenEngineRef; private final AtomicReference<FlowEngine> timerDrivenEngineRef;
@ -1335,7 +1337,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/** /**
* Synchronizes this controller with the proposed flow. * Synchronizes this controller with the proposed flow.
* *
* For more details, see {@link FlowSynchronizer#sync(FlowController, DataFlow)}. * For more details, see {@link FlowSynchronizer#sync(FlowController, DataFlow, StringEncryptor)}.
* *
* @param synchronizer synchronizer * @param synchronizer synchronizer
* @param dataFlow the flow to load the controller with. If the flow is null or zero length, then the controller must not have a flow or else an UninheritableFlowException will be thrown. * @param dataFlow the flow to load the controller with. If the flow is null or zero length, then the controller must not have a flow or else an UninheritableFlowException will be thrown.

View File

@ -74,7 +74,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
@ -2149,7 +2148,6 @@ public final class StandardProcessGroup implements ProcessGroup {
positionables.addAll(findAllConnectables(this, true)); positionables.addAll(findAllConnectables(this, true));
List<ProcessGroup> allProcessGroups = findAllProcessGroups(); List<ProcessGroup> allProcessGroups = findAllProcessGroups();
positionables.addAll(allProcessGroups); positionables.addAll(allProcessGroups);
positionables.addAll(allProcessGroups.stream().flatMap(processGroup -> processGroup.findAllPositionables().stream()).collect(Collectors.toSet()));
positionables.addAll(findAllRemoteProcessGroups()); positionables.addAll(findAllRemoteProcessGroups());
positionables.addAll(findAllLabels()); positionables.addAll(findAllLabels());
return positionables; return positionables;

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.nifi.util; package org.apache.nifi.util;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -24,24 +30,34 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
/** /**
* Utility class for moving Snippets. * Utility class for moving Snippets.
*/ */
public final class SnippetUtils { public final class SnippetUtils {
/** /**
* Moves the content of the specified template around the specified location. * Moves the content of the specified snippet around the specified location. Does not scale components in child process groups.
* *
* @param snippet snippet * @param snippet snippet
* @param x x location * @param x x location
* @param y y location * @param y y location
*/ */
public static void moveSnippet(FlowSnippetDTO snippet, Double x, Double y) { public static void moveSnippet(FlowSnippetDTO snippet, Double x, Double y) {
moveAndScaleSnippet(snippet, x, y, 1.0, 1.0);
}
/**
* Moves the content of the specified snippet around the specified location
* and scales the placement of individual components of the template by the
* given factorX and factorY. Does not scale components in child process groups.
*
* @param snippet snippet
* @param x x location
* @param y y location
* @param factorX x location scaling factor
* @param factorY y location scaling factor
*/
public static void moveAndScaleSnippet(FlowSnippetDTO snippet, Double x, Double y, double factorX, double factorY) {
// ensure the point is specified // ensure the point is specified
if (x != null && y != null) { if (x != null && y != null) {
final PositionDTO origin = new PositionDTO(x, y); final PositionDTO origin = new PositionDTO(x, y);
@ -64,15 +80,15 @@ public final class SnippetUtils {
// adjust all component positions // adjust all component positions
for (final PositionDTO position : componentPositionLookup.values()) { for (final PositionDTO position : componentPositionLookup.values()) {
position.setX(origin.getX() + (position.getX() - currentOrigin.getX())); position.setX(origin.getX() + ((position.getX() - currentOrigin.getX()) * factorX));
position.setY(origin.getY() + (position.getY() - currentOrigin.getY())); position.setY(origin.getY() + ((position.getY() - currentOrigin.getY()) * factorY));
} }
// adjust all connection positions // adjust all connection positions
for (final List<PositionDTO> bends : connectionPositionLookup.values()) { for (final List<PositionDTO> bends : connectionPositionLookup.values()) {
for (final PositionDTO bend : bends) { for (final PositionDTO bend : bends) {
bend.setX(origin.getX() + (bend.getX() - currentOrigin.getX())); bend.setX(origin.getX() + ((bend.getX() - currentOrigin.getX()) * factorX));
bend.setY(origin.getY() + (bend.getY() - currentOrigin.getY())); bend.setY(origin.getY() + ((bend.getY() - currentOrigin.getY()) * factorY));
} }
} }
@ -81,6 +97,61 @@ public final class SnippetUtils {
} }
} }
/**
* Scales the placement of individual components of the snippet by the
* given factorX and factorY. Does not scale components in child process groups.
*
* @param snippet snippet
* @param factorX x location scaling factor
* @param factorY y location scaling factor
*/
public static void scaleSnippet(FlowSnippetDTO snippet, double factorX, double factorY) {
// get the connections
final Collection<ConnectionDTO> connections = getConnections(snippet);
// get the components and their positions from the template contents
final Collection<ComponentDTO> components = getComponents(snippet);
// only perform the operation if there are components in this snippet
if (connections.isEmpty() && components.isEmpty()) {
return;
}
// get the component positions from the snippet contents
final Map<ComponentDTO, PositionDTO> componentPositionLookup = getPositionLookup(components);
final Map<ConnectionDTO, List<PositionDTO>> connectionPositionLookup = getConnectionPositionLookup(connections);
// adjust all component positions
for (final PositionDTO position : componentPositionLookup.values()) {
position.setX(position.getX() * factorX);
position.setY(position.getY() * factorY);
}
// adjust all connection positions
for (final List<PositionDTO> bends : connectionPositionLookup.values()) {
for (final PositionDTO bend : bends) {
bend.setX(bend.getX() * factorX);
bend.setY(bend.getY() * factorY);
}
}
// apply the updated positions
applyUpdatedPositions(componentPositionLookup, connectionPositionLookup);
}
/**
* Finds all {@link ProcessGroupDTO}s in the given {@link FlowSnippetDTO}.
* @param snippet containing the child {@link ProcessGroupDTO}s to be returned
* @return List of child {@link ProcessGroupDTO}s found in the given {@link FlowSnippetDTO}.
*/
public static List<ProcessGroupDTO> findAllProcessGroups(FlowSnippetDTO snippet) {
final List<ProcessGroupDTO> allProcessGroups = new ArrayList<>(snippet.getProcessGroups());
for (final ProcessGroupDTO childGroup : snippet.getProcessGroups()) {
allProcessGroups.addAll(findAllProcessGroups(childGroup.getContents()));
}
return allProcessGroups;
}
/** /**
* Gets all connections that are part of the specified template. * Gets all connections that are part of the specified template.
* *

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util
import org.apache.nifi.web.api.dto.*
import spock.lang.Specification
import spock.lang.Unroll
class SnippetUtilsSpec extends Specification {
@Unroll
def "test moveAndScaleSnippet"() {
given:
def Map<ComponentDTO, PositionDTO> positions = (snippet.connections + snippet.inputPorts + snippet.outputPorts + snippet.labels + snippet.processGroups + snippet.processGroups +
snippet.processors + snippet.funnels + snippet.remoteProcessGroups).collectEntries { ComponentDTO component ->
[(component): new PositionDTO(component.position.x, component.position.y)]
}
when:
SnippetUtils.moveAndScaleSnippet(snippet, nX, nY, fX, fY)
then:
positions.entrySet().forEach {
def expectedPosition = calculateMoveAndScalePosition(it.value, oX, oY, nX, nY, fX, fY)
def positionScaledBySnippetUtils = it.key.getPosition()
assert positionScaledBySnippetUtils.x == expectedPosition.x
assert positionScaledBySnippetUtils.y == expectedPosition.y
}
where:
nX | nY | oX | oY | fX | fY | snippet
500 | 500 | 0 | 0 | 1.5 | 1.34 | new FlowSnippetDTO()
500 | 500 | 10 | 10 | 1.5 | 1.34 | new FlowSnippetDTO(processors: [new ProcessorDTO(position: new PositionDTO(x: 10, y: 10))])
}
@Unroll
def "test scaleSnippet"() {
given:
def Map<ComponentDTO, PositionDTO> positions = (snippet.connections + snippet.inputPorts + snippet.outputPorts + snippet.labels + snippet.processGroups + snippet.processGroups +
snippet.processors + snippet.funnels + snippet.remoteProcessGroups).collectEntries { ComponentDTO component ->
[(component): new PositionDTO(component.position.x, component.position.y)]
}
when:
SnippetUtils.scaleSnippet(snippet, fX, fY)
then:
positions.entrySet().forEach {
def expectedPosition = calculateScalePosition(it.value, fX, fY)
def positionScaledBySnippetUtils = it.key.getPosition()
assert positionScaledBySnippetUtils.x == expectedPosition.x
assert positionScaledBySnippetUtils.y == expectedPosition.y
}
where:
fX | fY | snippet
1.5 | 1.34 | new FlowSnippetDTO()
1.5 | 1.34 | new FlowSnippetDTO(
processors: [new ProcessorDTO(position: new PositionDTO(x: 10, y: 10))],
processGroups: [
new ProcessGroupDTO(position: new PositionDTO(x: 105, y: -10), name: 'pg2',
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc1', position: new PositionDTO(x: 50, y: 60))]))])
}
def PositionDTO calculateMoveAndScalePosition(position, oldOriginX, oldOriginY, newOriginX, newOriginY, factorX, factorY) {
new PositionDTO(
x: newOriginX + (position.x - oldOriginX) * factorX,
y: newOriginY + (position.y - oldOriginY) * factorY)
}
def PositionDTO calculateScalePosition(position, factorX, factorY) {
new PositionDTO(
x: position.x * factorX,
y: position.y * factorY)
}
}

View File

@ -350,5 +350,15 @@
<version>1.0.1.RELEASE</version> <version>1.0.1.RELEASE</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.spockframework</groupId>
<artifactId>spock-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -663,10 +663,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* @param revision the current revision * @param revision the current revision
* @param daoUpdate a Supplier that will update the component via the appropriate DAO * @param daoUpdate a Supplier that will update the component via the appropriate DAO
* @param dtoCreation a Function to convert a component into a dao * @param dtoCreation a Function to convert a component into a dao
*
* @param <D> the DTO Type of the updated component * @param <D> the DTO Type of the updated component
* @param <C> the Component Type of the updated component * @param <C> the Component Type of the updated component
*
* @return A RevisionUpdate that represents the new configuration * @return A RevisionUpdate that represents the new configuration
*/ */
private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
@ -1252,10 +1250,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* @param componentDto the DTO that will be used to create the component * @param componentDto the DTO that will be used to create the component
* @param daoCreation A Supplier that will create the NiFi Component to use * @param daoCreation A Supplier that will create the NiFi Component to use
* @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO
*
* @param <D> the DTO Type * @param <D> the DTO Type
* @param <C> the NiFi Component Type * @param <C> the NiFi Component Type
*
* @return a RevisionUpdate that represents the updated configuration * @return a RevisionUpdate that represents the updated configuration
*/ */
private <D, C> RevisionUpdate<D> createComponent(final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) { private <D, C> RevisionUpdate<D> createComponent(final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) {
@ -1281,7 +1277,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} }
@Override @Override
public FunnelEntity createFunnel(final String groupId, final FunnelDTO funnelDTO) { public FunnelEntity createFunnel(final String groupId, final FunnelDTO funnelDTO) {
final RevisionUpdate<FunnelDTO> snapshot = createComponent( final RevisionUpdate<FunnelDTO> snapshot = createComponent(
@ -1358,50 +1353,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// create the new snippet // create the new snippet
final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
// validate the new snippet
validateSnippetContents(snippet);
// save the flow // save the flow
controllerFacade.save(); controllerFacade.save();
// drop the snippet // drop the snippet
snippetDAO.dropSnippet(snippetId); snippetDAO.dropSnippet(snippetId);
// identify all components added // post process new flow snippet
final Set<String> identifiers = new HashSet<>(); return postProcessNewFlowSnippet(groupId, snippet);
snippet.getProcessors().stream()
.map(proc -> proc.getId())
.forEach(id -> identifiers.add(id));
snippet.getConnections().stream()
.map(conn -> conn.getId())
.forEach(id -> identifiers.add(id));
snippet.getInputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getOutputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getProcessGroups().stream()
.map(group -> group.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.map(remoteGroup -> remoteGroup.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
.map(remoteInputPort -> remoteInputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
.map(remoteOutputPort -> remoteOutputPort.getId())
.forEach(id -> identifiers.add(id));
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
return revisionManager.get(identifiers,
() -> {
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(processGroup, groupStatus, snippet, revisionManager);
});
}); });
final FlowEntity flowEntity = new FlowEntity(); final FlowEntity flowEntity = new FlowEntity();
@ -1493,6 +1452,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
templateDTO.setDescription(description); templateDTO.setDescription(description);
templateDTO.setTimestamp(new Date()); templateDTO.setTimestamp(new Date());
templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true)); templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true));
templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);
// set the id based on the specified seed // set the id based on the specified seed
final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
@ -1523,19 +1483,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return dtoFactory.createTemplateDTO(template); return dtoFactory.createTemplateDTO(template);
} }
@Override /**
public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) { * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion.
final FlowDTO flowDto = revisionManager.get(groupId, rev -> { *
// instantiate the template - there is no need to make another copy of the flow snippet since the actual template * @param groupId group id
// was copied and this dto is only used to instantiate it's components (which as already completed) * @param snippet snippet
final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed); * @return flow dto
*/
private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
// validate the new snippet // validate the new snippet
validateSnippetContents(snippet); validateSnippetContents(snippet);
// save the flow
controllerFacade.save();
// identify all components added // identify all components added
final Set<String> identifiers = new HashSet<>(); final Set<String> identifiers = new HashSet<>();
snippet.getProcessors().stream() snippet.getProcessors().stream()
@ -1557,13 +1515,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
.map(remoteGroup -> remoteGroup.getId()) .map(remoteGroup -> remoteGroup.getId())
.forEach(id -> identifiers.add(id)); .forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream() snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
.map(remoteInputPort -> remoteInputPort.getId()) .map(remoteInputPort -> remoteInputPort.getId())
.forEach(id -> identifiers.add(id)); .forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream() snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
.map(remoteOutputPort -> remoteOutputPort.getId()) .map(remoteOutputPort -> remoteOutputPort.getId())
.forEach(id -> identifiers.add(id)); .forEach(id -> identifiers.add(id));
snippet.getLabels().stream()
.map(label -> label.getId())
.forEach(id -> identifiers.add(id));
return revisionManager.get(identifiers, return revisionManager.get(identifiers,
() -> { () -> {
@ -1571,6 +1534,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager); return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager);
}); });
}
@Override
public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) {
final FlowDTO flowDto = revisionManager.get(groupId, rev -> {
// instantiate the template - there is no need to make another copy of the flow snippet since the actual template
// was copied and this dto is only used to instantiate it's components (which as already completed)
final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed);
// save the flow
controllerFacade.save();
// post process the new flow snippet
return postProcessNewFlowSnippet(groupId, snippet);
}); });
final FlowEntity flowEntity = new FlowEntity(); final FlowEntity flowEntity = new FlowEntity();

View File

@ -16,22 +16,25 @@
*/ */
package org.apache.nifi.web.dao.impl; package org.apache.nifi.web.dao.impl;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.Template; import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.TemplateUtils; import org.apache.nifi.controller.TemplateUtils;
import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.dao.TemplateDAO; import org.apache.nifi.web.dao.TemplateDAO;
import org.apache.nifi.web.util.SnippetUtils; import org.apache.nifi.web.util.SnippetUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/** /**
* *
*/ */
@ -88,8 +91,23 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
TemplateDTO templateDetails = template.getDetails(); TemplateDTO templateDetails = template.getDetails();
FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group, idGenerationSeed); FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group, idGenerationSeed);
// reposition the template contents // calculate scaling factors based on the template encoding version
org.apache.nifi.util.SnippetUtils.moveSnippet(snippet, originX, originY); // attempt to parse the encoding version
final FlowEncodingVersion templateEncodingVersion = FlowEncodingVersion.parse(templateDetails.getEncodingVersion());
// get the major version, or 0 if no version could be parsed
int templateEncodingMajorVersion = templateEncodingVersion != null ? templateEncodingVersion.getMajorVersion() : 0;
// based on the major version < 1, use the default scaling factors. Otherwise, don't scale (use factor of 1.0)
double factorX = templateEncodingMajorVersion < 1 ? FlowController.DEFAULT_POSITION_SCALE_FACTOR_X : 1.0;
double factorY = templateEncodingMajorVersion < 1 ? FlowController.DEFAULT_POSITION_SCALE_FACTOR_Y : 1.0;
// reposition and scale the template contents
org.apache.nifi.util.SnippetUtils.moveAndScaleSnippet(snippet, originX, originY, factorX, factorY);
// find all the child process groups in each process group in the top level of this snippet
final List<ProcessGroupDTO> childProcessGroups = org.apache.nifi.util.SnippetUtils.findAllProcessGroups(snippet);
// scale (but don't reposition) child process groups
childProcessGroups.stream().forEach(processGroup -> org.apache.nifi.util.SnippetUtils.scaleSnippet(processGroup.getContents(), factorX, factorY));
// instantiate the template into this group // instantiate the template into this group
flowController.instantiateSnippet(group, snippet); flowController.instantiateSnippet(group, snippet);

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl
import org.apache.nifi.authorization.Authorizer
import org.apache.nifi.controller.FlowController
import org.apache.nifi.controller.Template
import org.apache.nifi.controller.serialization.FlowEncodingVersion
import org.apache.nifi.controller.service.ControllerServiceProvider
import org.apache.nifi.groups.ProcessGroup
import org.apache.nifi.web.api.dto.*
import org.apache.nifi.web.util.SnippetUtils
import spock.lang.Specification
import spock.lang.Unroll
class StandardTemplateDAOSpec extends Specification {
@Unroll
def "test InstantiateTemplate moves and scales templates"() {
given:
def flowController = Mock FlowController
def snippetUtils = new SnippetUtils()
snippetUtils.flowController = flowController
def dtoFactory = new DtoFactory()
dtoFactory.authorizer = Mock Authorizer
dtoFactory.controllerServiceProvider = Mock ControllerServiceProvider
snippetUtils.dtoFactory = dtoFactory
def standardTemplateDAO = new StandardTemplateDAO()
standardTemplateDAO.flowController = flowController
standardTemplateDAO.snippetUtils = snippetUtils
def templateEncodingVersion = FlowEncodingVersion.parse(encodingVersion);
// get the major version, or 0 if no version could be parsed
int templateEncodingMajorVersion = templateEncodingVersion != null ? templateEncodingVersion.getMajorVersion() : 0;
double factorX = templateEncodingMajorVersion < 1 ? FlowController.DEFAULT_POSITION_SCALE_FACTOR_X : 1.0;
double factorY = templateEncodingMajorVersion < 1 ? FlowController.DEFAULT_POSITION_SCALE_FACTOR_Y : 1.0;
// get all top-level component starting positions
def List<ComponentDTO> components = [snippet.connections + snippet.inputPorts + snippet.outputPorts + snippet.labels + snippet.processGroups + snippet.processGroups +
snippet.processors + snippet.funnels + snippet.remoteProcessGroups].flatten()
// get all starting subcomponent starting positions
def List<ComponentDTO> subComponents = org.apache.nifi.util.SnippetUtils.findAllProcessGroups(snippet).collect { ProcessGroupDTO processGroup ->
def childSnippet = processGroup.contents
childSnippet.connections + childSnippet.inputPorts + childSnippet.outputPorts + childSnippet.labels + childSnippet.processGroups + childSnippet.processGroups +
childSnippet.processors + childSnippet.funnels + childSnippet.remoteProcessGroups
}.flatten()
when:
def instantiatedTemplate = standardTemplateDAO.instantiateTemplate(rootGroupId, newOriginX, newOriginY, templateId, idGenerationSeed)
then:
flowController.getGroup(_) >> { String gId ->
def pg = Mock ProcessGroup
pg.identifier >> gId
pg.findTemplate(templateId) >> { tId ->
def t = Mock Template
t.getDetails() >> { tDetails ->
def td = Mock TemplateDTO
td.snippet >> snippet
td.encodingVersion >> encodingVersion
return td
}
return t
}
pg.inputPorts >> []
pg.outputPorts >> []
pg.processGroups >> []
return pg
}
flowController.rootGroupId >> rootGroupId
flowController.instantiateSnippet(*_) >> {}
0 * _
def instantiatedComponents = [instantiatedTemplate.connections + instantiatedTemplate.inputPorts + instantiatedTemplate.outputPorts + instantiatedTemplate.labels +
instantiatedTemplate.processGroups + instantiatedTemplate.processGroups + instantiatedTemplate.processors + instantiatedTemplate.funnels +
instantiatedTemplate.remoteProcessGroups].flatten()
components.forEach { component ->
def correspondingScaledPosition = instantiatedComponents.find { scaledComponent ->
scaledComponent.name.equals(component.name)
}.position
assert correspondingScaledPosition != null
def expectedPosition = calculateMoveAndScalePosition(component.position, oldOriginX, oldOriginY, newOriginX, newOriginY, factorX, factorY)
assert correspondingScaledPosition.x == expectedPosition.x
assert correspondingScaledPosition.y == expectedPosition.y
}
def instantiatedSubComponents = org.apache.nifi.util.SnippetUtils.findAllProcessGroups(instantiatedTemplate).collect { ProcessGroupDTO processGroup ->
def childSnippet = processGroup.contents
childSnippet.connections + childSnippet.inputPorts + childSnippet.outputPorts + childSnippet.labels + childSnippet.processGroups + childSnippet.processGroups +
childSnippet.processors + childSnippet.funnels + childSnippet.remoteProcessGroups
}.flatten()
subComponents.forEach { subComponent ->
def correspondingScaledPosition = instantiatedSubComponents.find { scaledComponent ->
scaledComponent.name.equals(subComponent.name)
}.position
assert correspondingScaledPosition != null
def expectedPosition = calculateScalePosition(subComponent.position, factorX, factorY)
assert correspondingScaledPosition.x == expectedPosition.x
assert correspondingScaledPosition.y == expectedPosition.y
}
where:
rootGroupId | oldOriginX | oldOriginY | newOriginX | newOriginY | templateId | idGenerationSeed | encodingVersion | snippet
'g1' | 0.0 | 0.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO()
'g1' | 10.0 | 10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
processors: [new ProcessorDTO(name: 'proc1', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))])
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO(
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
processGroups: [
new ProcessGroupDTO(
name: 'g2',
position: new PositionDTO(x: 105, y: -10),
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
processGroups: [
new ProcessGroupDTO(
name: 'g2',
position: new PositionDTO(x: 105, y: -10),
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '1.0' | new FlowSnippetDTO(
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
processGroups: [
new ProcessGroupDTO(
name: 'g2',
position: new PositionDTO(x: 105, y: -10),
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
}
def PositionDTO calculateMoveAndScalePosition(position, oldOriginX, oldOriginY, newOriginX, newOriginY, factorX, factorY) {
new PositionDTO(
x: newOriginX + (position.x - oldOriginX) * factorX,
y: newOriginY + (position.y - oldOriginY) * factorY)
}
def PositionDTO calculateScalePosition(position, factorX, factorY) {
new PositionDTO(
x: position.x * factorX,
y: position.y * factorY)
}
}