NIFI-2933 Remote input/output ports at any PG

Specify remote access at port creation.
Incorporated comments, and finished refactoring.
Renamed RootGroupPort to PublicPort.
Fix error message for creating a connection from a child PG having only PublicPorts.
Enhanced ProcessGroup instances rendered in the parent ProcessGroup
Loosen Port move check, allow moving public port between PG.
Show 'Remote NiFi Instance' info on Connection dialogs
Make labels narrative.
'Within Remote Group'.
Fixed DTO (de)serialization.
Return null only if all values are null.

This closes #3351.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2018-10-31 11:56:27 +09:00 committed by Mark Payne
parent a97766d62f
commit 8a50cb10b2
66 changed files with 1501 additions and 710 deletions

View File

@ -1238,12 +1238,14 @@ value entered is not valid, the Remote Process Group will not be valid and will
[[Site-to-Site_Input_Port]]
*Input Port*: In order to allow another NiFi instance to push data to your local instance, you can simply drag an <<input_port,Input Port>> onto the Root Process Group of your canvas. After entering a name for the port, it will be added to your flow. You can now right-click on the Input Port and choose Configure in order to adjust the name and the number of concurrent tasks that are used for the port.
Also, you can create an Input Port for Site-to-Site in child Process Groups by selecting "Receive Data From" as "Site-to-Site connections".
If Site-to-Site is configured to run securely, you will need to manage the port's "receive data via site-to-site" component access policy. Only those users who have been added to the policy will be able to communicate with the port.
[[Site-to-Site_Output_Port]]
*Output Port*: Similar to an Input Port, a DataFlow Manager may choose to add an <<output_port,Output Port>> to the Root Process Group. The Output Port allows an authorized NiFi instance to remotely connect to your instance and pull data from the Output Port. Configuring the Output Port and managing the port's access policies will again allow the DFM to control how many concurrent tasks are allowed, as well as which users are authorized to pull data from the instance being configured.
*Output Port*: Similar to an Input Port, a DataFlow Manager may choose to add an <<output_port,Output Port>> to the Root Process Group. Or an Output Port in child Process Groups for Site-to-Site connections. The Output Port allows an authorized NiFi instance to remotely connect to your instance and pull data from the Output Port. Configuring the Output Port and managing the port's access policies will again allow the DFM to control how many concurrent tasks are allowed, as well as which users are authorized to pull data from the instance being configured.
In addition to other instances of NiFi, some other applications may use a Site-to-Site client in order to push data to or receive data from a NiFi instance. For example, NiFi provides an Apache Storm spout and an Apache Spark Receiver that are able to pull data from NiFi's Root Group Output Ports.
In addition to other instances of NiFi, some other applications may use a Site-to-Site client in order to push data to or receive data from a NiFi instance. For example, NiFi provides an Apache Storm spout and an Apache Spark Receiver that are able to pull data from NiFi's Root Group Output Ports, and Output Ports in child Process Groups for Site-to-Site connections.
For information on how to enable and configure Site-to-Site on a NiFi instance, see the
link:administration-guide.html#site_to_site_properties[Site-to-Site Properties] section of the

View File

@ -36,6 +36,7 @@ public class PortDTO extends ComponentDTO {
private Integer concurrentlySchedulableTaskCount;
private Set<String> userAccessControl;
private Set<String> groupAccessControl;
private Boolean allowRemoteAccess;
private Collection<String> validationErrors;
@ -114,10 +115,10 @@ public class PortDTO extends ComponentDTO {
}
/**
* @return whether this port has incoming or outgoing connections to a remote NiFi. This is only applicable when the port is running on the root group
* @return whether this port has incoming or outgoing connections to a remote NiFi. This is only applicable when the port is allowed to be accessed remotely.
*/
@ApiModelProperty(
value = "Whether the port has incoming or output connections to a remote NiFi. This is only applicable when the port is running in the root group."
value = "Whether the port has incoming or output connections to a remote NiFi. This is only applicable when the port is allowed to be accessed remotely."
)
public Boolean isTransmitting() {
return transmitting;
@ -171,4 +172,18 @@ public class PortDTO extends ComponentDTO {
this.validationErrors = validationErrors;
}
/**
* @return whether this port can be accessed remotely via Site-to-Site protocol.
*/
@ApiModelProperty(
value = "Whether this port can be accessed remotely via Site-to-Site protocol."
)
public Boolean getAllowRemoteAccess() {
return allowRemoteAccess;
}
public void setAllowRemoteAccess(Boolean allowRemoteAccess) {
this.allowRemoteAccess = allowRemoteAccess;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web.api.dto;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.util.NumberUtil;
import javax.xml.bind.annotation.XmlType;
import java.util.Map;
@ -45,8 +46,11 @@ public class ProcessGroupDTO extends ComponentDTO {
private Integer locallyModifiedAndStaleCount;
private Integer syncFailureCount;
private Integer inputPortCount;
private Integer outputPortCount;
private Integer localInputPortCount;
private Integer localOutputPortCount;
private Integer publicInputPortCount;
private Integer publicOutputPortCount;
private FlowSnippetDTO contents;
@ -102,14 +106,46 @@ public class ProcessGroupDTO extends ComponentDTO {
* @return number of input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of input ports in the process group."
value = "The number of input ports in the process group.",
readOnly = true
)
public Integer getInputPortCount() {
return inputPortCount;
return NumberUtil.sumNullableIntegers(localInputPortCount, publicInputPortCount);
}
public void setInputPortCount(Integer inputPortCount) {
this.inputPortCount = inputPortCount;
// Without having setter for 'inputPortCount', deserialization fails.
// If we use Jackson annotation @JsonIgnoreProperties, this empty setter is not needed.
// Ex. @JsonIgnoreProperties(value={"inputPortCount", "outputPortCount"}, allowGetters=true)
// But in order to minimize dependencies, we don't use Jackson annotations in this module.
}
/**
* @return number of local input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of local input ports in the process group."
)
public Integer getLocalInputPortCount() {
return localInputPortCount;
}
public void setLocalInputPortCount(Integer localInputPortCount) {
this.localInputPortCount = localInputPortCount;
}
/**
* @return number of public input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of public input ports in the process group."
)
public Integer getPublicInputPortCount() {
return publicInputPortCount;
}
public void setPublicInputPortCount(Integer publicInputPortCount) {
this.publicInputPortCount = publicInputPortCount;
}
/**
@ -130,14 +166,43 @@ public class ProcessGroupDTO extends ComponentDTO {
* @return number of output ports in this process group
*/
@ApiModelProperty(
value = "The number of output ports in the process group."
value = "The number of output ports in the process group.",
readOnly = true
)
public Integer getOutputPortCount() {
return outputPortCount;
return NumberUtil.sumNullableIntegers(localOutputPortCount, publicOutputPortCount);
}
public void setOutputPortCount(Integer outputPortCount) {
this.outputPortCount = outputPortCount;
// See setInputPortCount for the reason why this is needed.
}
/**
* @return number of local output ports in this process group
*/
@ApiModelProperty(
value = "The number of local output ports in the process group."
)
public Integer getLocalOutputPortCount() {
return localOutputPortCount;
}
public void setLocalOutputPortCount(Integer localOutputPortCount) {
this.localOutputPortCount = localOutputPortCount;
}
/**
* @return number of public output ports in this process group
*/
@ApiModelProperty(
value = "The number of public output ports in the process group."
)
public Integer getPublicOutputPortCount() {
return publicOutputPortCount;
}
public void setPublicOutputPortCount(Integer publicOutputPortCount) {
this.publicOutputPortCount = publicOutputPortCount;
}
/**

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.util;
/**
* Utility class for numbers.
*/
public class NumberUtil {
/**
* Calculate sum of Integers those can be null.
* This method can be used to avoid getting NullPointerException when a null Integer being auto-boxed into an int.
* @param values Integers to add
* @return the sum of given values or null if all values are null
*/
public static Integer sumNullableIntegers(Integer ... values) {
int sum = 0;
int count = 0;
for (Integer value : values) {
if (value == null) {
continue;
}
sum += value;
count++;
}
return count == 0 ? null : sum;
}
}

View File

@ -33,6 +33,7 @@ public class PortEntity extends ComponentEntity implements Permissible<PortDTO>,
private PortStatusDTO status;
private String portType;
private PermissionsDTO operatePermissions;
private Boolean allowRemoteAccess;
/**
* @return input PortDTO that are being serialized
@ -84,4 +85,18 @@ public class PortEntity extends ComponentEntity implements Permissible<PortDTO>,
public void setOperatePermissions(PermissionsDTO permissions) {
this.operatePermissions = permissions;
}
/**
* @return whether this port can be accessed remotely via Site-to-Site protocol.
*/
@ApiModelProperty(
value = "Whether this port can be accessed remotely via Site-to-Site protocol."
)
public Boolean isAllowRemoteAccess() {
return allowRemoteAccess;
}
public void setAllowRemoteAccess(Boolean allowRemoteAccess) {
this.allowRemoteAccess = allowRemoteAccess;
}
}

View File

@ -20,6 +20,7 @@ import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.util.NumberUtil;
import javax.xml.bind.annotation.XmlRootElement;
@ -48,8 +49,10 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
private Integer locallyModifiedAndStaleCount;
private Integer syncFailureCount;
private Integer inputPortCount;
private Integer outputPortCount;
private Integer localInputPortCount;
private Integer localOutputPortCount;
private Integer publicInputPortCount;
private Integer publicOutputPortCount;
/**
* The ProcessGroupDTO that is being serialized.
@ -84,14 +87,43 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
* @return number of input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of input ports in the process group."
value = "The number of input ports in the process group.",
readOnly = true
)
public Integer getInputPortCount() {
return inputPortCount;
return NumberUtil.sumNullableIntegers(localInputPortCount, publicInputPortCount);
}
public void setInputPortCount(Integer inputPortCount) {
this.inputPortCount = inputPortCount;
// See ProcessGroupDTO.setInputPortCount for the reason why this is needed.
}
/**
* @return number of local input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of local input ports in the process group."
)
public Integer getLocalInputPortCount() {
return localInputPortCount;
}
public void setLocalInputPortCount(Integer localInputPortCount) {
this.localInputPortCount = localInputPortCount;
}
/**
* @return number of public input ports contained in this process group
*/
@ApiModelProperty(
value = "The number of public input ports in the process group."
)
public Integer getPublicInputPortCount() {
return publicInputPortCount;
}
public void setPublicInputPortCount(Integer publicInputPortCount) {
this.publicInputPortCount = publicInputPortCount;
}
/**
@ -112,14 +144,43 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
* @return number of output ports in this process group
*/
@ApiModelProperty(
value = "The number of output ports in the process group."
value = "The number of output ports in the process group.",
readOnly = true
)
public Integer getOutputPortCount() {
return outputPortCount;
return NumberUtil.sumNullableIntegers(localOutputPortCount, publicOutputPortCount);
}
public void setOutputPortCount(Integer outputPortCount) {
this.outputPortCount = outputPortCount;
// See ProcessGroupDTO.setInputPortCount for the reason why this is needed.
}
/**
* @return number of local output ports in this process group
*/
@ApiModelProperty(
value = "The number of local output ports in the process group."
)
public Integer getLocalOutputPortCount() {
return localOutputPortCount;
}
public void setLocalOutputPortCount(Integer localOutputPortCount) {
this.localOutputPortCount = localOutputPortCount;
}
/**
* @return number of public output ports in this process group
*/
@ApiModelProperty(
value = "The number of public output ports in the process group."
)
public Integer getPublicOutputPortCount() {
return publicOutputPortCount;
}
public void setPublicOutputPortCount(Integer publicOutputPortCount) {
this.publicOutputPortCount = publicOutputPortCount;
}
/**

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestProcessGroupDTO {
@Test
public void testGetInputPortCount() {
final ProcessGroupDTO dto = new ProcessGroupDTO();
assertEquals(null, dto.getInputPortCount());
dto.setLocalInputPortCount(3);
dto.setPublicInputPortCount(4);
assertEquals(Integer.valueOf(7), dto.getInputPortCount());
assertEquals(Integer.valueOf(3), dto.getLocalInputPortCount());
assertEquals(Integer.valueOf(4), dto.getPublicInputPortCount());
}
@Test
public void testGetOutputPortCount() {
final ProcessGroupDTO dto = new ProcessGroupDTO();
assertEquals(null, dto.getOutputPortCount());
dto.setLocalOutputPortCount(2);
dto.setPublicOutputPortCount(3);
assertEquals(Integer.valueOf(5), dto.getOutputPortCount());
assertEquals(Integer.valueOf(2), dto.getLocalOutputPortCount());
assertEquals(Integer.valueOf(3), dto.getPublicOutputPortCount());
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestProcessGroupEntity {
@Test
public void testGetInputPortCount() {
final ProcessGroupEntity entity = new ProcessGroupEntity();
assertEquals(null, entity.getInputPortCount());
entity.setLocalInputPortCount(3);
entity.setPublicInputPortCount(4);
assertEquals(Integer.valueOf(7), entity.getInputPortCount());
assertEquals(Integer.valueOf(3), entity.getLocalInputPortCount());
assertEquals(Integer.valueOf(4), entity.getPublicInputPortCount());
}
@Test
public void testGetOutputPortCount() {
final ProcessGroupEntity entity = new ProcessGroupEntity();
assertEquals(null, entity.getOutputPortCount());
entity.setLocalOutputPortCount(2);
entity.setPublicOutputPortCount(3);
assertEquals(Integer.valueOf(5), entity.getOutputPortCount());
assertEquals(Integer.valueOf(2), entity.getLocalOutputPortCount());
assertEquals(Integer.valueOf(3), entity.getPublicOutputPortCount());
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.util;
import org.junit.Test;
import static org.apache.nifi.web.api.dto.util.NumberUtil.sumNullableIntegers;
import static org.junit.Assert.assertEquals;
public class TestNumberUtil {
@Test
public void testSumNullableIntegers() {
assertEquals(Integer.valueOf(6), sumNullableIntegers(1, 2, 3));
assertEquals(Integer.valueOf(4), sumNullableIntegers(1, null, 3));
assertEquals(Integer.valueOf(0), sumNullableIntegers(-1, null, 1));
assertEquals(null, sumNullableIntegers(null, null));
}
}

View File

@ -61,12 +61,7 @@ public abstract class AbstractPort implements Port {
.name("")
.build();
public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final long MINIMUM_YIELD_MILLIS = 0L;
public static final long DEFAULT_YIELD_PERIOD = 10000L;
public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
private final List<Relationship> relationships;
@ -143,11 +138,11 @@ public abstract class AbstractPort implements Port {
final ProcessGroup parentGroup = this.processGroup.get();
if (getConnectableType() == ConnectableType.INPUT_PORT) {
if (parentGroup.getInputPortByName(name) != null) {
throw new IllegalStateException("The requested new port name is not available");
throw new IllegalStateException("A port with the same name already exists.");
}
} else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
if (parentGroup.getOutputPortByName(name) != null) {
throw new IllegalStateException("The requested new port name is not available");
throw new IllegalStateException("A port with the same name already exists.");
}
}
@ -249,12 +244,9 @@ public abstract class AbstractPort implements Port {
try {
onTrigger(context, session);
session.commit();
} catch (final ProcessException e) {
session.rollback();
throw e;
} catch (final Throwable t) {
session.rollback();
throw new RuntimeException(t);
throw t;
}
}

View File

@ -48,7 +48,7 @@ public interface FlowManager {
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if a Port already exists with the same id.
*/
Port createRemoteInputPort(String id, String name);
Port createPublicInputPort(String id, String name);
/**
* Creates a Port to use as an Output Port for transferring data via Site-to-Site communications
@ -59,7 +59,21 @@ public interface FlowManager {
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if a Port already exists with the same id.
*/
Port createRemoteOutputPort(String id, String name);
Port createPublicOutputPort(String id, String name);
/**
* Gets the all remotely accessible InputPorts in any ProcessGroups.
*
* @return input ports
*/
Set<Port> getPublicInputPorts();
/**
* Gets the all remotely accessible OutputPorts in any ProcessGroups.
*
* @return output ports
*/
Set<Port> getPublicOutputPorts();
/**
* Creates a new Remote Process Group with the given ID that points to the given URI

View File

@ -18,13 +18,17 @@ package org.apache.nifi.groups;
public class ProcessGroupCounts {
private final int inputPortCount, outputPortCount, runningCount, stoppedCount, invalidCount, disabledCount, activeRemotePortCount, inactiveRemotePortCount,
private final int localInputPortCount, localOutputPortCount, publicInputPortCount, publicOutputPortCount,
runningCount, stoppedCount, invalidCount, disabledCount, activeRemotePortCount, inactiveRemotePortCount,
upToDateCount, locallyModifiedCount, staleCount, locallyModifiedAndStaleCount, syncFailureCount;
public ProcessGroupCounts(int inputPortCount, int outputPortCount, int runningCount, int stoppedCount, int invalidCount, int disabledCount, int activeRemotePortCount,
public ProcessGroupCounts(int localInputPortCount, int localOutputPortCount, int publicInputPortCount, int publicOutputPortCount,
int runningCount, int stoppedCount, int invalidCount, int disabledCount, int activeRemotePortCount,
int inactiveRemotePortCount, int upToDateCount, int locallyModifiedCount, int staleCount, int locallyModifiedAndStaleCount, int syncFailureCount) {
this.inputPortCount = inputPortCount;
this.outputPortCount = outputPortCount;
this.localInputPortCount = localInputPortCount;
this.localOutputPortCount = localOutputPortCount;
this.publicInputPortCount = publicInputPortCount;
this.publicOutputPortCount = publicOutputPortCount;
this.runningCount = runningCount;
this.stoppedCount = stoppedCount;
this.invalidCount = invalidCount;
@ -38,12 +42,20 @@ public class ProcessGroupCounts {
this.syncFailureCount = syncFailureCount;
}
public int getInputPortCount() {
return inputPortCount;
public int getLocalInputPortCount() {
return localInputPortCount;
}
public int getOutputPortCount() {
return outputPortCount;
public int getPublicInputPortCount() {
return publicInputPortCount;
}
public int getLocalOutputPortCount() {
return localOutputPortCount;
}
public int getPublicOutputPortCount() {
return publicOutputPortCount;
}
public int getRunningCount() {

View File

@ -25,7 +25,10 @@ import org.apache.nifi.remote.protocol.ServerProtocol;
import java.util.Set;
public interface RootGroupPort extends Port {
/**
* Represents an input or output port that can receive or transfer data via Site-to-Site protocol.
*/
public interface PublicPort extends Port {
boolean isTransmitting();
@ -67,9 +70,9 @@ public interface RootGroupPort extends Port {
* @param serverProtocol protocol
*
* @return the number of FlowFiles received
* @throws org.apache.nifi.remote.exception.NotAuthorizedException nae
* @throws org.apache.nifi.remote.exception.BadRequestException bre
* @throws org.apache.nifi.remote.exception.RequestExpiredException ree
* @throws NotAuthorizedException nae
* @throws BadRequestException bre
* @throws RequestExpiredException ree
*/
int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
@ -80,10 +83,12 @@ public interface RootGroupPort extends Port {
* @param serverProtocol protocol
*
* @return the number of FlowFiles transferred
* @throws org.apache.nifi.remote.exception.NotAuthorizedException nae
* @throws org.apache.nifi.remote.exception.BadRequestException bre
* @throws org.apache.nifi.remote.exception.RequestExpiredException ree
* @throws NotAuthorizedException nae
* @throws BadRequestException bre
* @throws RequestExpiredException ree
*/
int transferFlowFiles(Peer peer, ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
TransferDirection getDirection();
}

View File

@ -23,7 +23,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant;
@ -40,7 +40,7 @@ public interface ServerProtocol extends VersionedRemoteResource {
*/
void setRootProcessGroup(ProcessGroup rootGroup);
RootGroupPort getPort();
PublicPort getPort();
/**
* Optional operation. Sets the NodeInformant to use in this Protocol, if a

View File

@ -63,25 +63,40 @@ public class LocalPort extends AbstractPort {
maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0));
}
private boolean[] validateConnections() {
// LocalPort requires both in/out.
final boolean requireInput = true;
final boolean requireOutput = true;
return new boolean[]{requireInput, hasIncomingConnection(),
requireOutput, !getConnections(Relationship.ANONYMOUS).isEmpty()};
}
@Override
public boolean isValid() {
return !getConnections(Relationship.ANONYMOUS).isEmpty() && hasIncomingConnection();
final boolean[] connectionRequirements = validateConnections();
return (!connectionRequirements[0] || connectionRequirements[1])
&& (!connectionRequirements[2] || connectionRequirements[3]);
}
@Override
public Collection<ValidationResult> getValidationErrors() {
final boolean[] connectionRequirements = validateConnections();
final Collection<ValidationResult> validationErrors = new ArrayList<>();
if (getConnections(Relationship.ANONYMOUS).isEmpty()) {
// Incoming connections are required but not set
if (connectionRequirements[0] && !connectionRequirements[1]) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no outgoing connections")
.explanation("Port has no incoming connections")
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build());
}
if (!hasIncomingConnection()) {
// Outgoing connections are required but not set
if (connectionRequirements[2] && !connectionRequirements[3]) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no incoming connections")
.explanation("Port has no outgoing connections")
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build());

View File

@ -40,7 +40,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
@ -283,14 +283,13 @@ public class StandardFlowSnippet implements FlowSnippet {
//
for (final PortDTO portDTO : dto.getInputPorts()) {
final Port inputPort;
if (group.isRootGroup()) {
inputPort = flowManager.createRemoteInputPort(portDTO.getId(), portDTO.getName());
inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
inputPort = flowManager.createPublicInputPort(portDTO.getId(), portDTO.getName());
if (portDTO.getGroupAccessControl() != null) {
((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
((PublicPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (portDTO.getUserAccessControl() != null) {
((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
((PublicPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
}
} else {
inputPort = flowManager.createLocalInputPort(portDTO.getId(), portDTO.getName());
@ -301,20 +300,20 @@ public class StandardFlowSnippet implements FlowSnippet {
}
inputPort.setPosition(toPosition(portDTO.getPosition()));
inputPort.setProcessGroup(group);
inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
inputPort.setComments(portDTO.getComments());
group.addInputPort(inputPort);
}
for (final PortDTO portDTO : dto.getOutputPorts()) {
final Port outputPort;
if (group.isRootGroup()) {
outputPort = flowManager.createRemoteOutputPort(portDTO.getId(), portDTO.getName());
outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
outputPort = flowManager.createPublicOutputPort(portDTO.getId(), portDTO.getName());
if (portDTO.getGroupAccessControl() != null) {
((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
((PublicPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (portDTO.getUserAccessControl() != null) {
((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
((PublicPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
}
} else {
outputPort = flowManager.createLocalOutputPort(portDTO.getId(), portDTO.getName());
@ -325,6 +324,7 @@ public class StandardFlowSnippet implements FlowSnippet {
}
outputPort.setPosition(toPosition(portDTO.getPosition()));
outputPort.setProcessGroup(group);
outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
outputPort.setComments(portDTO.getComments());
group.addOutputPort(outputPort);
}

View File

@ -63,8 +63,8 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.ExecutionNode;
@ -1271,8 +1271,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final PortDTO portDTO = FlowFromDOMFactory.getPort(inputPortElement);
final Port port;
if (processGroup.isRootGroup()) {
port = flowManager.createRemoteInputPort(portDTO.getId(), portDTO.getName());
if (processGroup.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
port = flowManager.createPublicInputPort(portDTO.getId(), portDTO.getName());
} else {
port = flowManager.createLocalInputPort(portDTO.getId(), portDTO.getName());
}
@ -1284,17 +1284,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Set<String> userControls = portDTO.getUserAccessControl();
if (userControls != null && !userControls.isEmpty()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalStateException("Attempting to add User Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
}
((RootGroupPort) port).setUserAccessControl(userControls);
((PublicPort) port).setUserAccessControl(userControls);
}
final Set<String> groupControls = portDTO.getGroupAccessControl();
if (groupControls != null && !groupControls.isEmpty()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalStateException("Attempting to add Group Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
}
((RootGroupPort) port).setGroupAccessControl(groupControls);
((PublicPort) port).setGroupAccessControl(groupControls);
}
processGroup.addInputPort(port);
@ -1316,8 +1316,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final PortDTO portDTO = FlowFromDOMFactory.getPort(outputPortElement);
final Port port;
if (processGroup.isRootGroup()) {
port = flowManager.createRemoteOutputPort(portDTO.getId(), portDTO.getName());
if (processGroup.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
port = flowManager.createPublicOutputPort(portDTO.getId(), portDTO.getName());
} else {
port = flowManager.createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
@ -1329,17 +1329,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Set<String> userControls = portDTO.getUserAccessControl();
if (userControls != null && !userControls.isEmpty()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalStateException("Attempting to add User Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
}
((RootGroupPort) port).setUserAccessControl(userControls);
((PublicPort) port).setUserAccessControl(userControls);
}
final Set<String> groupControls = portDTO.getGroupAccessControl();
if (groupControls != null && !groupControls.isEmpty()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalStateException("Attempting to add Group Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
}
((RootGroupPort) port).setGroupAccessControl(groupControls);
((PublicPort) port).setGroupAccessControl(groupControls);
}
processGroup.addOutputPort(port);

View File

@ -122,9 +122,11 @@ public class TemplateUtils {
processGroupDTO.setActiveRemotePortCount(null);
processGroupDTO.setDisabledCount(null);
processGroupDTO.setInactiveRemotePortCount(null);
processGroupDTO.setInputPortCount(null);
processGroupDTO.setLocalInputPortCount(null);
processGroupDTO.setPublicInputPortCount(null);
processGroupDTO.setInvalidCount(null);
processGroupDTO.setOutputPortCount(null);
processGroupDTO.setLocalOutputPortCount(null);
processGroupDTO.setPublicOutputPortCount(null);
processGroupDTO.setRunningCount(null);
processGroupDTO.setStoppedCount(null);
processGroupDTO.setUpToDateCount(null);

View File

@ -20,6 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
@ -62,9 +63,10 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.StandardPublicPort;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.remote.StandardRootGroupPort;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.NiFiProperties;
@ -84,6 +86,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
@ -122,22 +125,61 @@ public class StandardFlowManager implements FlowManager {
this.isSiteToSiteSecure = Boolean.TRUE.equals(nifiProperties.isSiteToSiteSecure());
}
public Port createRemoteInputPort(String id, String name) {
public Port createPublicInputPort(String id, String name) {
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
return new StandardPublicPort(id, name, null,
TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, authorizer, bulletinRepository,
processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(),
IdentityMappingUtil.getIdentityMappings(nifiProperties));
}
public Port createRemoteOutputPort(String id, String name) {
public Port createPublicOutputPort(String id, String name) {
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
return new StandardPublicPort(id, name, null,
TransferDirection.SEND, ConnectableType.OUTPUT_PORT, authorizer, bulletinRepository,
processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(),
IdentityMappingUtil.getIdentityMappings(nifiProperties));
}
/**
* Gets all remotely accessible ports in any process group.
*
* @return input ports
*/
public Set<Port> getPublicInputPorts() {
return getPublicPorts(ProcessGroup::getInputPorts);
}
/**
* Gets all remotely accessible ports in any process group.
*
* @return output ports
*/
public Set<Port> getPublicOutputPorts() {
return getPublicPorts(ProcessGroup::getOutputPorts);
}
private Set<Port> getPublicPorts(final Function<ProcessGroup, Set<Port>> getPorts) {
final Set<Port> publicPorts = new HashSet<>();
ProcessGroup rootGroup = getRootGroup();
getPublicPorts(publicPorts, rootGroup, getPorts);
return publicPorts;
}
private void getPublicPorts(final Set<Port> publicPorts, final ProcessGroup group, final Function<ProcessGroup, Set<Port>> getPorts) {
for (final Port port : getPorts.apply(group)) {
if (port instanceof PublicPort) {
publicPorts.add(port);
}
}
group.getProcessGroups().forEach(childGroup -> getPublicPorts(publicPorts, childGroup, getPorts));
}
public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
return new StandardRemoteProcessGroup(requireNonNull(id), uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties);
}

View File

@ -358,6 +358,7 @@ public class FlowFromDOMFactory {
portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
portDTO.setName(getString(element, "name"));
portDTO.setComments(getString(element, "comments"));
portDTO.setAllowRemoteAccess(getBoolean(element, "allowRemoteAccess"));
final ScheduledState scheduledState = getScheduledState(element);
portDTO.setState(scheduledState.toString());

View File

@ -42,8 +42,8 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.StringUtils;
import org.w3c.dom.DOMException;
@ -212,20 +212,18 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addProcessor(element, processor, scheduledStateLookup);
}
if (group.isRootGroup()) {
for (final Port port : group.getInputPorts()) {
addRootGroupPort(element, (RootGroupPort) port, "inputPort", scheduledStateLookup);
}
for (final Port port : group.getOutputPorts()) {
addRootGroupPort(element, (RootGroupPort) port, "outputPort", scheduledStateLookup);
}
} else {
for (final Port port : group.getInputPorts()) {
for (final Port port : group.getInputPorts()) {
if (port instanceof PublicPort) {
addPublicPort(element, (PublicPort) port, "inputPort", scheduledStateLookup);
} else {
addPort(element, port, "inputPort", scheduledStateLookup);
}
}
for (final Port port : group.getOutputPorts()) {
for (final Port port : group.getOutputPorts()) {
if (port instanceof PublicPort) {
addPublicPort(element, (PublicPort) port, "outputPort", scheduledStateLookup);
} else {
addPort(element, port, "outputPort", scheduledStateLookup);
}
}
@ -416,7 +414,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
parentElement.appendChild(element);
}
private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
private void addPublicPort(final Element parentElement, final PublicPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
@ -427,6 +425,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
addTextElement(element, "allowRemoteAccess", Boolean.TRUE.toString());
for (final String user : port.getUserAccessControl()) {
addTextElement(element, "userAccessControl", user);
}

View File

@ -472,6 +472,7 @@ public class FingerprintFactory {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "id"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "versionedComponentId"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "name"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "allowRemoteAccess"));
final NodeList userAccessControlNodeList = DomUtils.getChildNodesByTagName(portElem, "userAccessControl");
if (userAccessControlNodeList == null || userAccessControlNodeList.getLength() == 0) {

View File

@ -133,8 +133,8 @@ import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
@ -280,8 +280,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessGroupCounts getCounts() {
int inputPortCount = 0;
int outputPortCount = 0;
int localInputPortCount = 0;
int localOutputPortCount = 0;
int publicInputPortCount = 0;
int publicOutputPortCount = 0;
int running = 0;
int stopped = 0;
@ -310,8 +312,12 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
inputPortCount = inputPorts.size();
for (final Port port : inputPorts.values()) {
if (port instanceof PublicPort) {
publicInputPortCount++;
} else {
localInputPortCount++;
}
if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
disabled++;
} else if (port.isRunning()) {
@ -323,8 +329,12 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
outputPortCount = outputPorts.size();
for (final Port port : outputPorts.values()) {
if (port instanceof PublicPort) {
publicOutputPortCount++;
} else {
localOutputPortCount++;
}
if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
disabled++;
} else if (port.isRunning()) {
@ -405,7 +415,8 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.unlock();
}
return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped, invalid, disabled, activeRemotePorts,
return new ProcessGroupCounts(localInputPortCount, localOutputPortCount, publicInputPortCount, publicOutputPortCount,
running, stopped, invalid, disabled, activeRemotePorts,
inactiveRemotePorts, upToDate, locallyModified, stale, locallyModifiedAndStale, syncFailure);
}
@ -491,22 +502,31 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private void verifyPortUniqueness(final Port port,
final Map<String, Port> portIdMap,
final Function<String, Port> getPortByName) {
if (portIdMap.containsKey(requireNonNull(port).getIdentifier())) {
throw new IllegalStateException("A port with the same id already exists.");
}
if (getPortByName.apply(port.getName()) != null) {
throw new IllegalStateException("A port with the same name already exists.");
}
}
@Override
public void addInputPort(final Port port) {
if (isRootGroup()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group");
}
} else if (!(port instanceof LocalPort)) {
throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to a non-root group");
}
writeLock.lock();
try {
if (inputPorts.containsKey(requireNonNull(port).getIdentifier())
|| getInputPortByName(port.getName()) != null) {
throw new IllegalStateException("The input port name or identifier is not available to be added.");
}
// Unique port check within the same group.
verifyPortUniqueness(port, inputPorts, name -> getInputPortByName(name));
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
@ -579,19 +599,15 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void addOutputPort(final Port port) {
if (isRootGroup()) {
if (!(port instanceof RootGroupPort)) {
if (!(port instanceof PublicPort)) {
throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to the Root Group");
}
} else if (!(port instanceof LocalPort)) {
throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to a non-root group");
}
writeLock.lock();
try {
if (outputPorts.containsKey(requireNonNull(port).getIdentifier())
|| getOutputPortByName(port.getName()) != null) {
throw new IllegalStateException("Output Port with given identifier or name is not available");
}
// Unique port check within the same group.
verifyPortUniqueness(port, outputPorts, name -> getOutputPortByName(name));
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
@ -2349,12 +2365,10 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
}
if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) {
throw new IllegalStateException("Cannot move Ports out of the root group");
}
if (destination.isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) {
throw new IllegalStateException("Cannot move Ports into the root group");
if (destination.isRootGroup() && (
snippet.getInputPorts().keySet().stream().map(this::getInputPort).anyMatch(port -> port instanceof LocalPort)
|| snippet.getOutputPorts().keySet().stream().map(this::getOutputPort).anyMatch(port -> port instanceof LocalPort))) {
throw new IllegalStateException("Cannot move local Ports into the root group");
}
onComponentModified();
@ -2763,10 +2777,6 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
}
if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) {
throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group");
}
for (final String id : snippet.getInputPorts().keySet()) {
final Port port = getInputPort(id);
final String portName = port.getName();

View File

@ -51,8 +51,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import java.io.IOException;
import java.util.ArrayList;
@ -405,10 +405,9 @@ public class StandardEventAccess implements UserAwareEventAccess {
portStatus.setRunStatus(RunStatus.Stopped);
}
// special handling for root group ports
if (port instanceof RootGroupPort) {
final RootGroupPort rootGroupPort = (RootGroupPort) port;
portStatus.setTransmitting(rootGroupPort.isTransmitting());
// special handling for public ports
if (port instanceof PublicPort) {
portStatus.setTransmitting(((PublicPort) port).isTransmitting());
}
final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
@ -428,8 +427,9 @@ public class StandardEventAccess implements UserAwareEventAccess {
portStatus.setInputBytes(inputBytes);
portStatus.setInputCount(inputCount);
flowFilesIn += inputCount;
bytesIn += inputBytes;
flowFilesIn += port instanceof PublicPort ? entry.getFlowFilesReceived() : inputCount;
bytesIn += port instanceof PublicPort ? entry.getBytesReceived() : inputBytes;
bytesWritten += entry.getBytesWritten();
flowFilesReceived += entry.getFlowFilesReceived();
@ -468,10 +468,9 @@ public class StandardEventAccess implements UserAwareEventAccess {
portStatus.setRunStatus(RunStatus.Stopped);
}
// special handling for root group ports
if (port instanceof RootGroupPort) {
final RootGroupPort rootGroupPort = (RootGroupPort) port;
portStatus.setTransmitting(rootGroupPort.isTransmitting());
// special handling for public ports
if (port instanceof PublicPort) {
portStatus.setTransmitting(((PublicPort) port).isTransmitting());
}
final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
@ -493,8 +492,8 @@ public class StandardEventAccess implements UserAwareEventAccess {
bytesRead += entry.getBytesRead();
flowFilesOut += entry.getFlowFilesOut();
bytesOut += entry.getContentSizeOut();
flowFilesOut += port instanceof PublicPort ? entry.getFlowFilesSent() : entry.getFlowFilesOut();
bytesOut += port instanceof PublicPort ? entry.getBytesSent() : entry.getContentSizeOut();
flowFilesSent = entry.getFlowFilesSent();
bytesSent += entry.getBytesSent();

View File

@ -157,8 +157,8 @@
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="inputPort" type="PortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="outputPort" type="PortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="inputPort" type="PublicPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="outputPort" type="PublicPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="label" type="LabelType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="funnel" type="FunnelType" minOccurs="0" maxOccurs="unbounded" />
@ -188,8 +188,9 @@
</xs:sequence>
</xs:complexType>
<!-- Same as ProcessGroupType except that instead of input ports & output ports being of type PortType,
they are of type RootGroupPortType -->
<!-- Same as ProcessGroupType except:
- RootProcessGroupType doesn't have versionControlInformation
- RootProcessGroupType doesn't have variable -->
<xs:complexType name="RootProcessGroupType">
<xs:sequence>
<xs:element name="id" type="NonEmptyStringType" />
@ -201,8 +202,8 @@
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="inputPort" type="RootGroupPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="outputPort" type="RootGroupPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="inputPort" type="PublicPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="outputPort" type="PublicPortType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="label" type="LabelType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="funnel" type="FunnelType" minOccurs="0" maxOccurs="unbounded" />
@ -311,13 +312,14 @@
</xs:sequence>
</xs:complexType>
<xs:complexType name="RootGroupPortType">
<xs:complexType name="PublicPortType">
<xs:complexContent>
<xs:extension base="PortType">
<xs:sequence>
<xs:element name="maxConcurrentTasks" type="xs:positiveInteger"></xs:element>
<xs:element name="maxConcurrentTasks" type="xs:positiveInteger" minOccurs="0" ></xs:element>
<xs:element name="userAccessControl" type="xs:string" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="groupAccessControl" type="xs:string" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="allowRemoteAccess" type="xs:boolean" minOccurs="0" maxOccurs="1" />
</xs:sequence>
</xs:extension>
</xs:complexContent>

View File

@ -17,25 +17,32 @@
package org.apache.nifi.connectable;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class TestLocalPort {
@Test
public void testDefaultValues() {
LocalPort port = getLocalPort("", "");
LocalPort port = getLocalInputPort();
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
}
@Test
public void testSetConcurrentTasks() {
LocalPort port = getLocalPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
LocalPort port = getLocalInputPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
assertEquals(2, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
}
@ -43,36 +50,108 @@ public class TestLocalPort {
@Test
public void testSetFlowFileLimit() {
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(101, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
}
}
private LocalPort getLocalPort(String name, String value) {
HashMap<String, String> additionalProperties = new HashMap<>();
additionalProperties.put(name, value);
NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties);
return new LocalPort("1", "test", ConnectableType.INPUT_PORT, null, niFiProperties);
private LocalPort getLocalInputPort() {
return getLocalPort(ConnectableType.INPUT_PORT, Collections.emptyMap());
}
}
private LocalPort getLocalInputPort(String name, String value) {
Map<String, String> additionalProperties = new HashMap<>();
additionalProperties.put(name, value);
return getLocalPort(ConnectableType.INPUT_PORT, additionalProperties);
}
private LocalPort getLocalPort(ConnectableType type, Map<String, String> additionalProperties) {
NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties);
return new LocalPort("1", "test", type, null, niFiProperties);
}
private LocalPort getLocalOutputPort() {
return getLocalPort(ConnectableType.OUTPUT_PORT, Collections.emptyMap());
}
@Test
public void testInvalidLocalInputPort() {
final LocalPort port = getLocalInputPort();
assertFalse(port.isValid());
}
@Test
public void testValidLocalInputPort() {
final LocalPort port = getLocalInputPort();
// Add an incoming relationship.
port.addConnection(new StandardConnection.Builder(null)
.source(mock(Connectable.class))
.destination(port)
.relationships(Collections.singleton(Relationship.ANONYMOUS))
.flowFileQueueFactory(mock(FlowFileQueueFactory.class))
.build());
// Add an outgoing relationship.
port.addConnection(new StandardConnection.Builder(null)
.source(port)
.destination(mock(Connectable.class))
.relationships(Collections.singleton(Relationship.ANONYMOUS))
.flowFileQueueFactory(mock(FlowFileQueueFactory.class))
.build());
assertTrue(port.isValid());
}
@Test
public void testInvalidLocalOutputPort() {
final LocalPort port = getLocalOutputPort();
assertFalse(port.isValid());
}
@Test
public void testValidLocalOutputPort() {
final LocalPort port = getLocalOutputPort();
// Add an incoming relationship.
port.addConnection(new StandardConnection.Builder(null)
.source(mock(Connectable.class))
.destination(port)
.relationships(Collections.singleton(Relationship.ANONYMOUS))
.flowFileQueueFactory(mock(FlowFileQueueFactory.class))
.build());
// Add an outgoing relationship.
port.addConnection(new StandardConnection.Builder(null)
.source(port)
.destination(mock(Connectable.class))
.relationships(Collections.singleton(Relationship.ANONYMOUS))
.flowFileQueueFactory(mock(FlowFileQueueFactory.class))
.build());
assertTrue(port.isValid());
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.remote;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
@ -52,7 +54,6 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,17 +75,19 @@ import java.util.concurrent.locks.ReentrantLock;
import static java.util.Objects.requireNonNull;
public class StandardRootGroupPort extends AbstractPort implements RootGroupPort {
public class StandardPublicPort extends AbstractPort implements PublicPort {
private static final String CATEGORY = "Site to Site";
private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class);
private static final Logger logger = LoggerFactory.getLogger(StandardPublicPort.class);
private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference<>(new HashSet<>());
private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<>(new HashSet<>());
private final boolean secure;
private final Authorizer authorizer;
private final List<IdentityMapping> identityMappings;
private TransferDirection direction;
@SuppressWarnings("unused")
private final BulletinRepository bulletinRepository;
@ -98,34 +101,34 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
private final Lock requestLock = new ReentrantLock();
private boolean shutdown = false; // guarded by requestLock
public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup,
final TransferDirection direction, final ConnectableType type, final Authorizer authorizer,
final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure,
final NiFiProperties nifiProperties) {
public StandardPublicPort(final String id, final String name, final ProcessGroup processGroup,
final TransferDirection direction, final ConnectableType type, final Authorizer authorizer,
final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure,
final String yieldPeriod, final List<IdentityMapping> identityMappings) {
super(id, name, processGroup, type, scheduler);
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
this.authorizer = authorizer;
this.secure = secure;
this.identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
this.identityMappings = identityMappings;
this.bulletinRepository = bulletinRepository;
this.scheduler = scheduler;
setYieldPeriod(nifiProperties.getBoredYieldDuration());
this.direction = direction;
setYieldPeriod(yieldPeriod);
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
final String groupName = StandardRootGroupPort.this.getProcessGroup().getName();
final String sourceId = StandardRootGroupPort.this.getIdentifier();
final String sourceName = StandardRootGroupPort.this.getName();
final String groupId = processGroup.getIdentifier();
final String groupName = processGroup.getName();
final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, sourceId, componentType, sourceName, category, severity.name(), message));
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, id, componentType, name, category, severity.name(), message));
}
};
relationships = direction == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.<Relationship>emptySet();
relationships = direction == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.emptySet();
}
@Override
@ -240,11 +243,6 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
} else {
transferCount = transferFlowFiles(context, session, codec, flowFileRequest);
}
} catch (final IOException e) {
session.rollback();
responseQueue.add(new ProcessingResult(e));
return;
} catch (final Exception e) {
session.rollback();
responseQueue.add(new ProcessingResult(e));
@ -252,16 +250,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
return;
}
// TODO: Comfirm this. Session.commit here is not required since it has been committed inside receiveFlowFiles/transferFlowFiles.
// session.commit();
responseQueue.add(new ProcessingResult(transferCount));
}
private int transferFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest request) throws IOException, ProtocolException {
private int transferFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest request) throws IOException {
return request.getProtocol().transferFlowFiles(request.getPeer(), context, session, codec);
}
private int receiveFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest receiveRequest) throws IOException, ProtocolException {
private int receiveFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest receiveRequest) throws IOException {
return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec);
}
@ -376,7 +372,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
}
if (user == null) {
final String message = String.format("%s authorization failed because the user is unknown", this, user);
final String message = String.format("%s authorization failed because the user is unknown", this);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
return new StandardPortAuthorizationResult(false, "User is not known");
@ -500,7 +496,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port");
}
if (!this.isRunning()) {
if (!isRunning()) {
throw new IllegalStateException("Port not running");
}
@ -556,7 +552,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port");
}
if (!this.isRunning()) {
if (!isRunning()) {
throw new IllegalStateException("Port not running");
}
@ -570,7 +566,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
scheduler.registerEvent(this);
// Get a response from the response queue but don't wait forever if the port is stopped
ProcessingResult result = null;
ProcessingResult result;
// wait for the request to start getting serviced... and time out if it doesn't happen
// before the request expires
@ -617,6 +613,17 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override
public String getComponentType() {
return "RootGroupPort";
return "PublicPort";
}
@Override
public TransferDirection getDirection() {
return direction;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("id", getIdentifier()).append("name", getName()).toString();
}
}

View File

@ -28,7 +28,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
@ -60,7 +60,7 @@ import java.util.zip.CheckedOutputStream;
public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
protected ProcessGroup rootGroup;
protected RootGroupPort port;
protected PublicPort port;
protected boolean handshakeCompleted;
protected boolean shutdown = false;
@ -134,20 +134,20 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
}
protected void checkPortStatus(final Peer peer, String portId) throws HandshakeException {
Port receivedPort = rootGroup.getInputPort(portId);
Port receivedPort = rootGroup.findInputPort(portId);
if (receivedPort == null) {
receivedPort = rootGroup.getOutputPort(portId);
receivedPort = rootGroup.findOutputPort(portId);
}
if (receivedPort == null) {
logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portId);
throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received unknown port identifier: " + portId);
}
if (!(receivedPort instanceof RootGroupPort)) {
if (!(receivedPort instanceof PublicPort)) {
logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portId);
throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received port identifier " + portId + ", but this Port is not a RootGroupPort");
throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received port identifier " + portId + ", but this Port is not remotely accessible");
}
this.port = (RootGroupPort) receivedPort;
this.port = (PublicPort) receivedPort;
final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
if (!portAuthResult.isAuthorized()) {
logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
@ -178,7 +178,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
}
@Override
public RootGroupPort getPort() {
public PublicPort getPort() {
return port;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.remote;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.groups.ProcessGroup;
@ -36,9 +37,9 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class TestStandardRootGroupPort {
public class TestStandardPublicPort {
private RootGroupPort createRootGroupPort(NiFiProperties nifiProperties) {
private PublicPort createPublicPort(NiFiProperties nifiProperties) {
final BulletinRepository bulletinRepository = mock(BulletinRepository.class);
final ProcessScheduler processScheduler = null;
@ -56,9 +57,9 @@ public class TestStandardRootGroupPort {
final ProcessGroup processGroup = mock(ProcessGroup.class);
doReturn("process-group-id").when(processGroup).getIdentifier();
return new StandardRootGroupPort("id", "name", processGroup,
TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository,
processScheduler, true, nifiProperties);
return new StandardPublicPort("id", "name", processGroup,
TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, true,
nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties));
}
@Test
@ -66,7 +67,7 @@ public class TestStandardRootGroupPort {
final NiFiProperties nifiProperties = mock(NiFiProperties.class);
doReturn("1 millis").when(nifiProperties).getBoredYieldDuration();
final RootGroupPort port = createRootGroupPort(nifiProperties);
final PublicPort port = createPublicPort(nifiProperties);
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node1, OU=nifi.test");
Assert.assertFalse(authResult.isAuthorized());
@ -91,7 +92,7 @@ public class TestStandardRootGroupPort {
doReturn(mapValue).when(nifiProperties).getProperty(eq(NiFiProperties.SECURITY_IDENTITY_MAPPING_VALUE_PREFIX + mapKey));
doReturn("1 millis").when(nifiProperties).getBoredYieldDuration();
final RootGroupPort port = createRootGroupPort(nifiProperties);
final PublicPort port = createPublicPort(nifiProperties);
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node2, OU=nifi.test");
Assert.assertFalse(authResult.isAuthorized());
@ -119,7 +120,7 @@ public class TestStandardRootGroupPort {
doReturn(mapTransform).when(nifiProperties).getProperty(eq(NiFiProperties.SECURITY_IDENTITY_MAPPING_TRANSFORM_PREFIX + mapKey));
doReturn("1 millis").when(nifiProperties).getBoredYieldDuration();
final RootGroupPort port = createRootGroupPort(nifiProperties);
final PublicPort port = createPublicPort(nifiProperties);
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node2, OU=nifi.test");
Assert.assertFalse(authResult.isAuthorized());

View File

@ -31,7 +31,7 @@ import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
@ -72,6 +72,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -153,10 +154,10 @@ public class TestHttpFlowFileServerProtocol {
.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier");
final ProcessGroup processGroup = mock(ProcessGroup.class);
final RootGroupPort port = mock(RootGroupPort.class);
final PublicPort port = mock(PublicPort.class);
final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class);
doReturn(true).when(processGroup).isRootGroup();
doReturn(port).when(processGroup).getOutputPort("port-identifier");
doReturn(port).when(processGroup).findOutputPort("port-identifier");
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
serverProtocol.setRootProcessGroup(processGroup);
@ -178,10 +179,10 @@ public class TestHttpFlowFileServerProtocol {
.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier");
final ProcessGroup processGroup = mock(ProcessGroup.class);
final RootGroupPort port = mock(RootGroupPort.class);
final PublicPort port = mock(PublicPort.class);
final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class);
doReturn(true).when(processGroup).isRootGroup();
doReturn(port).when(processGroup).getOutputPort("port-identifier");
doReturn(port).when(processGroup).findInputPort(eq("port-identifier"));
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
doReturn(true).when(authResult).isAuthorized();
@ -204,10 +205,10 @@ public class TestHttpFlowFileServerProtocol {
.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier");
final ProcessGroup processGroup = mock(ProcessGroup.class);
final RootGroupPort port = mock(RootGroupPort.class);
final PublicPort port = mock(PublicPort.class);
final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class);
doReturn(true).when(processGroup).isRootGroup();
doReturn(port).when(processGroup).getOutputPort("port-identifier");
doReturn(port).when(processGroup).findOutputPort("port-identifier");
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
doReturn(true).when(authResult).isAuthorized();
doReturn(true).when(port).isValid();

View File

@ -27,7 +27,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.dao.PortDAO;
@ -94,11 +94,12 @@ public class PortAuditor extends NiFiAuditor {
final Set<String> existingUsers = new HashSet<>();
final Set<String> existingGroups = new HashSet<>();
boolean isRootGroupPort = false;
if (port instanceof RootGroupPort) {
isRootGroupPort = true;
existingUsers.addAll(((RootGroupPort) port).getUserAccessControl());
existingGroups.addAll(((RootGroupPort) port).getGroupAccessControl());
boolean isPublicPort = false;
if (port instanceof PublicPort) {
isPublicPort = true;
final PublicPort publicPort = (PublicPort) port;
existingUsers.addAll(publicPort.getUserAccessControl());
existingGroups.addAll(publicPort.getGroupAccessControl());
}
// perform the underlying operation
@ -134,7 +135,7 @@ public class PortAuditor extends NiFiAuditor {
}
// if this is a root group port, consider concurrent tasks
if (isRootGroupPort) {
if (isPublicPort) {
if (portDTO.getConcurrentlySchedulableTaskCount() != null && updatedPort.getMaxConcurrentTasks() != maxConcurrentTasks) {
// create the config details
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();

View File

@ -95,20 +95,20 @@ public interface AuthorizableLookup {
Authorizable getFlow();
/**
* Get the authorizable RootGroup InputPort.
* Get the authorizable public InputPort.
*
* @param id input port id
* @return authorizable
*/
RootGroupPortAuthorizable getRootGroupInputPort(String id);
PublicPortAuthorizable getPublicInputPort(String id);
/**
* Get the authorizable RootGroup OutputPort.
* Get the authorizable public OutputPort.
*
* @param id output port id
* @return authorizable
*/
RootGroupPortAuthorizable getRootGroupOutputPort(String id);
PublicPortAuthorizable getPublicOutputPort(String id);
/**
* Get the authorizable InputPort.

View File

@ -20,11 +20,11 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
/**
* Authorizable for a RootGroupPort.
* Authorizable for a PublicPort.
*/
public interface RootGroupPortAuthorizable {
public interface PublicPortAuthorizable {
/**
* Returns the authorizable for this RootGroupPort. Non null
* Returns the authorizable for this PublicGroupPort. Non null
*
* @return authorizable
*/

View File

@ -45,7 +45,7 @@ import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
@ -203,15 +203,15 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
}
@Override
public RootGroupPortAuthorizable getRootGroupInputPort(String id) {
public PublicPortAuthorizable getPublicInputPort(String id) {
final Port inputPort = inputPortDAO.getPort(id);
if (!(inputPort instanceof RootGroupPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an input port in the root group.", id));
if (!(inputPort instanceof PublicPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an input port which can be accessed remotely.", id));
}
final DataTransferAuthorizable baseAuthorizable = new DataTransferAuthorizable(inputPort);
return new RootGroupPortAuthorizable() {
return new PublicPortAuthorizable() {
@Override
public Authorizable getAuthorizable() {
return baseAuthorizable;
@ -220,7 +220,7 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
@Override
public AuthorizationResult checkAuthorization(NiFiUser user) {
// perform the authorization of the user by using the underlying component, ensures consistent authorization with raw s2s
final PortAuthorizationResult authorizationResult = ((RootGroupPort) inputPort).checkUserAuthorization(user);
final PortAuthorizationResult authorizationResult = ((PublicPort) inputPort).checkUserAuthorization(user);
if (authorizationResult.isAuthorized()) {
return AuthorizationResult.approved();
} else {
@ -231,15 +231,15 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
}
@Override
public RootGroupPortAuthorizable getRootGroupOutputPort(String id) {
public PublicPortAuthorizable getPublicOutputPort(String id) {
final Port outputPort = outputPortDAO.getPort(id);
if (!(outputPort instanceof RootGroupPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an output port in the root group.", id));
if (!(outputPort instanceof PublicPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an output port which can be accessed remotely.", id));
}
final DataTransferAuthorizable baseAuthorizable = new DataTransferAuthorizable(outputPort);
return new RootGroupPortAuthorizable() {
return new PublicPortAuthorizable() {
@Override
public Authorizable getAuthorizable() {
return baseAuthorizable;
@ -248,7 +248,7 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
@Override
public AuthorizationResult checkAuthorization(NiFiUser user) {
// perform the authorization of the user by using the underlying component, ensures consistent authorization with raw s2s
final PortAuthorizationResult authorizationResult = ((RootGroupPort) outputPort).checkUserAuthorization(user);
final PortAuthorizationResult authorizationResult = ((PublicPort) outputPort).checkUserAuthorization(user);
if (authorizationResult.isAuthorized()) {
return AuthorizationResult.approved();
} else {

View File

@ -914,6 +914,28 @@ public interface NiFiServiceFacade {
*/
PortEntity deleteOutputPort(Revision revision, String outputPortId);
/**
* Verifies public input port unique constraint throughout the flow will be retained,
* even if a new port is added with the given port id and name.
*
* @param portId port id
* @param portName port name
*
* @throws IllegalStateException If there is any port with the same name or the same identifier
*/
void verifyPublicInputPortUniqueness(final String portId, final String portName);
/**
* Verifies public output port unique constraint throughout the flow will be retained,
* even if a new port is added with the given port id and name.
*
* @param portId port id
* @param portName port name
*
* @throws IllegalStateException If there is any port with the same name or the same identifier
*/
void verifyPublicOutputPortUniqueness(final String portId, final String portName);
// ------------
// Current user
// ------------

View File

@ -122,7 +122,6 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
@ -3088,7 +3087,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* authorized for the transfer. This method is only invoked when obtaining the site to site
* details so the entire chain isn't necessary.
*/
private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
private boolean isUserAuthorized(final NiFiUser user, final Port port) {
final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
// if site to site is not secure, allow all users
@ -3128,8 +3127,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// serialize the input ports this NiFi has access to
final Set<PortDTO> inputPortDtos = new LinkedHashSet<>();
final Set<RootGroupPort> inputPorts = controllerFacade.getInputPorts();
for (final RootGroupPort inputPort : inputPorts) {
for (final Port inputPort : controllerFacade.getPublicInputPorts()) {
if (isUserAuthorized(user, inputPort)) {
final PortDTO dto = new PortDTO();
dto.setId(inputPort.getIdentifier());
@ -3142,7 +3140,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// serialize the output ports this NiFi has access to
final Set<PortDTO> outputPortDtos = new LinkedHashSet<>();
for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
for (final Port outputPort : controllerFacade.getPublicOutputPorts()) {
if (isUserAuthorized(user, outputPort)) {
final PortDTO dto = new PortDTO();
dto.setId(outputPort.getIdentifier());
@ -4798,6 +4796,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
};
}
@Override
public void verifyPublicInputPortUniqueness(final String portId, final String portName) {
inputPortDAO.verifyPublicPortUniqueness(portId, portName);
}
@Override
public void verifyPublicOutputPortUniqueness(final String portId, final String portName) {
outputPortDAO.verifyPublicPortUniqueness(portId, portName);
}
/* setters */
public void setProperties(final NiFiProperties properties) {

View File

@ -27,7 +27,7 @@ import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.RootGroupPortAuthorizable;
import org.apache.nifi.authorization.PublicPortAuthorizable;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
@ -135,11 +135,11 @@ public class DataTransferResource extends ApplicationResource {
}
// get the authorizable
final RootGroupPortAuthorizable authorizable;
final PublicPortAuthorizable authorizable;
if (ResourceType.InputPort.equals(resourceType)) {
authorizable = lookup.getRootGroupInputPort(identifier);
authorizable = lookup.getPublicInputPort(identifier);
} else {
authorizable = lookup.getRootGroupOutputPort(identifier);
authorizable = lookup.getPublicOutputPort(identifier);
}
// perform the authorization

View File

@ -64,6 +64,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
@ -2964,7 +2965,7 @@ public class ProcessGroupResource extends ApplicationResource {
connectionEntity -> {
final ConnectionDTO connection = connectionEntity.getComponent();
// set the processor id as appropriate
// set the connection id as appropriate
connection.setId(generateUuid());
// create the new relationship target
@ -3291,9 +3292,14 @@ public class ProcessGroupResource extends ApplicationResource {
},
() -> serviceFacade.verifyComponentTypes(requestInstantiateTemplateRequestEntity.getSnippet()),
instantiateTemplateRequestEntity -> {
final FlowSnippetDTO snippet = instantiateTemplateRequestEntity.getSnippet();
// Check if the snippet contains any public port violating public port unique constraint with the current flow
verifyPublicPortUniqueness(snippet);
// create the template and generate the json
final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(), instantiateTemplateRequestEntity.getOriginY(),
instantiateTemplateRequestEntity.getEncodingVersion(), instantiateTemplateRequestEntity.getSnippet(), getIdGenerationSeed().orElse(null));
instantiateTemplateRequestEntity.getEncodingVersion(), snippet, getIdGenerationSeed().orElse(null));
final FlowDTO flowSnippet = entity.getFlow();
@ -3311,6 +3317,33 @@ public class ProcessGroupResource extends ApplicationResource {
);
}
private void verifyPublicPortUniqueness(FlowSnippetDTO snippet) {
snippet.getInputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess()))
.forEach(portDTO -> {
try {
serviceFacade.verifyPublicInputPortUniqueness(portDTO.getId(), portDTO.getName());
} catch (IllegalStateException e) {
throw toPublicPortUniqueConstraintViolationException("input", portDTO);
}
});
snippet.getOutputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess()))
.forEach(portDTO -> {
try {
serviceFacade.verifyPublicOutputPortUniqueness(portDTO.getId(), portDTO.getName());
} catch (IllegalStateException e) {
throw toPublicPortUniqueConstraintViolationException("output", portDTO);
}
});
snippet.getProcessGroups().forEach(processGroupDTO -> verifyPublicPortUniqueness(processGroupDTO.getContents()));
}
private IllegalStateException toPublicPortUniqueConstraintViolationException(final String portType, final PortDTO portDTO) {
return new IllegalStateException(String.format("The %s port [%s] named '%s' will violate the public port unique constraint." +
" Rename the existing port name, or the one in the template to instantiate the template in this flow.", portType, portDTO.getId(), portDTO.getName()));
}
// ---------
// templates
// ---------

View File

@ -147,8 +147,8 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPor
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
@ -1332,12 +1332,13 @@ public final class DtoFactory {
dto.setType(port.getConnectableType().name());
dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null));
// if this port is on the root group, determine if its actually connected to another nifi
if (port instanceof RootGroupPort) {
final RootGroupPort rootGroupPort = (RootGroupPort) port;
dto.setTransmitting(rootGroupPort.isTransmitting());
dto.setGroupAccessControl(rootGroupPort.getGroupAccessControl());
dto.setUserAccessControl(rootGroupPort.getUserAccessControl());
// if this port is remotely accessible, determine if its actually connected to another nifi
if (port instanceof PublicPort) {
final PublicPort publicPort = (PublicPort) port;
dto.setAllowRemoteAccess(true);
dto.setTransmitting(publicPort.isTransmitting());
dto.setGroupAccessControl(publicPort.getGroupAccessControl());
dto.setUserAccessControl(publicPort.getUserAccessControl());
}
final Collection<ValidationResult> validationErrors = port.getValidationErrors();
@ -2293,8 +2294,10 @@ public final class DtoFactory {
dto.setStoppedCount(counts.getStoppedCount());
dto.setInvalidCount(counts.getInvalidCount());
dto.setDisabledCount(counts.getDisabledCount());
dto.setInputPortCount(counts.getInputPortCount());
dto.setOutputPortCount(counts.getOutputPortCount());
dto.setLocalInputPortCount(counts.getLocalInputPortCount());
dto.setLocalOutputPortCount(counts.getLocalOutputPortCount());
dto.setPublicInputPortCount(counts.getPublicInputPortCount());
dto.setPublicOutputPortCount(counts.getPublicOutputPortCount());
dto.setActiveRemotePortCount(counts.getActiveRemotePortCount());
dto.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
dto.setUpToDateCount(counts.getUpToDateCount());
@ -3969,6 +3972,7 @@ public final class DtoFactory {
copy.setGroupAccessControl(copy(original.getGroupAccessControl()));
copy.setValidationErrors(copy(original.getValidationErrors()));
copy.setVersionedComponentId(original.getVersionedComponentId());
copy.setAllowRemoteAccess(original.getAllowRemoteAccess());
return copy;
}
@ -4004,11 +4008,13 @@ public final class DtoFactory {
copy.setContents(copy(original.getContents(), deep));
copy.setPosition(original.getPosition());
copy.setId(original.getId());
copy.setInputPortCount(original.getInputPortCount());
copy.setLocalInputPortCount(original.getLocalInputPortCount());
copy.setPublicInputPortCount(original.getPublicInputPortCount());
copy.setInvalidCount(original.getInvalidCount());
copy.setName(original.getName());
copy.setVersionControlInformation(copy(original.getVersionControlInformation()));
copy.setOutputPortCount(original.getOutputPortCount());
copy.setLocalOutputPortCount(original.getLocalOutputPortCount());
copy.setPublicOutputPortCount(original.getPublicOutputPortCount());
copy.setParentGroupId(original.getParentGroupId());
copy.setVersionedComponentId(original.getVersionedComponentId());

View File

@ -230,6 +230,7 @@ public final class EntityFactory {
entity.setId(dto.getId());
entity.setPosition(dto.getPosition());
entity.setPortType(dto.getType());
entity.setAllowRemoteAccess(dto.getAllowRemoteAccess());
if (permissions != null && permissions.getCanRead()) {
entity.setComponent(dto);
entity.setBulletins(bulletins);
@ -249,8 +250,10 @@ public final class EntityFactory {
entity.setId(dto.getId());
entity.setPosition(dto.getPosition());
entity.setInputPortCount(dto.getInputPortCount());
entity.setOutputPortCount(dto.getOutputPortCount());
entity.setLocalInputPortCount(dto.getLocalInputPortCount());
entity.setLocalOutputPortCount(dto.getLocalOutputPortCount());
entity.setPublicInputPortCount(dto.getPublicInputPortCount());
entity.setPublicOutputPortCount(dto.getPublicOutputPortCount());
entity.setRunningCount(dto.getRunningCount());
entity.setStoppedCount(dto.getStoppedCount());
entity.setInvalidCount(dto.getInvalidCount());

View File

@ -78,8 +78,8 @@ import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.services.FlowService;
@ -256,35 +256,19 @@ public class ControllerFacade implements Authorizable {
}
/**
* Gets the input ports on the root group.
*
* Gets the remotely accessible InputPorts in any ProcessGroup.
* @return input ports
*/
public Set<RootGroupPort> getInputPorts() {
final Set<RootGroupPort> inputPorts = new HashSet<>();
ProcessGroup rootGroup = getRootGroup();
for (final Port port : rootGroup.getInputPorts()) {
if (port instanceof RootGroupPort) {
inputPorts.add((RootGroupPort) port);
}
}
return inputPorts;
public Set<Port> getPublicInputPorts() {
return flowController.getFlowManager().getPublicInputPorts();
}
/**
* Gets the output ports on the root group.
*
* Gets the remotely accessible OutputPorts in any ProcessGroup.
* @return output ports
*/
public Set<RootGroupPort> getOutputPorts() {
final Set<RootGroupPort> outputPorts = new HashSet<>();
ProcessGroup rootGroup = getRootGroup();
for (final Port port : rootGroup.getOutputPorts()) {
if (port instanceof RootGroupPort) {
outputPorts.add((RootGroupPort) port);
}
}
return outputPorts;
public Set<Port> getPublicOutputPorts() {
return flowController.getFlowManager().getPublicOutputPorts();
}
/**
@ -907,7 +891,7 @@ public class ControllerFacade implements Authorizable {
resources.add(ResourceFactory.getProvenanceDataResource(inputPortResource));
resources.add(ResourceFactory.getPolicyResource(inputPortResource));
resources.add(ResourceFactory.getOperationResource(inputPortResource));
if (inputPort instanceof RootGroupPort) {
if (inputPort instanceof PublicPort) {
resources.add(ResourceFactory.getDataTransferResource(inputPortResource));
}
}
@ -920,7 +904,7 @@ public class ControllerFacade implements Authorizable {
resources.add(ResourceFactory.getProvenanceDataResource(outputPortResource));
resources.add(ResourceFactory.getPolicyResource(outputPortResource));
resources.add(ResourceFactory.getOperationResource(outputPortResource));
if (outputPort instanceof RootGroupPort) {
if (outputPort instanceof PublicPort) {
resources.add(ResourceFactory.getDataTransferResource(outputPortResource));
}
}

View File

@ -42,7 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.search.SearchContext;
@ -186,16 +186,16 @@ public class ControllerSearchService {
}
}
if (port instanceof RootGroupPort) {
final RootGroupPort rootGroupPort = (RootGroupPort) port;
if (port instanceof PublicPort) {
final PublicPort publicPort = (PublicPort) port;
// user access controls
for (final String userAccessControl : rootGroupPort.getUserAccessControl()) {
for (final String userAccessControl : publicPort.getUserAccessControl()) {
addIfAppropriate(searchStr, userAccessControl, "User access control", matches);
}
// group access controls
for (final String groupAccessControl : rootGroupPort.getGroupAccessControl()) {
for (final String groupAccessControl : publicPort.getGroupAccessControl()) {
addIfAppropriate(searchStr, groupAccessControl, "Group access control", matches);
}
}

View File

@ -82,4 +82,15 @@ public interface PortDAO {
* @param portId The port id
*/
void deletePort(String portId);
/**
* Verifies public port unique constraint throughout the flow will be retained,
* even if a new port is added with the given port id and name.
*
* @param portId port id
* @param portName port name
*
* @throws IllegalStateException If there is any port with the same name or the same identifier
*/
void verifyPublicPortUniqueness(final String portId, final String portName);
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.dao.PortDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public abstract class AbstractPortDAO extends ComponentDAO implements PortDAO {
protected FlowController flowController;
protected abstract Port locatePort(final String portId);
@Override
public void verifyUpdate(PortDTO portDTO) {
final Port port = locatePort(portDTO.getId());
verifyUpdate(port, portDTO);
}
protected void verifyUpdate(final Port port, final PortDTO portDTO) {
if (isNotNull(portDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(port.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
port.verifyCanStart();
break;
case STOPPED:
switch (port.getScheduledState()) {
case RUNNING:
port.verifyCanStop();
break;
case DISABLED:
port.verifyCanEnable();
break;
}
break;
case DISABLED:
port.verifyCanDisable();
break;
}
}
}
// see what's be modified
if (isAnyNotNull(portDTO.getUserAccessControl(),
portDTO.getGroupAccessControl(),
portDTO.getConcurrentlySchedulableTaskCount(),
portDTO.getName(),
portDTO.getComments(),
portDTO.getAllowRemoteAccess())) {
// validate the request
final List<String> requestValidation = validateProposedConfiguration(port, portDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
// ensure the port can be updated
port.verifyCanUpdate();
}
}
private List<String> validateProposedConfiguration(final Port port, final PortDTO portDTO) {
List<String> validationErrors = new ArrayList<>();
if (isNotNull(portDTO.getName()) && portDTO.getName().trim().isEmpty()) {
validationErrors.add("The name of the port must be specified.");
}
if (isNotNull(portDTO.getConcurrentlySchedulableTaskCount()) && portDTO.getConcurrentlySchedulableTaskCount() <= 0) {
validationErrors.add("Concurrent tasks must be a positive integer.");
}
// Although StandardProcessGroup.addIn/OutputPort has the similar validation,
// this validation is necessary to prevent a port becomes public with an existing port name.
if (port instanceof PublicPort) {
final String portName = isNotNull(portDTO.getName()) ? portDTO.getName() : port.getName();
// If there is any port with the same name, but different identifier, throw an error.
if (getPublicPorts().stream()
.anyMatch(p -> portName.equals(p.getName()) && !port.getIdentifier().equals(p.getIdentifier()))) {
throw new IllegalStateException("Public port name must be unique throughout the flow.");
}
}
return validationErrors;
}
@Override
public void verifyPublicPortUniqueness(final String portId, final String portName) {
for (Port port : getPublicPorts()) {
if (portId.equals(port.getIdentifier())) {
throw new IllegalStateException("Public port identifier must be unique throughout the flow.");
} else if(portName.equals(port.getName())) {
throw new IllegalStateException("Public port name must be unique throughout the flow.");
}
}
}
protected abstract Set<Port> getPublicPorts();
protected abstract void handleStateTransition(final Port port, final ScheduledState proposedScheduledState) throws IllegalStateException;
@Override
public Port updatePort(PortDTO portDTO) {
final Port port = locatePort(portDTO.getId());
final ProcessGroup processGroup = port.getProcessGroup();
// ensure we can do this update
verifyUpdate(port, portDTO);
// handle state transition
if (isNotNull(portDTO.getState())) {
final ScheduledState proposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!proposedScheduledState.equals(port.getScheduledState())) {
try {
handleStateTransition(port, proposedScheduledState);
} catch (IllegalStateException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
}
}
}
if (port instanceof PublicPort) {
final PublicPort publicPort = (PublicPort) port;
if (isNotNull(portDTO.getGroupAccessControl())) {
publicPort.setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (isNotNull(portDTO.getUserAccessControl())) {
publicPort.setUserAccessControl(portDTO.getUserAccessControl());
}
}
// update the port
final String name = portDTO.getName();
final String comments = portDTO.getComments();
final Integer concurrentTasks = portDTO.getConcurrentlySchedulableTaskCount();
if (isNotNull(portDTO.getPosition())) {
port.setPosition(new Position(portDTO.getPosition().getX(), portDTO.getPosition().getY()));
}
if (isNotNull(name)) {
port.setName(name);
}
if (isNotNull(comments)) {
port.setComments(comments);
}
if (isNotNull(concurrentTasks)) {
port.setMaxConcurrentTasks(concurrentTasks);
}
processGroup.onComponentModified();
return port;
}
@Override
public void verifyDelete(final String portId) {
final Port inputPort = locatePort(portId);
inputPort.verifyCanDelete();
}
/* setters */
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}

View File

@ -40,7 +40,9 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.ResourceNotFoundException;
@ -406,6 +408,10 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
if (sourceConnectable == null) {
throw new IllegalArgumentException("The specified source for the connection does not exist");
}
if (sourceConnectable instanceof PublicPort
&& TransferDirection.SEND.equals(((PublicPort) sourceConnectable).getDirection())) {
throw new IllegalArgumentException("The specified source for the connection cannot be connected to local components.");
}
}
if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) {
@ -428,6 +434,10 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
if (destinationConnectable == null) {
throw new IllegalArgumentException("The specified destination for the connection does not exist");
}
if (destinationConnectable instanceof PublicPort
&& TransferDirection.RECEIVE.equals(((PublicPort) destinationConnectable).getDirection())) {
throw new IllegalArgumentException("The specified destination for the connection cannot be connected from local components.");
}
}
}

View File

@ -18,25 +18,18 @@ package org.apache.nifi.web.dao.impl;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.dao.PortDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
public class StandardInputPortDAO extends AbstractPortDAO implements PortDAO {
private FlowController flowController;
private Port locatePort(final String portId) {
protected Port locatePort(final String portId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
Port port = rootGroup.findInputPort(portId);
@ -73,13 +66,18 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
// determine if this is the root group
Port port;
if (group.getParent() == null) {
port = flowController.getFlowManager().createRemoteInputPort(portDTO.getId(), portDTO.getName());
if (group.getParent() == null || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
port = flowController.getFlowManager().createPublicInputPort(portDTO.getId(), portDTO.getName());
} else {
port = flowController.getFlowManager().createLocalInputPort(portDTO.getId(), portDTO.getName());
}
// ensure we can perform the update before we add the processor to the flow
// Unique public port check among all groups.
if (port instanceof PublicPort) {
verifyPublicPortUniqueness(port.getIdentifier(), port.getName());
}
// ensure we can perform the update before we add the port to the flow
verifyUpdate(port, portDTO);
// configure
@ -105,146 +103,31 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
}
@Override
public void verifyUpdate(PortDTO portDTO) {
final Port inputPort = locatePort(portDTO.getId());
verifyUpdate(inputPort, portDTO);
protected Set<Port> getPublicPorts() {
return flowController.getFlowManager().getPublicInputPorts();
}
private void verifyUpdate(final Port inputPort, final PortDTO portDTO) {
if (isNotNull(portDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(inputPort.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
@Override
protected void handleStateTransition(final Port port, final ScheduledState proposedScheduledState) throws IllegalStateException {
final ProcessGroup processGroup = port.getProcessGroup();
switch (proposedScheduledState) {
case RUNNING:
processGroup.startInputPort(port);
break;
case STOPPED:
switch (port.getScheduledState()) {
case RUNNING:
inputPort.verifyCanStart();
break;
case STOPPED:
switch (inputPort.getScheduledState()) {
case RUNNING:
inputPort.verifyCanStop();
break;
case DISABLED:
inputPort.verifyCanEnable();
break;
}
processGroup.stopInputPort(port);
break;
case DISABLED:
inputPort.verifyCanDisable();
processGroup.enableInputPort(port);
break;
}
}
break;
case DISABLED:
processGroup.disableInputPort(port);
break;
}
// see what's be modified
if (isAnyNotNull(portDTO.getUserAccessControl(),
portDTO.getGroupAccessControl(),
portDTO.getConcurrentlySchedulableTaskCount(),
portDTO.getName(),
portDTO.getComments())) {
// validate the request
final List<String> requestValidation = validateProposedConfiguration(portDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
// ensure the port can be updated
inputPort.verifyCanUpdate();
}
}
private List<String> validateProposedConfiguration(PortDTO portDTO) {
List<String> validationErrors = new ArrayList<>();
if (isNotNull(portDTO.getName()) && portDTO.getName().trim().isEmpty()) {
validationErrors.add("The name of the port must be specified.");
}
if (isNotNull(portDTO.getConcurrentlySchedulableTaskCount()) && portDTO.getConcurrentlySchedulableTaskCount() <= 0) {
validationErrors.add("Concurrent tasks must be a positive integer.");
}
return validationErrors;
}
@Override
public Port updatePort(PortDTO portDTO) {
Port inputPort = locatePort(portDTO.getId());
// ensure we can do this update
verifyUpdate(inputPort, portDTO);
// handle state transition
if (isNotNull(portDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(inputPort.getScheduledState())) {
try {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
inputPort.getProcessGroup().startInputPort(inputPort);
break;
case STOPPED:
switch (inputPort.getScheduledState()) {
case RUNNING:
inputPort.getProcessGroup().stopInputPort(inputPort);
break;
case DISABLED:
inputPort.getProcessGroup().enableInputPort(inputPort);
break;
}
break;
case DISABLED:
inputPort.getProcessGroup().disableInputPort(inputPort);
break;
}
} catch (IllegalStateException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
}
}
}
if (inputPort instanceof RootGroupPort) {
final RootGroupPort rootPort = (RootGroupPort) inputPort;
if (isNotNull(portDTO.getGroupAccessControl())) {
rootPort.setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (isNotNull(portDTO.getUserAccessControl())) {
rootPort.setUserAccessControl(portDTO.getUserAccessControl());
}
}
// update the port
final String name = portDTO.getName();
final String comments = portDTO.getComments();
final Integer concurrentTasks = portDTO.getConcurrentlySchedulableTaskCount();
if (isNotNull(portDTO.getPosition())) {
inputPort.setPosition(new Position(portDTO.getPosition().getX(), portDTO.getPosition().getY()));
}
if (isNotNull(name)) {
inputPort.setName(name);
}
if (isNotNull(comments)) {
inputPort.setComments(comments);
}
if (isNotNull(concurrentTasks)) {
inputPort.setMaxConcurrentTasks(concurrentTasks);
}
inputPort.getProcessGroup().onComponentModified();
return inputPort;
}
@Override
public void verifyDelete(final String portId) {
final Port inputPort = locatePort(portId);
inputPort.verifyCanDelete();
}
@Override
@ -253,8 +136,4 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
inputPort.getProcessGroup().removeInputPort(inputPort);
}
/* setters */
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}

View File

@ -18,25 +18,18 @@ package org.apache.nifi.web.dao.impl;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.dao.PortDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
public class StandardOutputPortDAO extends AbstractPortDAO implements PortDAO {
private FlowController flowController;
private Port locatePort(final String portId) {
protected Port locatePort(final String portId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final Port port = rootGroup.findOutputPort(portId);
@ -69,13 +62,18 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
// determine if this is the root group
Port port;
if (group.getParent() == null) {
port = flowController.getFlowManager().createRemoteOutputPort(portDTO.getId(), portDTO.getName());
if (group.getParent() == null || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
port = flowController.getFlowManager().createPublicOutputPort(portDTO.getId(), portDTO.getName());
} else {
port = flowController.getFlowManager().createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
// ensure we can perform the update before we add the processor to the flow
// Unique public port check among all groups.
if (port instanceof PublicPort) {
verifyPublicPortUniqueness(port.getIdentifier(), port.getName());
}
// ensure we can perform the update before we add the port to the flow
verifyUpdate(port, portDTO);
// configure
@ -101,146 +99,31 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
}
@Override
public void verifyUpdate(PortDTO portDTO) {
final Port outputPort = locatePort(portDTO.getId());
verifyUpdate(outputPort, portDTO);
protected Set<Port> getPublicPorts() {
return flowController.getFlowManager().getPublicOutputPorts();
}
private void verifyUpdate(final Port outputPort, final PortDTO portDTO) {
if (isNotNull(portDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(outputPort.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
@Override
protected void handleStateTransition(final Port port, final ScheduledState proposedScheduledState) throws IllegalStateException {
final ProcessGroup processGroup = port.getProcessGroup();
switch (proposedScheduledState) {
case RUNNING:
processGroup.startOutputPort(port);
break;
case STOPPED:
switch (port.getScheduledState()) {
case RUNNING:
outputPort.verifyCanStart();
break;
case STOPPED:
switch (outputPort.getScheduledState()) {
case RUNNING:
outputPort.verifyCanStop();
break;
case DISABLED:
outputPort.verifyCanEnable();
break;
}
processGroup.stopOutputPort(port);
break;
case DISABLED:
outputPort.verifyCanDisable();
processGroup.enableOutputPort(port);
break;
}
}
break;
case DISABLED:
processGroup.disableOutputPort(port);
break;
}
// see what's be modified
if (isAnyNotNull(portDTO.getUserAccessControl(),
portDTO.getGroupAccessControl(),
portDTO.getConcurrentlySchedulableTaskCount(),
portDTO.getName(),
portDTO.getComments())) {
// validate the request
final List<String> requestValidation = validateProposedConfiguration(portDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
// ensure the port can be updated
outputPort.verifyCanUpdate();
}
}
private List<String> validateProposedConfiguration(PortDTO portDTO) {
List<String> validationErrors = new ArrayList<>();
if (isNotNull(portDTO.getName()) && portDTO.getName().trim().isEmpty()) {
validationErrors.add("The name of the port must be specified.");
}
if (isNotNull(portDTO.getConcurrentlySchedulableTaskCount()) && portDTO.getConcurrentlySchedulableTaskCount() <= 0) {
validationErrors.add("Concurrent tasks must be a positive integer.");
}
return validationErrors;
}
@Override
public Port updatePort(PortDTO portDTO) {
Port outputPort = locatePort(portDTO.getId());
// ensure we can do this update
verifyUpdate(outputPort, portDTO);
// handle state transition
if (portDTO.getState() != null) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(outputPort.getScheduledState())) {
try {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
outputPort.getProcessGroup().startOutputPort(outputPort);
break;
case STOPPED:
switch (outputPort.getScheduledState()) {
case RUNNING:
outputPort.getProcessGroup().stopOutputPort(outputPort);
break;
case DISABLED:
outputPort.getProcessGroup().enableOutputPort(outputPort);
break;
}
break;
case DISABLED:
outputPort.getProcessGroup().disableOutputPort(outputPort);
break;
}
} catch (IllegalStateException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
}
}
}
if (outputPort instanceof RootGroupPort) {
final RootGroupPort rootPort = (RootGroupPort) outputPort;
if (isNotNull(portDTO.getGroupAccessControl())) {
rootPort.setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (isNotNull(portDTO.getUserAccessControl())) {
rootPort.setUserAccessControl(portDTO.getUserAccessControl());
}
}
// perform the configuration
final String name = portDTO.getName();
final String comments = portDTO.getComments();
final Integer concurrentTasks = portDTO.getConcurrentlySchedulableTaskCount();
if (isNotNull(portDTO.getPosition())) {
outputPort.setPosition(new Position(portDTO.getPosition().getX(), portDTO.getPosition().getY()));
}
if (isNotNull(name)) {
outputPort.setName(name);
}
if (isNotNull(comments)) {
outputPort.setComments(comments);
}
if (isNotNull(concurrentTasks)) {
outputPort.setMaxConcurrentTasks(concurrentTasks);
}
outputPort.getProcessGroup().onComponentModified();
return outputPort;
}
@Override
public void verifyDelete(final String portId) {
final Port outputPort = locatePort(portId);
outputPort.verifyCanDelete();
}
@Override
@ -249,8 +132,4 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
outputPort.getProcessGroup().removeOutputPort(outputPort);
}
/* setters */
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}

View File

@ -20,7 +20,7 @@ import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
@ -251,7 +251,7 @@ public class TestDataTransferResource {
final DataTransferResource resource = getDataTransferResource();
final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null);
final RootGroupPort port = mock(RootGroupPort.class);
final PublicPort port = mock(PublicPort.class);
doReturn(port).when(serverProtocol).getPort();
doAnswer(invocation -> {
Peer peer = (Peer) invocation.getArguments()[0];
@ -282,7 +282,7 @@ public class TestDataTransferResource {
final DataTransferResource resource = getDataTransferResource();
final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null);
final RootGroupPort port = mock(RootGroupPort.class);
final PublicPort port = mock(PublicPort.class);
doReturn(port).when(serverProtocol).getPort();
doAnswer(invocation -> 0).when(port).receiveFlowFiles(any(Peer.class), any());

View File

@ -685,6 +685,7 @@
<include>${staging.dir}/css/shell.css</include>
<include>${staging.dir}/css/dialog.css</include>
<include>${staging.dir}/css/new-processor-dialog.css</include>
<include>${staging.dir}/css/new-port-dialog.css</include>
<include>${staging.dir}/css/new-controller-service-dialog.css</include>
<include>${staging.dir}/css/new-reporting-task-dialog.css</include>
<include>${staging.dir}/css/graph.css</include>

View File

@ -165,6 +165,9 @@
<div class="setting-field">
<div id="connection-source-group-name"></div>
</div>
<div class="setting-field">
<div id="connection-remote-source-url" class="hidden"></div>
</div>
</div>
<div id="relationship-names-container" class="hidden">
<div class="setting-name">For relationships</div>
@ -207,6 +210,9 @@
<div class="setting-field">
<div id="connection-destination-group-name"></div>
</div>
<div class="setting-field">
<div id="connection-remote-destination-url" class="hidden"></div>
</div>
</div>
</div>
</div>

View File

@ -23,5 +23,14 @@
<input id="new-port-name" type="text"/>
</div>
</div>
<div id="port-allow-remote-access-setting" class="setting">
<div class="setting-name">
<span id="port-allow-remote-access-label"></span>
<div id="port-allow-remote-access-info" class="fa fa-question-circle" alt="Info"></div>
</div>
<div class="setting-field">
<div id="port-allow-remote-access"></div>
</div>
</div>
</div>
</div>

View File

@ -29,6 +29,22 @@
<span id="read-only-port-id"></span>
</div>
</div>
<div class="port-setting">
<div class="setting-name">Allow Remote Access
<div class="fa fa-question-circle" alt="Info" title="Whether this port can be accessed as a RemoteGroupPort via Site-to-Site protocol."></div>
</div>
<div class="setting-field">
<span id="read-only-port-allow-remote-access"></span>
</div>
</div>
<div class="port-setting">
<div class="setting-name">Concurrent Tasks
<div class="fa fa-question-circle" alt="Info" title="The number of tasks that should be concurrently scheduled for this port."></div>
</div>
<div class="setting-field">
<span id="read-only-port-concurrent-tasks"></span>
</div>
</div>
<div class="port-setting">
<div class="setting-name">Comments</div>
<div class="setting-field">

View File

@ -27,11 +27,14 @@
<div id="read-only-connection-source" class="ellipsis"></div>
</div>
</div>
<div class="setting">
<div id="read-only-connection-source-group" class="setting">
<div class="setting-name">Within group</div>
<div class="setting-field">
<div id="read-only-connection-source-group-name"></div>
</div>
<div class="setting-field">
<div id="read-only-connection-remote-source-url" class="hidden"></div>
</div>
</div>
<div id="read-only-relationship-names-container" class="setting">
<div class="setting-name">
@ -51,11 +54,14 @@
<div id="read-only-connection-target" class="ellipsis"></div>
</div>
</div>
<div class="setting">
<div id="read-only-connection-target-group" class="setting">
<div class="setting-name">Within group</div>
<div class="setting-field">
<div id="read-only-connection-target-group-name"></div>
</div>
<div class="setting-field">
<div id="read-only-connection-remote-target-url" class="hidden"></div>
</div>
</div>
</div>
</div>

View File

@ -31,6 +31,7 @@
@import url(shell.css);
@import url(dialog.css);
@import url(new-processor-dialog.css);
@import url(new-port-dialog.css);
@import url(new-controller-service-dialog.css);
@import url(new-reporting-task-dialog.css);
@import url(graph.css);

View File

@ -104,7 +104,7 @@ text.stats-value {
font-weight: bold;
}
text.stats-value tspan.size, text.stats-value tspan.size {
text.stats-value tspan.size, text.stats-value tspan.size, text.stats-value tspan.public-ports {
font-weight: normal;
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
New port dialog at in a child process group.
*/
#new-port-dialog {
min-height:280px;
height: 280px;
width: 320px;
min-width: 320px;
}

View File

@ -22,7 +22,7 @@
z-index: 1301;
display: none;
width: 400px;
height: 375px;
height: 470px;
}
#port-name {

View File

@ -25,8 +25,9 @@
'nf.Storage',
'nf.Graph',
'nf.CanvasUtils',
'nf.ErrorHandler'],
function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler) {
'nf.ErrorHandler',
'nf.Common'],
function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler, nfDialog) {
return (nf.ng.InputPortComponent = factory($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler));
});
} else if (typeof exports === 'object' && typeof module === 'object') {
@ -37,7 +38,8 @@
require('nf.Storage'),
require('nf.Graph'),
require('nf.CanvasUtils'),
require('nf.ErrorHandler')));
require('nf.ErrorHandler'),
require('nf.Dialog')));
} else {
nf.ng.InputPortComponent = factory(root.$,
root.nf.Client,
@ -45,9 +47,10 @@
root.nf.Storage,
root.nf.Graph,
root.nf.CanvasUtils,
root.nf.ErrorHandler);
root.nf.ErrorHandler,
root.nf.Dialog);
}
}(this, function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler) {
}(this, function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler, nfDialog) {
'use strict';
return function (serviceProvider) {
@ -57,9 +60,10 @@
* Create the input port and add to the graph.
*
* @argument {string} portName The input port name.
* @argument {boolean} allowRemoteAccess Whether the input port can be accessed via S2S.
* @argument {object} pt The point that the input port was dropped.
*/
var createInputPort = function (portName, pt) {
var createInputPort = function (portName, allowRemoteAccess, pt) {
var inputPortEntity = {
'revision': nfClient.getRevision({
'revision': {
@ -69,6 +73,7 @@
'disconnectedNodeAcknowledged': nfStorage.isDisconnectionAcknowledged(),
'component': {
'name': portName,
'allowRemoteAccess': allowRemoteAccess,
'position': {
'x': pt.x,
'y': pt.y
@ -129,7 +134,7 @@
// configure the new port dialog
this.getElement().modal({
scrollableContentStyle: 'scrollable',
headerText: 'Add Port',
headerText: 'Add Input Port',
handler: {
close: function () {
$('#new-port-name').val('');
@ -152,6 +157,32 @@
* Show the modal.
*/
show: function () {
$('#new-port-dialog > .dialog-header > .dialog-header-text').text('Add Input Port')
var optionLocal = {
text: 'Local connections',
value: 'false',
description: 'Receive FlowFiles from components in parent process groups'
};
var optionRemote = {
text: 'Remote connections (site-to-site)',
value: 'true',
description: 'Receive FlowFiles from remote process group (site-to-site)'
};
// initialize the remote access combo
$('#port-allow-remote-access-label').text('Receive from');
$('#port-allow-remote-access-info').attr('title', 'Specify where FlowFiles are received from.');
if (nfCanvasUtils.getParentGroupId() === null) {
$('#port-allow-remote-access-setting').hide();
} else {
$('#port-allow-remote-access-setting').show();
}
$('#port-allow-remote-access').combo({
options: [optionLocal, optionRemote]
});
this.getElement().modal('show');
},
@ -219,9 +250,10 @@
var addInputPort = function () {
// get the name of the input port and clear the textfield
var portName = $('#new-port-name').val();
var allowRemoteAccess = $('#port-allow-remote-access').combo('getSelectedOption').value;
// create the input port
createInputPort(portName, pt);
createInputPort(portName, allowRemoteAccess, pt);
};
this.modal.update('setButtonModel', [{

View File

@ -25,8 +25,9 @@
'nf.Storage',
'nf.Graph',
'nf.CanvasUtils',
'nf.ErrorHandler'],
function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler) {
'nf.ErrorHandler',
'nf.Common'],
function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler, nfDialog) {
return (nf.ng.OutputPortComponent = factory($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler));
});
} else if (typeof exports === 'object' && typeof module === 'object') {
@ -37,7 +38,8 @@
require('nf.Storage'),
require('nf.Graph'),
require('nf.CanvasUtils'),
require('nf.ErrorHandler')));
require('nf.ErrorHandler'),
require('nf.Dialog')));
} else {
nf.ng.OutputPortComponent = factory(root.$,
root.nf.Client,
@ -45,9 +47,10 @@
root.nf.Storage,
root.nf.Graph,
root.nf.CanvasUtils,
root.nf.ErrorHandler);
root.nf.ErrorHandler,
root.nf.Dialog);
}
}(this, function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler) {
}(this, function ($, nfClient, nfBirdseye, nfStorage, nfGraph, nfCanvasUtils, nfErrorHandler, nfDialog) {
'use strict';
return function (serviceProvider) {
@ -57,9 +60,10 @@
* Create the input port and add to the graph.
*
* @argument {string} portName The output port name.
* @argument {boolean} allowRemoteAccess Whether the output port can be accessed via S2S.
* @argument {object} pt The point that the output port was dropped.
*/
var createOutputPort = function (portName, pt) {
var createOutputPort = function (portName, allowRemoteAccess, pt) {
var outputPortEntity = {
'revision': nfClient.getRevision({
'revision': {
@ -69,6 +73,7 @@
'disconnectedNodeAcknowledged': nfStorage.isDisconnectionAcknowledged(),
'component': {
'name': portName,
'allowRemoteAccess': allowRemoteAccess,
'position': {
'x': pt.x,
'y': pt.y
@ -143,6 +148,32 @@
* Show the modal.
*/
show: function () {
$('#new-port-dialog > .dialog-header > .dialog-header-text').text('Add Output Port')
var optionLocal = {
text: 'Local connections',
value: 'false',
description: 'Send FlowFiles to components in parent process groups.'
};
var optionRemote = {
text: 'Remote connections (site-to-site)',
value: 'true',
description: 'Send FlowFiles to remote process group (site-to-site).'
};
// initialize the remote access combo
$('#port-allow-remote-access-label').text('Send to');
$('#port-allow-remote-access-info').attr('title', 'Specify where FlowFiles are sent.');
if (nfCanvasUtils.getParentGroupId() === null) {
$('#port-allow-remote-access-setting').hide();
} else {
$('#port-allow-remote-access-setting').show();
}
$('#port-allow-remote-access').combo({
options: [optionLocal, optionRemote]
});
this.getElement().modal('show');
},
@ -210,9 +241,10 @@
var addOutputPort = function () {
// get the name of the output port and clear the textfield
var portName = $('#new-port-name').val();
var allowRemoteAccess = $('#port-allow-remote-access').combo('getSelectedOption').value;
// create the output port
createOutputPort(portName, pt);
createOutputPort(portName, allowRemoteAccess, pt);
};
this.modal.update('setButtonModel', [{

View File

@ -324,7 +324,12 @@
// show the output port options
var options = [];
var publicOutputPortCount = 0;
$.each(processGroupContents.outputPorts, function (i, outputPort) {
if (outputPort.allowRemoteAccess) {
publicOutputPortCount++;
return;
}
// require explicit access to the output port as it's the source of the connection
if (outputPort.permissions.canRead && outputPort.permissions.canWrite) {
var component = outputPort.component;
@ -363,9 +368,10 @@
deferred.resolve();
} else {
var message = '\'' + nfCommon.escapeHtml(processGroupName) + '\' does not have any output ports.';
if (nfCommon.isEmpty(processGroupContents.outputPorts) === false) {
message = 'Not authorized for any output ports in \'' + nfCommon.escapeHtml(processGroupName) + '\'.';
var message = '\'' + nfCommon.escapeHtml(processGroupName) + '\' does not have any local output ports.';
if (nfCommon.isEmpty(processGroupContents.outputPorts) === false
&& processGroupContents.outputPorts.length > publicOutputPortCount) {
message = 'Not authorized for any local output ports in \'' + nfCommon.escapeHtml(processGroupName) + '\'.';
}
// there are no output ports for this process group
@ -439,6 +445,8 @@
$('#connection-source-component-id').val(remoteProcessGroup.id);
// populate the group details
$('#connection-source-group div.setting-name').text('Within Remote Group')
$('#connection-remote-source-url').text(remoteProcessGroup.targetUri).show();
$('#connection-source-group-id').val(remoteProcessGroup.id);
$('#connection-source-group-name').text(remoteProcessGroup.name);
@ -559,11 +567,13 @@
// show the input port options
var options = [];
$.each(processGroupContents.inputPorts, function (i, inputPort) {
options.push({
text: inputPort.permissions.canRead ? inputPort.component.name : inputPort.id,
value: inputPort.id,
description: inputPort.permissions.canRead ? nfCommon.escapeHtml(inputPort.component.comments) : null
});
if (!inputPort.allowRemoteAccess) {
options.push({
text: inputPort.permissions.canRead ? inputPort.component.name : inputPort.id,
value: inputPort.id,
description: inputPort.permissions.canRead ? nfCommon.escapeHtml(inputPort.component.comments) : null
});
}
});
// only proceed if there are output ports
@ -596,7 +606,7 @@
// there are no relationships for this processor
nfDialog.showOkDialog({
headerText: 'Connection Configuration',
dialogContent: '\'' + nfCommon.escapeHtml(processGroupName) + '\' does not have any input ports.'
dialogContent: '\'' + nfCommon.escapeHtml(processGroupName) + '\' does not have any local input ports.'
});
// reset the dialog
@ -664,6 +674,8 @@
$('#connection-destination-component-id').val(remoteProcessGroup.id);
// populate the group details
$('#connection-destination-group div.setting-name').text('Within Remote Group')
$('#connection-remote-destination-url').text(remoteProcessGroup.targetUri).show();
$('#connection-destination-group-id').val(remoteProcessGroup.id);
$('#connection-destination-group-name').text(remoteProcessGroup.name);
@ -709,6 +721,13 @@
$('#connection-source-group-id').val(sourceData.id);
$('#connection-source-group-name').text(sourceName);
if (nfCanvasUtils.isRemoteProcessGroup(source)) {
$('#connection-source-group div.setting-name').text('Within Remote Group');
if (sourceData.permissions.canRead) {
$('#connection-remote-source-url').text(sourceData.component.targetUri).show();
}
}
// resolve the deferred
deferred.resolve();
}).promise();
@ -1319,6 +1338,12 @@
return;
}
// reset labels
$('#connection-source-group div.setting-name').text('Within Group')
$('#connection-destination-group div.setting-name').text('Within Group')
$('#connection-remote-source-url').hide();
$('#connection-remote-destination-url').hide();
// initialize the connection dialog
$.when(initializeSourceNewConnectionDialog(source), initializeDestinationNewConnectionDialog(destination)).done(function () {
// set the default values
@ -1371,6 +1396,12 @@
destination = d3.select('#id-' + destinationComponentId);
}
// reset labels
$('#connection-source-group div.setting-name').text('Within Group')
$('#connection-destination-group div.setting-name').text('Within Group')
$('#connection-remote-source-url').hide();
$('#connection-remote-destination-url').hide();
// initialize the connection dialog
$.when(initializeSourceEditConnectionDialog(source), initializeDestinationEditConnectionDialog(destination, connection.destination)).done(function () {
var availableRelationships = connection.availableRelationships;

View File

@ -1651,10 +1651,10 @@
$('#component-policy-target')
.combo('setOptionEnabled', {
value: 'write-receive-data'
}, nfCanvasUtils.isInputPort(selection) && nfCanvasUtils.getParentGroupId() === null)
}, nfCanvasUtils.isInputPort(selection) && d.allowRemoteAccess === true)
.combo('setOptionEnabled', {
value: 'write-send-data'
}, nfCanvasUtils.isOutputPort(selection) && nfCanvasUtils.getParentGroupId() === null)
}, nfCanvasUtils.isOutputPort(selection) && d.allowRemoteAccess === true)
.combo('setOptionEnabled', {
value: 'read-data'
}, !nfCanvasUtils.isLabel(selection))

View File

@ -172,8 +172,8 @@
portEnableStyle = 'checkbox-unchecked';
}
// show concurrent tasks for root groups only
if (nfCanvasUtils.getParentGroupId() === null) {
// show concurrent tasks for site-to-site port only
if (selectionData.allowRemoteAccess === true) {
$('#port-concurrent-task-container').show();
} else {
$('#port-concurrent-task-container').hide();

View File

@ -77,6 +77,8 @@
// populate the port settings
nfCommon.populateField('read-only-port-name', selectionData.component.name);
nfCommon.populateField('read-only-port-id', selectionData.id);
nfCommon.populateField('read-only-port-allow-remote-access', true === selectionData.allowRemoteAccess ? 'true' : 'false');
nfCommon.populateField('read-only-port-concurrent-tasks', selectionData.component.concurrentlySchedulableTaskCount);
nfCommon.populateField('read-only-port-comments', selectionData.component.comments);
// show the details

View File

@ -21,27 +21,30 @@
if (typeof define === 'function' && define.amd) {
define(['jquery',
'd3',
'nf.Connection',
'nf.Common',
'nf.Client',
'nf.CanvasUtils'],
function ($, d3, nfCommon, nfClient, nfCanvasUtils) {
return (nf.Port = factory($, d3, nfCommon, nfClient, nfCanvasUtils));
function ($, d3, nfConnection, nfCommon, nfClient, nfCanvasUtils) {
return (nf.Port = factory($, d3, nfConnection, nfCommon, nfClient, nfCanvasUtils));
});
} else if (typeof exports === 'object' && typeof module === 'object') {
module.exports = (nf.Port =
factory(require('jquery'),
require('d3'),
require('nf.Connection'),
require('nf.Common'),
require('nf.Client'),
require('nf.CanvasUtils')));
} else {
nf.Port = factory(root.$,
root.d3,
root.nf.Connection,
root.nf.Common,
root.nf.Client,
root.nf.CanvasUtils);
}
}(this, function ($, d3, nfCommon, nfClient, nfCanvasUtils) {
}(this, function ($, d3, nfConnection, nfCommon, nfClient, nfCanvasUtils) {
'use strict';
var nfConnectable;
@ -62,6 +65,10 @@
height: 80
};
var dimensions = function (d) {
return d.allowRemoteAccess === true ? remotePortDimensions : portDimensions;
};
// ----------------------------
// ports currently on the graph
// ----------------------------
@ -94,6 +101,22 @@
});
};
/**
* Utility method to check if the target port is a local port.
*/
var isLocalPort = function (d) {
return d.allowRemoteAccess !== true;
};
/**
* Utility method to calculate offset y position based on whether this port is remotely accessible.
*/
var offsetY = function(y) {
return function (d) {
return y + (isLocalPort(d) ? 0 : OFFSET_VALUE);
};
};
/**
* Renders the ports in the specified selection.
*
@ -150,30 +173,22 @@
'stroke-width': 0
});
var offset = 0;
// conditionally render the remote banner
if (nfCanvasUtils.getParentGroupId() === null) {
offset = OFFSET_VALUE;
// port remote banner
port.append('rect')
.attrs({
'class': 'remote-banner',
'width': function (d) {
return d.dimensions.width;
},
'height': offset,
'fill': '#e3e8eb'
});
}
// port remote banner
port.append('rect')
.attrs({
'class': 'remote-banner',
'width': remotePortDimensions.width,
'height': OFFSET_VALUE,
'fill': '#e3e8eb'
})
.classed('hidden', isLocalPort);
// port icon
port.append('text')
.attrs({
'class': 'port-icon',
'x': 10,
'y': 38 + offset
'y': offsetY(38)
})
.text(function (d) {
if (d.portType === 'INPUT_PORT') {
@ -187,7 +202,7 @@
port.append('text')
.attrs({
'x': 70,
'y': 25 + offset,
'y': offsetY(25),
'width': 95,
'height': 30,
'class': 'port-name'
@ -218,12 +233,22 @@
updated.select('rect.border')
.classed('unauthorized', function (d) {
return d.permissions.canRead === false;
})
.attrs({
'height': function(d) {
return d.dimensions.height;
}
});
// port body authorization
updated.select('rect.body')
.classed('unauthorized', function (d) {
return d.permissions.canRead === false;
})
.attrs({
'height': function(d) {
return d.dimensions.height;
}
});
updated.each(function (portData) {
@ -236,51 +261,44 @@
// if this process group is visible, render everything
if (port.classed('visible')) {
if (details.empty()) {
// Adding details when the port is rendered for the 1st time, or it becomes visible due to permission updates.
details = port.append('g').attr('class', 'port-details');
var offset = 0;
if (nfCanvasUtils.getParentGroupId() === null) {
offset = OFFSET_VALUE;
// port transmitting icon
details.append('text')
.attrs({
'class': 'port-transmission-icon',
'x': 10,
'y': 18
})
.classed('hidden', isLocalPort);
// port transmitting icon
details.append('text')
.attrs({
'class': 'port-transmission-icon',
'x': 10,
'y': 18
});
// bulletin background
details.append('rect')
.attrs({
'class': 'bulletin-background',
'x': remotePortDimensions.width - OFFSET_VALUE,
'width': OFFSET_VALUE,
'height': OFFSET_VALUE
})
.classed('hidden', isLocalPort);
// bulletin background
details.append('rect')
.attrs({
'class': 'bulletin-background',
'x': function (d) {
return portData.dimensions.width - offset;
},
'width': offset,
'height': offset
});
// bulletin icon
details.append('text')
.attrs({
'class': 'bulletin-icon',
'x': function (d) {
return portData.dimensions.width - 18;
},
'y': 18
})
.text('\uf24a');
}
// bulletin icon
details.append('text')
.attrs({
'class': 'bulletin-icon',
'x': remotePortDimensions.width - 18,
'y': 18
})
.text('\uf24a')
.classed('hidden', isLocalPort);
// run status icon
details.append('text')
.attrs({
'class': 'run-status-icon',
'x': 50,
'y': function () {
return 25 + offset;
}
'y': offsetY(25)
});
// --------
@ -302,7 +320,7 @@
details.append('text')
.attrs({
'class': 'active-thread-count-icon',
'y': 43 + offset
'y': offsetY(43)
})
.text('\ue83f');
@ -310,11 +328,30 @@
details.append('text')
.attrs({
'class': 'active-thread-count',
'y': 43 + offset
'y': offsetY(43)
});
}
if (portData.permissions.canRead) {
// Update the remote port banner, these are needed when remote access is changed.
port.select('rect.remote-banner')
.classed('hidden', isLocalPort);
port.select('text.port-icon')
.attrs({
'y': offsetY(38)
});
details.select('text.port-transmission-icon')
.classed('hidden', isLocalPort);
details.select('rect.bulletin-background')
.classed('hidden', isLocalPort);
details.select('rect.bulletin-icon')
.classed('hidden', isLocalPort);
// update the port name
port.select('text.port-name')
.each(function (d) {
@ -332,13 +369,16 @@
} else {
nfCanvasUtils.multilineEllipsis(portName, 2, name);
}
}).attrs({
'y': offsetY(25)
}).append('title').text(function (d) {
return d.component.name;
});
return d.component.name;
});
// update the port comments
port.select('path.component-comments')
.style('visibility', nfCommon.isBlank(portData.component.comments) ? 'hidden' : 'visible')
.attr('transform', 'translate(' + (portData.dimensions.width - 2) + ', ' + (portData.dimensions.height - 10) + ')')
.each(function () {
// get the tip
var tip = d3.select('#comments-tip-' + portData.id);
@ -379,6 +419,12 @@
// populate the stats
port.call(updatePortStatus);
// Update connections to update anchor point positions those may have been updated by changing ports remote accessibility.
nfConnection.getComponentConnections(portData.id).forEach(function (connection){
nfConnection.refresh(connection.id);
});
} else {
if (portData.permissions.canRead) {
// update the port name
@ -439,7 +485,8 @@
family = 'flowfont';
}
return family;
}
},
'y': offsetY(25)
})
.text(function (d) {
var img = '';
@ -533,6 +580,9 @@
offset = off;
});
port.select('text.active-thread-count-icon').attr('y', offsetY(43));
port.select('text.active-thread-count').attr('y', offsetY(43));
// ---------
// bulletins
// ---------
@ -615,12 +665,6 @@
selectAll = nfCommon.isDefinedAndNotNull(options.selectAll) ? options.selectAll : selectAll;
}
// determine the appropriate dimensions for this port
var dimensions = portDimensions;
if (nfCanvasUtils.getParentGroupId() === null) {
dimensions = remotePortDimensions;
}
// get the current time
var now = new Date().getTime();
@ -630,7 +674,7 @@
// add the port
portMap.set(portEntity.id, $.extend({
type: 'Port',
dimensions: dimensions,
dimensions: dimensions(portEntity),
status: {
activeThreadCount: 0
}
@ -672,12 +716,6 @@
overrideRevisionCheck = nfCommon.isDefinedAndNotNull(options.overrideRevisionCheck) ? options.overrideRevisionCheck : overrideRevisionCheck;
}
// determine the appropriate dimensions for this port
var dimensions = portDimensions;
if (nfCanvasUtils.getParentGroupId() === null) {
dimensions = remotePortDimensions;
}
var set = function (proposedPortEntity) {
var currentPortEntity = portMap.get(proposedPortEntity.id);
@ -686,7 +724,7 @@
// add the port
portMap.set(proposedPortEntity.id, $.extend({
type: 'Port',
dimensions: dimensions,
dimensions: dimensions(proposedPortEntity),
status: {
activeThreadCount: 0
}

View File

@ -722,6 +722,12 @@
'class': 'ports'
});
// in (remote)
inText.append('tspan')
.attrs({
'class': 'public-ports'
});
// read/write value
processGroupStatsValue.append('text')
.attrs({
@ -748,6 +754,12 @@
'class': 'ports'
});
// out ports (remote)
outText.append('tspan')
.attrs({
'class': 'public-ports'
});
// out count
outText.append('tspan')
.attrs({
@ -1319,6 +1331,12 @@
return ' ' + String.fromCharCode(8594) + ' ' + d.inputPortCount;
});
// in ports value (remote)
updated.select('text.process-group-in tspan.public-ports')
.text(function (d) {
return d.publicInputPortCount > 0 ? ' (' + d.publicInputPortCount + ' remote)' : '';
});
// read/write value
updated.select('text.process-group-read-write')
.text(function (d) {
@ -1328,13 +1346,19 @@
// out ports value
updated.select('text.process-group-out tspan.ports')
.text(function (d) {
return d.outputPortCount + ' ' + String.fromCharCode(8594) + ' ';
return d.outputPortCount;
});
// out ports value (remote)
updated.select('text.process-group-out tspan.public-ports')
.text(function (d) {
return d.publicOutputPortCount > 0 ? ' (' + d.publicOutputPortCount + ' remote) ' : '';
});
// out count value
updated.select('text.process-group-out tspan.count')
.text(function (d) {
return nfCommon.substringBeforeFirst(d.status.aggregateSnapshot.output, ' ');
return ' ' + String.fromCharCode(8594) + ' ' + nfCommon.substringBeforeFirst(d.status.aggregateSnapshot.output, ' ');
});
// out size value

View File

@ -134,6 +134,8 @@
$('#read-only-connection-source-label').text('From output');
$('#read-only-connection-source').text(source.name).attr('title', source.name);
$('#read-only-connection-source-group-name').text(remoteProcessGroup.name);
$('#read-only-connection-source-group div.setting-name').text('Within Remote Group');
$('#read-only-connection-remote-source-url').text(remoteProcessGroup.targetUri).show();
deferred.resolve();
}).fail(function (xhr, status, error) {
@ -142,6 +144,7 @@
$('#read-only-connection-source-label').text('From output');
$('#read-only-connection-source').text(source.name).attr('title', source.name);
$('#read-only-connection-source-group-name').text(source.groupId);
$('#read-only-connection-source-group div.setting-name').text('Within Remote Group');
deferred.resolve();
} else {
@ -294,6 +297,8 @@
$('#read-only-connection-target-label').text('To input');
$('#read-only-connection-target').text(destination.name).attr('title', destination.name);
$('#read-only-connection-target-group-name').text(remoteProcessGroup.name);
$('#read-only-connection-target-group div.setting-name').text('Within Remote Group');
$('#read-only-connection-remote-target-url').text(remoteProcessGroup.targetUri).show();
deferred.resolve();
}).fail(function (xhr, status, error) {
@ -302,6 +307,7 @@
$('#read-only-connection-target-label').text('To input');
$('#read-only-connection-target').text(destination.name).attr('title', destination.name);
$('#read-only-connection-target-group-name').text(destination.groupId);
$('#read-only-connection-target-group div.setting-name').text('Within Remote Group');
deferred.resolve();
} else {
@ -416,12 +422,16 @@
// clear the connection source details
$('#read-only-connection-source-label').text('');
$('#read-only-connection-source').empty();
$('#read-only-connection-source-group div.setting-name').text('Within Group')
$('#read-only-connection-source-group-name').text('');
$('#read-only-connection-remote-source-url').text('').hide();
// clear the connection target details
$('#read-only-connection-target-label').text('');
$('#read-only-connection-target').empty();
$('#read-only-connection-target-group div.setting-name').text('Within Group')
$('#read-only-connection-target-group-name').text('');
$('#read-only-connection-remote-target-url').text('').hide();
// clear the relationship details
$('#read-only-relationship-names').css('border-width', '0').empty();