mirror of https://github.com/apache/nifi.git
NIFI-1799 Implements auto-scaling flow layout
added utility class to scale positions of components on the canvas, extracted get/setPosition methods from ProcesGroup, RemoteProcessGroup, Label, and Connectable into new interface Positionable added interface method for finding all Positionables in a ProcessGroup to the ProcessGroup interface and added implementation to StandardProcessGroup added test flow for position rescaling added Spock config to POM and a spec for testing the scaling of Positionables forced Surefire to use JUnit (TestNG was on classpath and Surefire seems to prioritize that over JUnit), added check in StandardFlowSynchronizer to scale positions only when flow encoding version is less than 1.0 added spec for StandardFlowfileSynchronizer updated FlowConfiguration.xsd to allow encoding-version attribute added new test flow used in StandardFlowSynchronizerSpec This closes #442. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
7923fd04c3
commit
433db23567
|
@ -32,7 +32,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
|
|||
/**
|
||||
* Represents a connectable component to which or from which data can flow.
|
||||
*/
|
||||
public interface Connectable extends Triggerable, Authorizable {
|
||||
public interface Connectable extends Triggerable, Authorizable, Positionable {
|
||||
|
||||
/**
|
||||
* @return the unique identifier for this <code>Connectable</code>
|
||||
|
@ -106,18 +106,6 @@ public interface Connectable extends Triggerable, Authorizable {
|
|||
*/
|
||||
Set<Connection> getConnections(Relationship relationship);
|
||||
|
||||
/**
|
||||
* @return the position on the graph where this Connectable is located
|
||||
*/
|
||||
Position getPosition();
|
||||
|
||||
/**
|
||||
* Updates this component's position on the graph
|
||||
*
|
||||
* @param position new position
|
||||
*/
|
||||
void setPosition(Position position);
|
||||
|
||||
/**
|
||||
* @return the name of this Connectable
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.connectable;
|
||||
|
||||
/**
|
||||
* Represents a component that can be positioned with X,Y coordinates on the canvas.
|
||||
*/
|
||||
public interface Positionable {
|
||||
|
||||
/**
|
||||
* @return the position on the graph where this Connectable is located
|
||||
*/
|
||||
Position getPosition();
|
||||
|
||||
/**
|
||||
* Updates this component's position on the graph
|
||||
*
|
||||
* @param position new position
|
||||
*/
|
||||
void setPosition(Position position);
|
||||
}
|
|
@ -16,21 +16,17 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.label;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.connectable.Size;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
|
||||
public interface Label extends Authorizable {
|
||||
import java.util.Map;
|
||||
|
||||
public interface Label extends Authorizable, Positionable {
|
||||
|
||||
String getIdentifier();
|
||||
|
||||
Position getPosition();
|
||||
|
||||
void setPosition(Position position);
|
||||
|
||||
Map<String, String> getStyle();
|
||||
|
||||
void setStyle(Map<String, String> style);
|
||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.nifi.connectable.Connectable;
|
|||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.connectable.Funnel;
|
||||
import org.apache.nifi.connectable.Port;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.ScheduledState;
|
||||
import org.apache.nifi.controller.Snippet;
|
||||
|
@ -45,7 +45,7 @@ import java.util.function.Predicate;
|
|||
* <p>
|
||||
* MUST BE THREAD-SAFE</p>
|
||||
*/
|
||||
public interface ProcessGroup extends Authorizable {
|
||||
public interface ProcessGroup extends Authorizable, Positionable {
|
||||
|
||||
/**
|
||||
* Predicate for filtering schedulable Processors.
|
||||
|
@ -97,17 +97,6 @@ public interface ProcessGroup extends Authorizable {
|
|||
*/
|
||||
void setName(String name);
|
||||
|
||||
/**
|
||||
* Updates the position of where this ProcessGroup is located in the graph
|
||||
* @param position new position
|
||||
*/
|
||||
void setPosition(Position position);
|
||||
|
||||
/**
|
||||
* @return the position of where this ProcessGroup is located in the graph
|
||||
*/
|
||||
Position getPosition();
|
||||
|
||||
/**
|
||||
* @return the user-set comments about this ProcessGroup, or
|
||||
* <code>null</code> if no comments have been set
|
||||
|
@ -739,6 +728,12 @@ public interface ProcessGroup extends Authorizable {
|
|||
*/
|
||||
Connectable findConnectable(String identifier);
|
||||
|
||||
/**
|
||||
* @return a Set of all {@link org.apache.nifi.connectable.Positionable}s contained within this
|
||||
* {@link ProcessGroup} and any child {@link ProcessGroup}s
|
||||
*/
|
||||
Set<Positionable> findAllPositionables();
|
||||
|
||||
/**
|
||||
* Moves all of the components whose ID's are specified within the given
|
||||
* {@link Snippet} from this ProcessGroup into the given destination
|
||||
|
|
|
@ -16,18 +16,18 @@
|
|||
*/
|
||||
package org.apache.nifi.groups;
|
||||
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.controller.exception.CommunicationsException;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.remote.RemoteGroupPort;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Date;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.controller.exception.CommunicationsException;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.remote.RemoteGroupPort;
|
||||
|
||||
public interface RemoteProcessGroup extends Authorizable {
|
||||
public interface RemoteProcessGroup extends Authorizable, Positionable {
|
||||
|
||||
String getIdentifier();
|
||||
|
||||
|
@ -37,10 +37,6 @@ public interface RemoteProcessGroup extends Authorizable {
|
|||
|
||||
void setProcessGroup(ProcessGroup group);
|
||||
|
||||
void setPosition(Position position);
|
||||
|
||||
Position getPosition();
|
||||
|
||||
String getComments();
|
||||
|
||||
void setComments(String comments);
|
||||
|
|
|
@ -13,7 +13,8 @@
|
|||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
@ -158,6 +159,17 @@
|
|||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.spockframework</groupId>
|
||||
<artifactId>spock-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cglib</groupId>
|
||||
<artifactId>cglib-nodep</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller;
|
||||
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Provides utility to scale the positions of {@link Positionable}s and bend points of {@link Connection}s
|
||||
* by a given factor.
|
||||
*/
|
||||
class PositionScaler {
|
||||
|
||||
/**
|
||||
* Scales the positions of all {@link Position}s in the given {@link ProcessGroup} by
|
||||
* the provided factor. This method replaces all {@link Position}s in each {@link Positionable}
|
||||
* in the {@link ProcessGroup} with a new scaled {@link Position}.
|
||||
*
|
||||
* @param processGroup containing the {@link Positionable}s to be scaled
|
||||
* @param factorX used to scale a {@link Positionable}'s X-coordinate position
|
||||
* @param factorY used to scale a {@link Positionable}'s Y-coordinate position
|
||||
*/
|
||||
public static void scale(ProcessGroup processGroup, double factorX, double factorY) {
|
||||
processGroup.findAllPositionables().stream().forEach(p -> scale(p, factorX, factorY));
|
||||
Map<Connection, List<Position>> bendPointsByConnection =
|
||||
processGroup.findAllConnections().stream().collect(Collectors.toMap(connection -> connection, Connection::getBendPoints));
|
||||
bendPointsByConnection.entrySet().stream()
|
||||
.forEach(connectionListEntry -> connectionListEntry.getKey().setBendPoints(connectionListEntry.getValue().stream()
|
||||
.map(p -> scalePosition(p, factorX, factorY)).collect(Collectors.toList())));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Scales the {@link Position} of the given {@link Positionable} by the provided factor. This method
|
||||
* replaces the {@link Position} in the {@link Positionable} with a new scaled {@link Position}.
|
||||
*
|
||||
* @param positionable containing a {@link Position} to scale
|
||||
* @param factorX used to scale a {@link Positionable}'s X-coordinate position
|
||||
* @param factorY used to scale a {@link Positionable}'s Y-coordinate position
|
||||
*/
|
||||
public static void scale(Positionable positionable, double factorX, double factorY) {
|
||||
final Position startingPosition = positionable.getPosition();
|
||||
final Position scaledPosition = scalePosition(startingPosition, factorX, factorY);
|
||||
positionable.setPosition(scaledPosition);
|
||||
}
|
||||
|
||||
private static Position scalePosition(Position position, double factorX, double factorY) {
|
||||
return new Position(position.getX() * factorX,
|
||||
position.getY() * factorY);
|
||||
}
|
||||
|
||||
}
|
|
@ -277,6 +277,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
scaleRootGroup(rootGroup, encodingVersion);
|
||||
|
||||
final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
|
||||
if (reportingTasksElement != null) {
|
||||
final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
|
||||
|
@ -311,6 +313,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
void scaleRootGroup(ProcessGroup rootGroup, FlowEncodingVersion encodingVersion) {
|
||||
if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) {
|
||||
// Calculate new Positions if the encoding version of the flow is older than 1.0.
|
||||
PositionScaler.scale(rootGroup, 1.5, 1.34);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isEmpty(final ProcessGroupDTO dto) {
|
||||
if (dto == null) {
|
||||
return true;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.groups;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.nifi.connectable.Funnel;
|
|||
import org.apache.nifi.connectable.LocalPort;
|
||||
import org.apache.nifi.connectable.Port;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
|
@ -72,6 +74,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
|
@ -2140,6 +2143,18 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Positionable> findAllPositionables() {
|
||||
Set<Positionable> positionables = Sets.newHashSet();
|
||||
positionables.addAll(findAllConnectables(this, true));
|
||||
List<ProcessGroup> allProcessGroups = findAllProcessGroups();
|
||||
positionables.addAll(allProcessGroups);
|
||||
positionables.addAll(allProcessGroups.stream().flatMap(processGroup -> processGroup.findAllPositionables().stream()).collect(Collectors.toSet()));
|
||||
positionables.addAll(findAllRemoteProcessGroups());
|
||||
positionables.addAll(findAllLabels());
|
||||
return positionables;
|
||||
}
|
||||
|
||||
private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) {
|
||||
final Set<Connectable> set = new HashSet<>();
|
||||
set.addAll(group.getInputPorts());
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
-->
|
||||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" version="1.0">
|
||||
<xs:element name="flowController" type="FlowControllerType" />
|
||||
|
||||
|
||||
<xs:complexType name="FlowControllerType">
|
||||
<xs:sequence>
|
||||
<xs:choice>
|
||||
|
@ -35,6 +35,7 @@
|
|||
|
||||
<xs:element name="reportingTasks" type="ReportingTasksType" minOccurs="0" maxOccurs="1" />
|
||||
</xs:sequence>
|
||||
<xs:attribute name="encoding-version" type="xs:string"/>
|
||||
</xs:complexType>
|
||||
|
||||
<!-- the processor "id" is a key that should be valid within each flowController-->
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller
|
||||
|
||||
import org.apache.nifi.connectable.Connectable
|
||||
import org.apache.nifi.connectable.Position
|
||||
import org.apache.nifi.connectable.Positionable
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
||||
class PositionScalerSpec extends Specification {
|
||||
|
||||
@Unroll
|
||||
def "scale #positionableType.getSimpleName()"() {
|
||||
given:
|
||||
def positionable = Mock positionableType
|
||||
|
||||
when:
|
||||
PositionScaler.scale positionable, factorX, factorY
|
||||
|
||||
then:
|
||||
1 * positionable.position >> new Position(originalX, originalY)
|
||||
1 * positionable.setPosition(_) >> { Position p ->
|
||||
assert p.x == newX
|
||||
assert p.y == newY
|
||||
}
|
||||
|
||||
where:
|
||||
positionableType | originalX | originalY | factorX | factorY | newX | newY
|
||||
Connectable | 10 | 10 | 1.5 | 1.5 | 15 | 15
|
||||
Positionable | -10 | -10 | 1.5 | 1.5 | -15 | -15
|
||||
}
|
||||
|
||||
//TODO Test scaling of a ProcessGroup
|
||||
}
|
|
@ -0,0 +1,244 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller
|
||||
|
||||
import groovy.xml.XmlUtil
|
||||
import org.apache.nifi.cluster.protocol.DataFlow
|
||||
import org.apache.nifi.connectable.*
|
||||
import org.apache.nifi.controller.label.Label
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue
|
||||
import org.apache.nifi.groups.ProcessGroup
|
||||
import org.apache.nifi.groups.RemoteProcessGroup
|
||||
import org.apache.nifi.processor.Relationship
|
||||
import org.apache.nifi.reporting.BulletinRepository
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
||||
class StandardFlowSynchronizerSpec extends Specification {
|
||||
|
||||
def setupSpec() {
|
||||
System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"
|
||||
}
|
||||
|
||||
def teardownSpec() {
|
||||
System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "scaling of #filename with encoding version \"#flowEncodingVersion\""() {
|
||||
given: "a StandardFlowSynchronizer with mocked collaborators"
|
||||
def controller = Mock FlowController
|
||||
def proposedFlow = Mock DataFlow
|
||||
def snippetManager = Mock SnippetManager
|
||||
def bulletinRepository = Mock BulletinRepository
|
||||
def flowFileQueue = Mock FlowFileQueue
|
||||
def flowFile = new File(StandardFlowSynchronizerSpec.getResource(filename).toURI())
|
||||
def flowControllerXml = new XmlSlurper().parse(flowFile)
|
||||
def Map<String, Position> originalPositionablePositionsById = flowControllerXml.rootGroup.'**'
|
||||
.findAll { !it.name().equals('connection') && it.id.size() == 1 && it.position.size() == 1 }
|
||||
.collectEntries { [it.id.text(), new Position(it.position.@x.toDouble(), it.position.@y.toDouble())] }
|
||||
def Map<String, List<Position>> originalBendPointsByConnectionId = flowControllerXml.rootGroup.'**'
|
||||
.findAll { it.name().equals('connection') && it.bendPoints.size() > 0 }
|
||||
.collectEntries { [it.id.text(), it.bendPoints.children().collect { new Position(it.@x.toDouble(), it.@y.toDouble()) }] }
|
||||
flowControllerXml.@'encoding-version' = flowEncodingVersion
|
||||
def testFlowBytes = XmlUtil.serialize(flowControllerXml).bytes
|
||||
def Map<String, Position> positionablePositionsById = [:]
|
||||
def Map<String, Positionable> positionableMocksById = [:]
|
||||
def Map<String, Connection> connectionMocksById = [:]
|
||||
def Map<String, List<Position>> bendPointPositionsByConnectionId = [:]
|
||||
// the unit under test
|
||||
def flowSynchronizer = new StandardFlowSynchronizer(null)
|
||||
|
||||
when: "the flow is synchronized with the current state of the controller"
|
||||
flowSynchronizer.sync controller, proposedFlow, null
|
||||
|
||||
then: "establish interactions for the mocked collaborators of StandardFlowSynchronizer to store the ending positions of components"
|
||||
1 * controller.isInitialized() >> false
|
||||
_ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text()
|
||||
_ * controller.getGroup(_) >> { String id -> positionableMocksById.get(id) }
|
||||
_ * controller.snippetManager >> snippetManager
|
||||
_ * controller.bulletinRepository >> bulletinRepository
|
||||
_ * controller./set.*/(*_)
|
||||
_ * controller.createProcessGroup(_) >> { String pgId ->
|
||||
def processGroup = Mock(ProcessGroup)
|
||||
_ * processGroup.getIdentifier() >> pgId
|
||||
_ * processGroup.getPosition() >> { positionablePositionsById.get(pgId) }
|
||||
_ * processGroup.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put pgId, pos
|
||||
}
|
||||
_ * processGroup./(add|set).*/(*_)
|
||||
_ * processGroup.isEmpty() >> true
|
||||
_ * processGroup.isRootGroup() >> { pgId == flowControllerXml.rootGroup.id }
|
||||
_ * processGroup.getConnectable(_) >> { String connId -> positionableMocksById.get(connId) }
|
||||
_ * processGroup.findAllPositionables() >> {
|
||||
def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId }
|
||||
def idsUnderPg = foundProcessGroup.'**'.findAll { it.name() == 'id' }.collect { it.text() }
|
||||
positionableMocksById.entrySet().collect {
|
||||
if (idsUnderPg.contains(it.key)) {
|
||||
it.value
|
||||
}
|
||||
}
|
||||
}
|
||||
_ * processGroup.findAllConnections() >> {
|
||||
def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId }
|
||||
def foundConnections = foundProcessGroup.'**'.findAll { it.name() == 'connection' }.collect { it.id.text() }
|
||||
connectionMocksById.entrySet().collect {
|
||||
if (foundConnections.contains(it.key)) {
|
||||
it.value
|
||||
}
|
||||
}
|
||||
}
|
||||
positionableMocksById.put(pgId, processGroup)
|
||||
return processGroup
|
||||
}
|
||||
|
||||
_ * controller.createProcessor(_, _, _) >> { String type, String id, boolean firstTimeAdded ->
|
||||
def processor = Mock(ProcessorNode)
|
||||
_ * processor.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * processor.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * processor./(add|set).*/(*_)
|
||||
_ * processor.getIdentifier() >> id
|
||||
_ * processor.getRelationship(_) >> { String n -> new Relationship.Builder().name(n).build() }
|
||||
positionableMocksById.put(id, processor)
|
||||
return processor
|
||||
}
|
||||
_ * controller.createFunnel(_) >> { String id ->
|
||||
def funnel = Mock(Funnel)
|
||||
_ * funnel.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * funnel.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * funnel./(add|set).*/(*_)
|
||||
positionableMocksById.put id, funnel
|
||||
return funnel
|
||||
}
|
||||
_ * controller.createLabel(_, _) >> { String id, String text ->
|
||||
def l = Mock(Label)
|
||||
_ * l.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * l.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * l./(add|set).*/(*_)
|
||||
positionableMocksById.put(id, l)
|
||||
return l
|
||||
}
|
||||
_ * controller./create.*Port/(_, _) >> { String id, String text ->
|
||||
def port = Mock(Port)
|
||||
_ * port.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * port.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * port./(add|set).*/(*_)
|
||||
positionableMocksById.put(id, port)
|
||||
return port
|
||||
}
|
||||
_ * controller.createRemoteProcessGroup(_, _) >> { String id, String uri ->
|
||||
def rpg = Mock(RemoteProcessGroup)
|
||||
_ * rpg.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * rpg.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * rpg./(add|set).*/(*_)
|
||||
_ * rpg.getOutputPort(_) >> { String rpgId -> positionableMocksById.get(rpgId) }
|
||||
_ * rpg.getIdentifier() >> id
|
||||
positionableMocksById.put(id, rpg)
|
||||
return rpg
|
||||
}
|
||||
_ * controller.createConnection(_, _, _, _, _) >> { String id, String name, Connectable source, Connectable destination, Collection<String> relationshipNames ->
|
||||
def connection = Mock(Connection)
|
||||
_ * connection.getIdentifier() >> id
|
||||
_ * connection.getBendPoints() >> {
|
||||
def bendpoints = bendPointPositionsByConnectionId.get(id)
|
||||
return bendpoints
|
||||
}
|
||||
_ * connection.setBendPoints(_) >> {
|
||||
// There seems to be a bug in Spock method matching where a list of arguments to a method
|
||||
// is being coerced into an Arrays$ArrayList with the actual list of bend points as an
|
||||
// ArrayList in the 0th element.
|
||||
// Need to keep an eye on this...
|
||||
bendPointPositionsByConnectionId.put id, it[0]
|
||||
}
|
||||
_ * connection./set.*/(*_)
|
||||
_ * connection.flowFileQueue >> flowFileQueue
|
||||
connectionMocksById.put(id, connection)
|
||||
return connection
|
||||
}
|
||||
_ * controller.startProcessor(*_)
|
||||
_ * controller.startConnectable(_)
|
||||
_ * controller.enableControllerServices(_)
|
||||
_ * snippetManager.export() >> {
|
||||
[] as byte[]
|
||||
}
|
||||
_ * snippetManager.clear()
|
||||
1 * proposedFlow.flow >> testFlowBytes
|
||||
_ * proposedFlow.snippets >> {
|
||||
[] as byte[]
|
||||
}
|
||||
_ * flowFileQueue./set.*/(*_)
|
||||
_ * _.hashCode() >> 1
|
||||
0 * _ // no other mock calls allowed
|
||||
|
||||
then: "verify that the flow was scaled properly"
|
||||
originalPositionablePositionsById.entrySet().forEach { entry ->
|
||||
assert positionablePositionsById.containsKey(entry.key)
|
||||
def originalPosition = entry.value
|
||||
def position = positionablePositionsById.get(entry.key)
|
||||
compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater)
|
||||
}
|
||||
originalBendPointsByConnectionId.entrySet().forEach { entry ->
|
||||
assert bendPointPositionsByConnectionId.containsKey(entry.key)
|
||||
def originalBendPoints = entry.value
|
||||
def sortedBendPoints = bendPointPositionsByConnectionId.get(entry.key).sort { it.x }
|
||||
def sortedOriginalBendPoints = originalBendPoints.sort { it.x }
|
||||
assert sortedOriginalBendPoints.size() == sortedBendPoints.size()
|
||||
[sortedOriginalBendPoints, sortedBendPoints].transpose().forEach { Position originalPosition, Position position ->
|
||||
compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater)
|
||||
}
|
||||
}
|
||||
|
||||
where: "the each flowfile and flow encoding version is run through the StandardFlowSynchronizer"
|
||||
filename | flowEncodingVersion | isSyncedPositionGreater
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | null | true
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '0.7' | true
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '1.0' | false
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '99.0' | false
|
||||
}
|
||||
|
||||
private void compareOriginalPointToScaledPoint(Position originalPosition, Position position, boolean isSyncedPositionGreater) {
|
||||
if (originalPosition.x == 0) {
|
||||
assert position.x == 0
|
||||
}
|
||||
if (originalPosition.y == 0) {
|
||||
assert position.y == 0
|
||||
}
|
||||
if (originalPosition.x > 0) {
|
||||
assert isSyncedPositionGreater == position.x > originalPosition.x
|
||||
}
|
||||
if (originalPosition.y > 0) {
|
||||
assert isSyncedPositionGreater == position.y > originalPosition.y
|
||||
}
|
||||
if (originalPosition.x < 0) {
|
||||
assert isSyncedPositionGreater == position.x < originalPosition.x
|
||||
}
|
||||
if (originalPosition.y < 0) {
|
||||
assert isSyncedPositionGreater == position.y < originalPosition.y
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,12 +17,6 @@
|
|||
|
||||
package org.apache.nifi.controller.service.mock;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.authorization.Resource;
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
|
@ -30,6 +24,7 @@ import org.apache.nifi.connectable.Connection;
|
|||
import org.apache.nifi.connectable.Funnel;
|
||||
import org.apache.nifi.connectable.Port;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.connectable.Positionable;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.Snippet;
|
||||
import org.apache.nifi.controller.Template;
|
||||
|
@ -39,6 +34,12 @@ import org.apache.nifi.groups.ProcessGroup;
|
|||
import org.apache.nifi.groups.ProcessGroupCounts;
|
||||
import org.apache.nifi.groups.RemoteProcessGroup;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class MockProcessGroup implements ProcessGroup {
|
||||
private Map<String, ControllerServiceNode> serviceMap = new HashMap<>();
|
||||
|
||||
|
@ -267,6 +268,11 @@ public class MockProcessGroup implements ProcessGroup {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Positionable> findAllPositionables() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connectable getConnectable(String id) {
|
||||
return null;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
19
pom.xml
19
pom.xml
|
@ -1222,6 +1222,12 @@ language governing permissions and limitations under the License. -->
|
|||
<version>2.4.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.spockframework</groupId>
|
||||
<artifactId>spock-core</artifactId>
|
||||
<version>1.0-groovy-2.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-all</artifactId>
|
||||
|
@ -1291,9 +1297,22 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.18</version>
|
||||
<configuration>
|
||||
<includes>
|
||||
<include>**/*Test.class</include>
|
||||
<include>**/Test*.class</include>
|
||||
<include>**/*Spec.class</include>
|
||||
</includes>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
<argLine combine.children="append">-Xmx1G -Djava.net.preferIPv4Stack=true</argLine>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<!-- Force surefire to use JUnit -->
|
||||
<groupId>org.apache.maven.surefire</groupId>
|
||||
<artifactId>surefire-junit4</artifactId>
|
||||
<version>2.18</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
|
Loading…
Reference in New Issue