diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java index ed88dc11e8..de5d6cfc95 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java @@ -91,6 +91,11 @@ public enum CommandOption { POS_X("px", "posX", "The x coordinate of a position", true), POS_Y("py", "posY", "The y coordinate of a position", true), + SOURCE_PG("sourcePg", "source-pg", "The ID of the source process group", true), + DESTINATION_PG("destPg", "destination-pg", "The ID of the destination process group", true), + SOURCE_OUTPUT_PORT("sourceOutput", "source-output-port", "The name of the output port in the source process group", true), + DESTINATION_INPUT_PORT("destInput", "destination-input-port", "The name of the input port in the destination process group", true), + // NiFi - Controller Services CS_ID("cs", "controllerServiceId", "The id of a controller service", true), diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index 91c063ec0e..af24fa011a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -60,6 +60,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetInheritedParamCon import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParam; import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParamProviderProperty; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeVersion; +import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGConnect; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreateControllerService; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices; @@ -123,6 +124,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup { commands.add(new UpdateRegistryClient()); commands.add(new GetRegistryClientId()); commands.add(new PGImport()); + commands.add(new PGConnect()); commands.add(new PGStart()); commands.add(new PGStop()); commands.add(new PGCreate()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java new file mode 100644 index 0000000000..c7d34227ca --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.StringResult; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; + +import java.io.IOException; +import java.util.Properties; +import java.util.Set; + +/** + * Command to stop the components of a process group. + */ +public class PGConnect extends AbstractNiFiCommand { + + public PGConnect() { + super("pg-connect", StringResult.class); + } + + @Override + public String getDescription() { + return "Connects the output port of the source process group to the input port of a destination process group."; + } + + @Override + protected void doInitialize(final Context context) { + addOption(CommandOption.SOURCE_PG.createOption()); + addOption(CommandOption.SOURCE_OUTPUT_PORT.createOption()); + addOption(CommandOption.DESTINATION_PG.createOption()); + addOption(CommandOption.DESTINATION_INPUT_PORT.createOption()); + } + + @Override + public StringResult doExecute(final NiFiClient client, final Properties properties) + throws NiFiClientException, IOException, MissingOptionException { + + final String sourcePgId = getRequiredArg(properties, CommandOption.SOURCE_PG); + final String sourceOutputPort = getRequiredArg(properties, CommandOption.SOURCE_OUTPUT_PORT); + final String destinationPgId = getRequiredArg(properties, CommandOption.DESTINATION_PG); + final String destinationInputPort = getRequiredArg(properties, CommandOption.DESTINATION_INPUT_PORT); + + PortEntity source = null; + PortEntity destination = null; + + final FlowClient pgClient = client.getFlowClient(); + + final ProcessGroupFlowEntity sourcePgEntity = pgClient.getProcessGroup(sourcePgId); + final ProcessGroupFlowEntity destinationPgEntity = pgClient.getProcessGroup(destinationPgId); + + final String parentPgId = sourcePgEntity.getProcessGroupFlow().getParentGroupId(); + if(!parentPgId.equals(destinationPgEntity.getProcessGroupFlow().getParentGroupId())) { + throw new IOException("The source process group and the destination process group are not at the same level"); + } + + // retrieving the ID of the output port based on its name in source process group + Set outputPorts = sourcePgEntity.getProcessGroupFlow().getFlow().getOutputPorts(); + for(PortEntity outputPort : outputPorts) { + if(outputPort.getComponent().getName().equals(sourceOutputPort)) { + source = outputPort; + break; + } + } + if(source == null) { + throw new IOException("Unable to find an output port with the name '" + sourceOutputPort + "' in the source process group"); + } + + // retrieving the ID of the output port based on its name in source process group + Set inputPorts = destinationPgEntity.getProcessGroupFlow().getFlow().getInputPorts(); + for(PortEntity inputPort : inputPorts) { + if(inputPort.getComponent().getName().equals(destinationInputPort)) { + destination = inputPort; + break; + } + } + if(destination == null) { + throw new IOException("Unable to find an input port with the name '" + destinationInputPort + "' in the destination process group"); + } + + final ConnectionEntity connectionEntity = new ConnectionEntity(); + + connectionEntity.setDestinationGroupId(destinationPgId); + connectionEntity.setDestinationId(destination.getId()); + connectionEntity.setDestinationType(destination.getPortType()); + + connectionEntity.setSourceGroupId(sourcePgId); + connectionEntity.setSourceId(source.getId()); + connectionEntity.setSourceType(source.getPortType()); + + final RevisionDTO revisionDto = new RevisionDTO(); + revisionDto.setClientId(getClass().getName()); + revisionDto.setVersion(0L); + connectionEntity.setRevision(revisionDto); + + final ConnectionDTO connectionDto = new ConnectionDTO(); + connectionDto.setDestination(createConnectableDTO(destination)); + connectionDto.setSource(createConnectableDTO(source)); + connectionDto.setParentGroupId(parentPgId); + connectionEntity.setComponent(connectionDto); + + final ConnectionClient connectionClient = client.getConnectionClient(); + final ConnectionEntity createdEntity = connectionClient.createConnection(parentPgId, connectionEntity); + return new StringResult(createdEntity.getId(), getContext().isInteractive()); + } + + private ConnectableDTO createConnectableDTO(final PortEntity port) { + final ConnectableDTO dto = new ConnectableDTO(); + dto.setGroupId(port.getComponent().getParentGroupId()); + dto.setId(port.getId()); + dto.setName(port.getComponent().getName()); + dto.setRunning("RUNNING".equalsIgnoreCase(port.getComponent().getState())); + dto.setType(port.getPortType()); + return dto; + } + +}